kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject git commit: KAFKA-1337: Fix incorrect producer configs after config renaming.
Date Fri, 18 Apr 2014 20:05:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b9351e04f -> 8b052150f


KAFKA-1337: Fix incorrect producer configs after config renaming.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b052150
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b052150
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b052150

Branch: refs/heads/trunk
Commit: 8b052150f55fb9a6f8c4aedc6b3fa0528719671e
Parents: b9351e0
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Wed Apr 16 17:32:43 2014 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Fri Apr 18 13:05:13 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/producer/ConsoleProducer.scala  | 37 +++++++++++---------
 .../scala/kafka/tools/ReplayLogProducer.scala   |  4 +--
 .../scala/kafka/tools/TestEndToEndLatency.scala |  8 ++---
 .../scala/kafka/tools/TestLogCleaning.scala     |  6 ++--
 4 files changed, 30 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b052150/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 27b0ec8..b19ab49 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -32,7 +32,10 @@ object ConsoleProducer {
 
     val config = new ProducerConfig(args)
     val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
-    reader.init(System.in, config.cmdLineProps)
+    val props = new Properties
+    props.put("topic", config.topic)
+    props.putAll(config.cmdLineProps)
+    reader.init(System.in, props)
 
     try {
         val producer =
@@ -201,7 +204,6 @@ object ConsoleProducer {
     val readerClass = options.valueOf(messageReaderOpt)
     val socketBuffer = options.valueOf(socketBufferSizeOpt)
     val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
-    cmdLineProps.put("topic", topic)
     /* new producer related configs */
     val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
     val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
@@ -262,22 +264,24 @@ object ConsoleProducer {
   }
 
   class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
+    import org.apache.kafka.clients.producer.ProducerConfig
     val props = new Properties()
-    props.put("metadata.broker.list", producerConfig.brokerList)
-    props.put("compression.type", producerConfig.compressionCodec)
-    props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
-    props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
-    props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)
-    props.put("metadata.fetch.timeout.ms", producerConfig.metadataFetchTimeoutMs.toString)
-    props.put("request.required.acks", producerConfig.requestRequiredAcks.toString)
-    props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString)
-    props.put("request.retries", producerConfig.messageSendMaxRetries.toString)
-    props.put("linger.ms", producerConfig.sendTimeout.toString)
+    props.putAll(producerConfig.cmdLineProps)
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerConfig.brokerList)
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfig.compressionCodec)
+    props.put(ProducerConfig.SEND_BUFFER_CONFIG, producerConfig.socketBuffer.toString)
+    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, producerConfig.retryBackoffMs.toString)
+    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, producerConfig.metadataExpiryMs.toString)
+    props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, producerConfig.metadataFetchTimeoutMs.toString)
+    props.put(ProducerConfig.ACKS_CONFIG, producerConfig.requestRequiredAcks.toString)
+    props.put(ProducerConfig.TIMEOUT_CONFIG, producerConfig.requestTimeoutMs.toString)
+    props.put(ProducerConfig.RETRIES_CONFIG, producerConfig.messageSendMaxRetries.toString)
+    props.put(ProducerConfig.LINGER_MS_CONFIG, producerConfig.sendTimeout.toString)
     if(producerConfig.queueEnqueueTimeoutMs != -1)
-      props.put("block.on.buffer.full", "false")
-    props.put("total.memory.bytes", producerConfig.maxMemoryBytes.toString)
-    props.put("max.partition.bytes", producerConfig.maxPartitionMemoryBytes.toString)
-    props.put("client.id", "console-producer")
+      props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerConfig.maxMemoryBytes.toString)
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.maxPartitionMemoryBytes.toString)
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
     val producer = new KafkaProducer(props)
 
     def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
@@ -294,6 +298,7 @@ object ConsoleProducer {
 
   class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer {
     val props = new Properties()
+    props.putAll(producerConfig.cmdLineProps)
     props.put("metadata.broker.list", producerConfig.brokerList)
     props.put("compression.codec", producerConfig.compressionCodec)
     props.put("producer.type", if(producerConfig.sync) "sync" else "async")

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b052150/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index f2246f9..eb71e49 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 import kafka.consumer._
 import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
 import kafka.api.OffsetRequest
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
 
 object ReplayLogProducer extends Logging {
 
@@ -122,7 +122,7 @@ object ReplayLogProducer extends Logging {
     val isSync = options.has(syncOpt)
     import scala.collection.JavaConversions._
     val producerProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
-    producerProps.put("metadata.broker.list", brokerList)
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
   }
 
   class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b052150/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index ea856c7..37a9ec2 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import java.util.Properties
 import kafka.consumer._
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
 
 object TestEndToEndLatency {
   def main(args: Array[String]) {
@@ -46,9 +46,9 @@ object TestEndToEndLatency {
     val iter = stream.iterator
 
     val producerProps = new Properties()
-    producerProps.put("metadata.broker.list", brokerList)
-    producerProps.put("linger.ms", "0")
-    producerProps.put("block.on.buffer.full", "true")
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     val producer = new KafkaProducer(producerProps)
 
     val message = "hello there beautiful".getBytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b052150/core/src/main/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
index edb6e5f..595dc7c 100644
--- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
@@ -26,7 +26,7 @@ import kafka.serializer._
 import kafka.utils._
 import kafka.log.FileMessageSet
 import kafka.log.Log
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
 
 /**
  * This is a torture test that runs against an existing broker. Here is how it works:
@@ -240,8 +240,8 @@ object TestLogCleaning {
                       dups: Int,
                       percentDeletes: Int): File = {
     val producerProps = new Properties
-    producerProps.setProperty("block.on.buffer.full", "true")
-    producerProps.setProperty("metadata.broker.list", brokerUrl)
+    producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
     val producer = new KafkaProducer(producerProps)
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt


Mime
View raw message