kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2485; Allow producer performance to take properties from a file…
Date Mon, 31 Aug 2015 19:39:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3803e5cb3 -> 835495996


KAFKA-2485; Allow producer performance to take properties from a file…

… via --producer.config command line parameter

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ewen Cheslack-Postava, Onur Karaman

Closes #174 from lindong28/KAFKA-2485


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/83549599
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/83549599
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/83549599

Branch: refs/heads/trunk
Commit: 8354959969c3fe2094b66c44c1c54c3949605e13
Parents: 3803e5c
Author: Dong Lin <lindong28@gmail.com>
Authored: Mon Aug 31 12:39:01 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon Aug 31 12:39:01 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/ProducerPerformance.scala  | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/83549599/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 0335cc6..46a68e9 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -29,6 +29,7 @@ import java.util._
 import java.text.SimpleDateFormat
 import java.math.BigInteger
 
+import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
 
 /**
@@ -74,6 +75,10 @@ object ProducerPerformance extends Logging {
       .withRequiredArg
       .describedAs("hostname:port,..,hostname:port")
       .ofType(classOf[String])
+    val producerConfigOpt = parser.accepts("producer.config", "Producer config properties file.")
+      .withRequiredArg
+      .describedAs("config file")
+      .ofType(classOf[String])
     val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to")
       .withRequiredArg
       .describedAs("topic1,topic2..")
@@ -115,7 +120,7 @@ object ProducerPerformance extends Logging {
       .defaultsTo(0)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-      "set, the csv metrics will be output here")
+      "set, the csv metrics will be outputted here")
       .withRequiredArg
       .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
@@ -150,6 +155,11 @@ object ProducerPerformance extends Logging {
 
     val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
 
+    val producerProps = if (options.has(producerConfigOpt))
+      Utils.loadProps(options.valueOf(producerConfigOpt))
+    else
+      new Properties()
+
     if (csvMetricsReporterEnabled) {
       val props = new Properties()
       props.put("kafka.metrics.polling.interval.secs", "1")
@@ -180,6 +190,7 @@ object ProducerPerformance extends Logging {
     val producer =
       if (config.useNewProducer) {
         import org.apache.kafka.clients.producer.ProducerConfig
+        props.putAll(config.producerProps)
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
@@ -192,6 +203,7 @@ object ProducerPerformance extends Logging {
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
         new NewShinyProducer(props)
       } else {
+        props.putAll(config.producerProps)
         props.put("metadata.broker.list", config.brokerList)
         props.put("compression.codec", config.compressionCodec.codec.toString)
         props.put("send.buffer.bytes", (64 * 1024).toString)


Mime
View raw message