kafka-commits mailing list archives

From jun...@apache.org
Subject [kafka] branch trunk updated: Forward topic from console consumer to deserializer (#5704)
Date Thu, 29 Nov 2018 02:23:40 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fb9f2d8  Forward topic from console consumer to deserializer (#5704)
fb9f2d8 is described below

commit fb9f2d8c9b296c44f42ae3838380895f95171133
Author: Mathieu Chataigner <mathieu.chataigner@gmail.com>
AuthorDate: Thu Nov 29 03:23:30 2018 +0100

    Forward topic from console consumer to deserializer (#5704)
    
    Some deserializers need the topic name to correctly deserialize the message payload.
    The console consumer works fine with Deserializer<String>, but it invokes the deserializer with the topic set to null.
    This breaks the Deserializer API contract, even though the topic is readily available in the ConsumerRecord.
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Gardner Vickers <gardner@vickers.me>, Jun Rao <junrao@gmail.com>
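
For readers who have not run into this, a topic-aware deserializer might look like the minimal sketch below. The TopicPrefixingDeserializer name and the prefixing behaviour are illustrative only and are not part of this commit; they simply show why a null topic breaks such implementations.

    import java.nio.charset.StandardCharsets
    import java.util

    import org.apache.kafka.common.serialization.Deserializer

    // Illustrative only: a deserializer whose output depends on the topic name.
    // With the old behaviour (topic == null), a class like this could not work
    // correctly under the console consumer.
    class TopicPrefixingDeserializer extends Deserializer[String] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

      override def deserialize(topic: String, data: Array[Byte]): String = {
        val payload = if (data == null) "null" else new String(data, StandardCharsets.UTF_8)
        s"[$topic] $payload"  // relies on the topic now forwarded by ConsoleConsumer
      }

      override def close(): Unit = {}
    }

Before this change such a deserializer would have printed "[null] ..." (or failed, depending on the implementation); after it, the real topic name from the ConsumerRecord is passed through.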
---
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  8 ++--
 .../scala/kafka/tools/CustomDeserializerTest.scala | 53 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 42c5c5b..9a8c648 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -509,9 +509,9 @@ class DefaultMessageFormatter extends MessageFormatter {
         output.write(lineSeparator)
     }
 
-    def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
+    def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) {
       val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
-      val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
+      val convertedBytes = deserializer.map(_.deserialize(topic, nonNullBytes).toString.
         getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
       output.write(convertedBytes)
     }
@@ -527,12 +527,12 @@ class DefaultMessageFormatter extends MessageFormatter {
     }
 
     if (printKey) {
-      write(keyDeserializer, key)
+      write(keyDeserializer, key, topic)
       writeSeparator(printValue)
     }
 
     if (printValue) {
-      write(valueDeserializer, value)
+      write(valueDeserializer, value, topic)
       output.write(lineSeparator)
     }
   }
diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
new file mode 100644
index 0000000..37b5b79
--- /dev/null
+++ b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
@@ -0,0 +1,53 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.tools
+
+import java.io.PrintStream
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.serialization.Deserializer
+import org.hamcrest.CoreMatchers
+import org.junit.Test
+import org.junit.Assert.assertThat
+import org.scalatest.mockito.MockitoSugar
+
+class CustomDeserializer extends Deserializer[String] {
+  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
+  }
+
+  override def deserialize(topic: String, data: Array[Byte]): String = {
+    assertThat("topic must not be null", topic, CoreMatchers.notNullValue())
+    new String(data)
+  }
+
+  override def close(): Unit = {
+  }
+}
+
+class CustomDeserializerTest extends MockitoSugar {
+
+  @Test
+  def checkDeserializerTopicIsNotNull(): Unit = {
+    val formatter = new DefaultMessageFormatter()
+    formatter.keyDeserializer = Some(new CustomDeserializer)
+
+    formatter.writeTo(new ConsumerRecord("topic_test", 1, 1l, "key".getBytes, "value".getBytes), mock[PrintStream])
+
+    formatter.close()
+  }
+}

