kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5373; Revert breaking change to console consumer
Date Sat, 03 Jun 2017 00:53:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 f2673eb37 -> 27578d009


KAFKA-5373; Revert breaking change to console consumer

This patch reverts commit b63e41ea78a58bdea78be33f90bfcb61ce5988d3
because it broke the console consumer: with that patch applied,
the consumer prints the addresses of the messages instead of
their contents.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3218 from apurvam/KAFKA-5373-fix-console-consumer

(cherry picked from commit 8104c0de273fb7627c4172f18a609472186860fd)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27578d00
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27578d00
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27578d00

Branch: refs/heads/0.11.0
Commit: 27578d0096663993612849121fdc5dfb509a117f
Parents: f2673eb
Author: Apurva Mehta <apurva@confluent.io>
Authored: Sat Jun 3 01:53:23 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jun 3 01:53:56 2017 +0100

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 34 ++++----------------
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 32 ------------------
 2 files changed, 6 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/27578d00/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 664557a..a1e2ffa 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -174,8 +174,7 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    if (!props.containsKey("auto.offset.reset"))
-      props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
+    props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
     props.put("zookeeper.connect", config.zkConnectionStr)
 
     if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
@@ -198,8 +197,7 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    if (!props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
-      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
@@ -329,32 +327,12 @@ object ConsoleConsumer extends Logging {
     val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
 
-    if (keyDeserializer != null && !keyDeserializer.isEmpty)
-      // the argument that is provided directly takes precedence
+    if (keyDeserializer != null && !keyDeserializer.isEmpty) {
       formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
-    else if (extraConsumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
-      // then the argument that is provided through --consumer-property
-      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
-    else if (consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
-      // then the argument that is provided through --consumer.config
-      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
-    else
-      // the default is used if the argument is not provided directly or indirectly
-      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-
-    if (valueDeserializer != null && !valueDeserializer.isEmpty)
-      // the argument that is provided directly takes precedence
+    }
+    if (valueDeserializer != null && !valueDeserializer.isEmpty) {
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
-    else if (extraConsumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
-      // then the argument that is provided through --consumer-property
-      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
-    else if (consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
-      // then the argument that is provided through --consumer.config
-      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
-    else
-      // the default is used if the argument is not provided directly or indirectly
-      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-
+    }
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/27578d00/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 3cfb5a5..e0917a2 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -191,36 +191,4 @@ class ConsoleConsumerTest {
 
     assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms"))
   }
-
-  @Test
-  def shouldOverwriteConfigFromConfigFileOrPropertiesWithConfigFromArguments() {
-    val propsFile = TestUtils.tempFile()
-    val propsStream = new FileOutputStream(propsFile)
-    propsStream.write("bootstrap.servers=localhost:9093\n".getBytes())
-    propsStream.write("auto.offset.reset=earliest\n".getBytes())
-    propsStream.write("key.deserializer=org.apache.kafka.common.serialization.LongDeserializer\n".getBytes())
-    propsStream.write("value.deserializer=org.apache.kafka.common.serialization.LongDeserializer".getBytes())
-    propsStream.close()
-    val args: Array[String] = Array(
-      "--bootstrap-server", "localhost:9092",
-      "--topic", "test",
-      "--key-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer",
-      "--value-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer",
-      "--consumer-property", "auto.offset.reset=latest",
-      "--consumer-property", "key.deserializer=org.apache.kafka.common.serialization.FloatDeserializer",
-      "--consumer-property", "value.deserializer=org.apache.kafka.common.serialization.FloatDeserializer",
-      "--consumer.config", propsFile.getAbsolutePath
-    )
-
-    val config = new ConsoleConsumer.ConsumerConfig(args)
-    val props = ConsoleConsumer.getNewConsumerProps(config)
-
-    assertEquals("localhost:9092", props.getProperty("bootstrap.servers"))
-    assertEquals("latest", props.getProperty("auto.offset.reset"))
-    assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("key.deserializer"))
-    assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("value.deserializer"))
-    // serde settings applies to message formatter only, not the consumer itself
-    assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("key.deserializer"))
-    assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("value.deserializer"))
-  }
 }


Mime
View raw message