kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: MINOR: Introduce `producer.config` property to `ConsoleProducer`
Date Wed, 18 Nov 2015 02:10:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 36b1c1dae -> 06d2c7816


MINOR: Introduce `producer.config` property to `ConsoleProducer`

This makes it easier to pass security properties in the same way
to `ConsoleConsumer` and `ConsoleProducer`.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #544 from ijuma/producer-config-in-console-producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/06d2c781
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/06d2c781
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/06d2c781

Branch: refs/heads/trunk
Commit: 06d2c78164aca4b20d1d76f2ddef563b115f2b81
Parents: 36b1c1d
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Nov 17 18:10:38 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Nov 17 18:10:38 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleProducer.scala     | 24 +++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/06d2c781/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 979c1bd..bce819e 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -28,6 +28,7 @@ import java.io._
 
 import joptsimple._
 import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.utils.Utils
 
 object ConsoleProducer {
 
@@ -76,10 +77,7 @@ object ConsoleProducer {
   }
 
   def getOldProducerProps(config: ProducerConfig): Properties = {
-
-    val props = new Properties
-
-    props.putAll(config.extraProducerProps)
+    val props = producerProps(config)
 
     props.put("metadata.broker.list", config.brokerList)
     props.put("compression.codec", config.compressionCodec)
@@ -101,11 +99,17 @@ object ConsoleProducer {
     props
   }
 
-  def getNewProducerProps(config: ProducerConfig): Properties = {
-
-    val props = new Properties
-
+  private def producerProps(config: ProducerConfig): Properties = {
+    val props =
+      if (config.options.has(config.producerConfigOpt))
+        Utils.loadProps(config.options.valueOf(config.producerConfigOpt))
+      else new Properties
     props.putAll(config.extraProducerProps)
+    props
+  }
+
+  def getNewProducerProps(config: ProducerConfig): Properties = {
+    val props = producerProps(config)
 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
@@ -237,6 +241,10 @@ object ConsoleProducer {
             .withRequiredArg
             .describedAs("producer_prop")
             .ofType(classOf[String])
+    val producerConfigOpt = parser.accepts("producer.config", s"Producer config properties
file. Note that $producerPropertyOpt takes precedence over this config.")
+      .withRequiredArg
+      .describedAs("config file")
+      .ofType(classOf[String])
     val useOldProducerOpt = parser.accepts("old-producer", "Use the old producer implementation.")
 
     val options = parser.parse(args : _*)


Mime
View raw message