kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-1824 ConsoleProducer - properties key.separator and parse.key no longer work; reviewed by Neha Narkhede
Date Sat, 28 Feb 2015 15:41:54 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 687abc98a -> 22ff9e943


KAFKA-1824 ConsoleProducer - properties key.separator and parse.key no longer work; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/22ff9e94
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/22ff9e94
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/22ff9e94

Branch: refs/heads/trunk
Commit: 22ff9e943a960a73d2f007b2c52b90e675a61299
Parents: 687abc9
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Sat Feb 28 07:41:27 2015 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Sat Feb 28 07:41:37 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleProducer.scala     | 112 ++++++++++++-------
 .../scala/kafka/tools/ConsoleProducerTest.scala |  79 +++++++++++++
 2 files changed, 148 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
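For context, KAFKA-1824 reports that the console producer's line reader stopped honoring the --property settings key.separator and parse.key. A minimal sketch of the behavior this change restores, based on the getReaderProps helper and the LineMessageReader usage in the new test below (the topic name, separator, and sample input line are illustrative):

    import java.util.Properties
    import kafka.tools.ConsoleProducer.LineMessageReader

    // Assemble reader properties the way ConsoleProducer.getReaderProps does:
    // the topic plus whatever --property key=value pairs were given.
    val readerProps = new Properties
    readerProps.put("topic", "t3")
    readerProps.put("parse.key", "true")
    readerProps.put("key.separator", "#")

    // With parse.key=true the reader splits each stdin line on the separator,
    // e.g. "user42#hello" becomes key "user42" and value "hello".
    val reader = new LineMessageReader
    reader.init(System.in, readerProps)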


http://git-wip-us.apache.org/repos/asf/kafka/blob/22ff9e94/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 7fefc2e..00265f9 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -27,6 +27,7 @@ import java.util.Properties
 import java.io._
 
 import joptsimple._
+import org.apache.kafka.clients.producer.ProducerConfig
 
 object ConsoleProducer {
 
@@ -34,54 +35,14 @@ object ConsoleProducer {
 
     val config = new ProducerConfig(args)
     val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
-    val props = new Properties
-    props.putAll(config.cmdLineProps)
-    props.put("topic", config.topic)
-    reader.init(System.in, props)
+    reader.init(System.in, getReaderProps(config))
 
     try {
         val producer =
           if(config.useNewProducer) {
-            import org.apache.kafka.clients.producer.ProducerConfig
-
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
-            props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
-            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
-            props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
-            props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
-            props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
-            props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
-            props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
-            props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
-            if(config.queueEnqueueTimeoutMs != -1)
-              props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
-            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
-            props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
-            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-
-            new NewShinyProducer(props)
+            new NewShinyProducer(getNewProducerProps(config))
           } else {
-            props.put("metadata.broker.list", config.brokerList)
-            props.put("compression.codec", config.compressionCodec)
-            props.put("producer.type", if(config.sync) "sync" else "async")
-            props.put("batch.num.messages", config.batchSize.toString)
-            props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
-            props.put("retry.backoff.ms", config.retryBackoffMs.toString)
-            props.put("queue.buffering.max.ms", config.sendTimeout.toString)
-            props.put("queue.buffering.max.messages", config.queueSize.toString)
-            props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
-            props.put("request.required.acks", config.requestRequiredAcks.toString)
-            props.put("request.timeout.ms", config.requestTimeoutMs.toString)
-            props.put("key.serializer.class", config.keyEncoderClass)
-            props.put("serializer.class", config.valueEncoderClass)
-            props.put("send.buffer.bytes", config.socketBuffer.toString)
-            props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
-            props.put("client.id", "console-producer")
-
-            new OldProducer(props)
+            new OldProducer(getOldProducerProps(config))
           }
 
         Runtime.getRuntime.addShutdownHook(new Thread() {
@@ -104,6 +65,66 @@ object ConsoleProducer {
     System.exit(0)
   }
 
+  def getReaderProps(config: ProducerConfig): Properties = {
+    val props = new Properties
+    props.put("topic",config.topic)
+    props.putAll(config.cmdLineProps)
+    props
+  }
+
+  def getOldProducerProps(config: ProducerConfig): Properties = {
+
+    val props = new Properties;
+
+    props.putAll(config.extraProducerProps)
+
+    props.put("metadata.broker.list", config.brokerList)
+    props.put("compression.codec", config.compressionCodec)
+    props.put("producer.type", if(config.sync) "sync" else "async")
+    props.put("batch.num.messages", config.batchSize.toString)
+    props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
+    props.put("retry.backoff.ms", config.retryBackoffMs.toString)
+    props.put("queue.buffering.max.ms", config.sendTimeout.toString)
+    props.put("queue.buffering.max.messages", config.queueSize.toString)
+    props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
+    props.put("request.required.acks", config.requestRequiredAcks.toString)
+    props.put("request.timeout.ms", config.requestTimeoutMs.toString)
+    props.put("key.serializer.class", config.keyEncoderClass)
+    props.put("serializer.class", config.valueEncoderClass)
+    props.put("send.buffer.bytes", config.socketBuffer.toString)
+    props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
+    props.put("client.id", "console-producer")
+
+    props
+  }
+
+  def getNewProducerProps(config: ProducerConfig): Properties = {
+
+    val props = new Properties;
+
+    props.putAll(config.extraProducerProps)
+
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
+    props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
+    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
+    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
+    props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
+    props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
+    props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
+    props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
+    props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
+    if(config.queueEnqueueTimeoutMs != -1)
+      props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+
+    props
+  }
+
   class ProducerConfig(args: Array[String]) {
     val parser = new OptionParser
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
@@ -211,6 +232,10 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
+    val producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
+            .withRequiredArg
+            .describedAs("producer_prop")
+            .ofType(classOf[String])
     val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
 
     val options = parser.parse(args : _*)
@@ -243,6 +268,7 @@ object ConsoleProducer {
     val readerClass = options.valueOf(messageReaderOpt)
     val socketBuffer = options.valueOf(socketBufferSizeOpt)
     val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
+    val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
     /* new producer related configs */
     val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
     val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
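
The refactor above extracts the property assembly into getReaderProps, getOldProducerProps and getNewProducerProps, which the new test can call directly, and adds a --producer-property option whose key=value pairs are copied into the producer properties before the flag-derived settings. A rough sketch of how these pieces fit together (argument values are illustrative; max.request.size is just an example of a pass-through producer property):

    import kafka.tools.ConsoleProducer

    // Illustrative arguments: --property feeds the message reader,
    // --producer-property feeds the producer configuration.
    val args = Array(
      "--broker-list", "localhost:9092",
      "--topic", "t3",
      "--property", "parse.key=true",
      "--property", "key.separator=#",
      "--producer-property", "max.request.size=1048576")

    val config = new ConsoleProducer.ProducerConfig(args)

    // Topic plus the --property pairs, handed to the MessageReader.
    val readerProps = ConsoleProducer.getReaderProps(config)

    // The --producer-property pairs are applied first; settings derived from
    // the dedicated command-line flags are put afterwards, so they win on conflict.
    val producerProps = ConsoleProducer.getNewProducerProps(config)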

http://git-wip-us.apache.org/repos/asf/kafka/blob/22ff9e94/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
new file mode 100644
index 0000000..6d1f51c
--- /dev/null
+++ b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.producer
+import kafka.tools.ConsoleProducer.{LineMessageReader, MessageReader,ProducerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
+import joptsimple.UnrecognizedOptionException
+import org.junit.Assert
+import org.junit.Test
+
+
+class ConsoleProducerTest {
+
+  val validArgs: Array[String] = Array(
+    "--broker-list",
+    "localhost:1001,localhost:1002",
+    "--topic",
+    "t3",
+    "--property",
+    "parse.key=true",
+    "--property",
+    "key.separator=#"
+  )
+
+  val invalidArgs: Array[String] = Array(
+    "--t", // not a valid argument
+    "t3"
+  )
+
+  @Test
+  def testValidConfigsNewProducer() {
+    val config = new ConsoleProducer.ProducerConfig(validArgs)
+    // New ProducerConfig constructor is package private, so we can't call it directly
+    // Creating new Producer to validate instead
+    new KafkaProducer[Array[Byte],Array[Byte]](ConsoleProducer.getNewProducerProps(config))
+  }
+
+  @Test
+  def testValidConfigsOldProducer() {
+    val config = new ConsoleProducer.ProducerConfig(validArgs)
+    new producer.ProducerConfig(ConsoleProducer.getOldProducerProps(config));
+  }
+
+  @Test
+  def testInvalidConfigs() {
+    try {
+      val config = new ConsoleProducer.ProducerConfig(invalidArgs)
+      Assert.fail("Should have thrown an UnrecognizedOptionException")
+    } catch {
+      case e: joptsimple.OptionException => // expected exception
+    }
+  }
+
+  @Test
+  def testParseKeyProp(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(validArgs)
+    val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[LineMessageReader];
+    reader.init(System.in,ConsoleProducer.getReaderProps(config))
+    assert(reader.keySeparator == "#")
+    assert(reader.parseKey == true)
+  }
+
+}

