kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: DefaultMessageFormatter custom deserializer fixes
Date Tue, 01 Mar 2016 00:10:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5da935ef7 -> 74eff8a83


MINOR: DefaultMessageFormatter custom deserializer fixes

The ability to specify a deserializer for keys and values was added in a recent commit (845c6eae1f6c6bcf11),
but it contained a few issues.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #987 from ijuma/console-consumer-cleanups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/74eff8a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/74eff8a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/74eff8a8

Branch: refs/heads/trunk
Commit: 74eff8a830ff6508ab98761a9e77d19d4e49a73e
Parents: 5da935e
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Feb 29 16:10:25 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Feb 29 16:10:25 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 39 +++++++++-----------
 1 file changed, 17 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/74eff8a8/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 0d85aca..7aee7ab 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -350,12 +350,10 @@ class DefaultMessageFormatter extends MessageFormatter {
   var keySeparator = "\t".getBytes
   var lineSeparator = "\n".getBytes
 
-  var keyDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer()
-  var valDecoder : Deserializer[_ <: Object] = new ByteArrayDeserializer()
+  var keyDeserializer: Option[Deserializer[_]] = None
+  var valueDeserializer: Option[Deserializer[_]] = None
 
   override def init(props: Properties) {
-    System.out.println(props)
-
     if (props.containsKey("print.timestamp"))
       printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true")
     if (props.containsKey("print.key"))
@@ -364,22 +362,23 @@ class DefaultMessageFormatter extends MessageFormatter {
       keySeparator = props.getProperty("key.separator").getBytes
     if (props.containsKey("line.separator"))
       lineSeparator = props.getProperty("line.separator").getBytes
+    // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
+    if (props.containsKey("key.deserializer"))
+      keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+    // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
+    if (props.containsKey("value.deserializer"))
+      valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+  }
 
-    if (props.containsKey("key.decoder")) {
-      keyDecoder = Class.forName(props.getProperty("key.decoder")).newInstance().asInstanceOf[Deserializer[_
<: Object]]
+  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType,
output: PrintStream) {
 
-      System.out.println("update key decoder")
+    def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], separator:
Array[Byte]) {
+      val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes)
+      val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.getBytes).getOrElse(nonNullBytes)
+      output.write(convertedBytes)
+      output.write(separator)
     }
-    if (props.containsKey("value.decoder")) {
-      valDecoder = Class.forName(props.getProperty("value.decoder")).newInstance().asInstanceOf[Deserializer[_
<: Object]]
 
-      System.out.println("update value decoder")
-    }
-    System.out.println(keyDecoder)
-    System.out.println(valDecoder)
-  }
-
-  def writeTo(key: Array[Byte], value: Array[Byte], timestamp: Long, timestampType: TimestampType,
output: PrintStream) {
     if (printTimestamp) {
       if (timestampType != TimestampType.NO_TIMESTAMP_TYPE)
         output.write(s"$timestampType:$timestamp".getBytes)
@@ -387,12 +386,8 @@ class DefaultMessageFormatter extends MessageFormatter {
         output.write(s"NO_TIMESTAMP".getBytes)
       output.write(keySeparator)
     }
-    if (printKey) {
-      output.write(if (key == null) "null".getBytes else keyDecoder.deserialize(null, key).toString.getBytes)
-      output.write(keySeparator)
-    }
-    output.write(if (value == null) "null".getBytes else valDecoder.deserialize(null, value).toString.getBytes)
-    output.write(lineSeparator)
+    if (printKey) write(keyDeserializer, key, keySeparator)
+    write(valueDeserializer, value, lineSeparator)
   }
 }
 


Mime
View raw message