kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: git commit: KAFKA-1432 Make num.producerThreads configurable on new MirrorMaker; reviewed by Neha Narkhede, Jun Rao
Date: Mon, 05 May 2014 23:56:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fe1859549 -> 3f7a9061b


KAFKA-1432 Make num.producerThreads configurable on new MirrorMaker; reviewed by Neha Narkhede, Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f7a9061
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f7a9061
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f7a9061

Branch: refs/heads/trunk
Commit: 3f7a9061b4d1b9ef9e0f4dae3117fd985f84e072
Parents: fe18595
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Mon May 5 16:56:42 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon May 5 16:56:50 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/producer/BaseProducer.scala     |  69 +++++++
 .../kafka/producer/ByteArrayPartitioner.scala   |   2 +-
 .../scala/kafka/producer/ConsoleProducer.scala  | 123 +++++-------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 152 ++++++++++-----
 .../kafka/tools/newproducer/MirrorMaker.scala   | 187 -------------------
 .../scala/kafka/perf/ProducerPerformance.scala  | 127 +++++--------
 system_test/utils/kafka_system_test_utils.py    |   2 +-
 7 files changed, 270 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3f7a9061/core/src/main/scala/kafka/producer/BaseProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala
new file mode 100644
index 0000000..b020793
--- /dev/null
+++ b/core/src/main/scala/kafka/producer/BaseProducer.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+import java.util.Properties
+
+// A base producer used whenever we need to have options for both old and new producers;
+// this class will be removed once we fully rolled out 0.9
+trait BaseProducer {
+  def send(topic: String, key: Array[Byte], value: Array[Byte])
+  def close()
+}
+
+class NewShinyProducer(producerProps: Properties) extends BaseProducer {
+  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+  import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+
+  // decide whether to send synchronously based on producer properties
+  val sync = producerProps.getProperty("producer.type", "async").equals("sync")
+
+  val producer = new KafkaProducer(producerProps)
+
+  override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
+    val record = new ProducerRecord(topic, key, value)
+    if(sync) {
+      this.producer.send(record).get()
+    } else {
+      this.producer.send(record,
+        new ErrorLoggingCallback(topic, key, value, false))
+    }
+  }
+
+  override def close() {
+    this.producer.close()
+  }
+}
+
+class OldProducer(producerProps: Properties) extends BaseProducer {
+  import kafka.producer.{KeyedMessage, ProducerConfig}
+
+  // default to byte array partitioner
+  if (producerProps.getProperty("partitioner.class") == null)
+    producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)
+  val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
+
+  override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
+    this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value))
+  }
+
+  override def close() {
+    this.producer.close()
+  }
+}
+
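
For illustration, the BaseProducer trait above gives the tools a single byte-array send/close interface over both clients. A minimal usage sketch follows; it is not part of this commit, and the flag, broker address, and topic names are placeholders:

    import java.util.Properties
    import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer}

    val props = new Properties()
    // the new client reads "bootstrap.servers", the old one "metadata.broker.list";
    // localhost:9092 stands in for a real broker list
    props.put("bootstrap.servers", "localhost:9092")
    props.put("metadata.broker.list", "localhost:9092")

    val useNewProducer = true   // e.g. driven by a --new.producer style flag
    val producer: BaseProducer =
      if (useNewProducer) new NewShinyProducer(props)
      else new OldProducer(props)

    producer.send("test-topic", "key".getBytes, "value".getBytes)
    producer.close()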

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f7a9061/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
index 988e437..6a3b02e 100644
--- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
@@ -20,7 +20,7 @@ package kafka.producer
 
 import kafka.utils._
 
-private class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner {
+class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner {
   def partition(key: Any, numPartitions: Int): Int = {
     Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions
   }
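
Making ByteArrayPartitioner public lets OldProducer in BaseProducer.scala install it as the default partitioner.class. The mapping itself is just the absolute array hash of the key modulo the partition count; an equivalent standalone sketch (assumed, restating the body above):

    import kafka.utils.Utils

    def partitionFor(key: Array[Byte], numPartitions: Int): Int =
      Utils.abs(java.util.Arrays.hashCode(key)) % numPartitions

MirrorMaker's ConsumerThread below uses the same expression to pin each keyed message to one producer, which preserves per-key ordering.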

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f7a9061/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index b19ab49..a2af988 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -17,16 +17,17 @@
 
 package kafka.producer
 
-import joptsimple._
-import java.util.Properties
-import java.io._
 import kafka.common._
 import kafka.message._
 import kafka.serializer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import kafka.utils.{CommandLineUtils, Utils}
+import kafka.utils.CommandLineUtils
+
+import java.util.Properties
+import java.io._
+
+import joptsimple._
 
-object ConsoleProducer { 
+object ConsoleProducer {
 
   def main(args: Array[String]) { 
 
@@ -39,8 +40,46 @@ object ConsoleProducer {
 
     try {
         val producer =
-          if(config.useNewProducer) new NewShinyProducer(config)
-          else new OldProducer(config)
+          if(config.useNewProducer) {
+            import org.apache.kafka.clients.producer.ProducerConfig
+
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
+            props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
+            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
+            props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
+            props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
+            props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
+            props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
+            props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
+            props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
+            if(config.queueEnqueueTimeoutMs != -1)
+              props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
+            props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+
+            new NewShinyProducer(props)
+          } else {
+            props.put("metadata.broker.list", config.brokerList)
+            props.put("compression.codec", config.compressionCodec)
+            props.put("producer.type", if(config.sync) "sync" else "async")
+            props.put("batch.num.messages", config.batchSize.toString)
+            props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
+            props.put("retry.backoff.ms", config.retryBackoffMs.toString)
+            props.put("queue.buffering.max.ms", config.sendTimeout.toString)
+            props.put("queue.buffering.max.messages", config.queueSize.toString)
+            props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
+            props.put("request.required.acks", config.requestRequiredAcks.toString)
+            props.put("request.timeout.ms", config.requestTimeoutMs.toString)
+            props.put("key.serializer.class", config.keyEncoderClass)
+            props.put("serializer.class", config.valueEncoderClass)
+            props.put("send.buffer.bytes", config.socketBuffer.toString)
+            props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
+            props.put("client.id", "console-producer")
+
+            new OldProducer(props)
+          }
 
         Runtime.getRuntime.addShutdownHook(new Thread() {
           override def run() {
@@ -257,72 +296,4 @@ object ConsoleProducer {
       }
     }
   }
-
-  trait Producer {
-    def send(topic: String, key: Array[Byte], bytes: Array[Byte])
-    def close()
-  }
-
-  class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
-    import org.apache.kafka.clients.producer.ProducerConfig
-    val props = new Properties()
-    props.putAll(producerConfig.cmdLineProps)
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerConfig.brokerList)
-    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfig.compressionCodec)
-    props.put(ProducerConfig.SEND_BUFFER_CONFIG, producerConfig.socketBuffer.toString)
-    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, producerConfig.retryBackoffMs.toString)
-    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, producerConfig.metadataExpiryMs.toString)
-    props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, producerConfig.metadataFetchTimeoutMs.toString)
-    props.put(ProducerConfig.ACKS_CONFIG, producerConfig.requestRequiredAcks.toString)
-    props.put(ProducerConfig.TIMEOUT_CONFIG, producerConfig.requestTimeoutMs.toString)
-    props.put(ProducerConfig.RETRIES_CONFIG, producerConfig.messageSendMaxRetries.toString)
-    props.put(ProducerConfig.LINGER_MS_CONFIG, producerConfig.sendTimeout.toString)
-    if(producerConfig.queueEnqueueTimeoutMs != -1)
-      props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
-    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerConfig.maxMemoryBytes.toString)
-    props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.maxPartitionMemoryBytes.toString)
-    props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
-    val producer = new KafkaProducer(props)
-
-    def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
-      val response = this.producer.send(new ProducerRecord(topic, key, bytes))
-      if(producerConfig.sync) {
-        response.get()
-      }
-    }
-
-    def close() {
-      this.producer.close()
-    }
-  }
-
-  class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer {
-    val props = new Properties()
-    props.putAll(producerConfig.cmdLineProps)
-    props.put("metadata.broker.list", producerConfig.brokerList)
-    props.put("compression.codec", producerConfig.compressionCodec)
-    props.put("producer.type", if(producerConfig.sync) "sync" else "async")
-    props.put("batch.num.messages", producerConfig.batchSize.toString)
-    props.put("message.send.max.retries", producerConfig.messageSendMaxRetries.toString)
-    props.put("retry.backoff.ms", producerConfig.retryBackoffMs.toString)
-    props.put("queue.buffering.max.ms", producerConfig.sendTimeout.toString)
-    props.put("queue.buffering.max.messages", producerConfig.queueSize.toString)
-    props.put("queue.enqueue.timeout.ms", producerConfig.queueEnqueueTimeoutMs.toString)
-    props.put("request.required.acks", producerConfig.requestRequiredAcks.toString)
-    props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString)
-    props.put("key.serializer.class", producerConfig.keyEncoderClass)
-    props.put("serializer.class", producerConfig.valueEncoderClass)
-    props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
-    props.put("topic.metadata.refresh.interval.ms", producerConfig.metadataExpiryMs.toString)
-    props.put("client.id", "console-producer")
-    val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new kafka.producer.ProducerConfig(props))
-
-    def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
-      this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, bytes))
-    }
-
-    def close() {
-      this.producer.close()
-    }
-  }
 }
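
The rewrite above moves all property assembly into main(), so ConsoleProducer now builds a plain Properties object and hands it to NewShinyProducer or OldProducer. Collected from the hunk above, the old-to-new config key correspondence is roughly the following; this map is illustrative only, not code from the commit:

    import org.apache.kafka.clients.producer.ProducerConfig

    val oldToNew = Map(
      "metadata.broker.list"               -> ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "compression.codec"                  -> ProducerConfig.COMPRESSION_TYPE_CONFIG,
      "send.buffer.bytes"                  -> ProducerConfig.SEND_BUFFER_CONFIG,
      "retry.backoff.ms"                   -> ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
      "topic.metadata.refresh.interval.ms" -> ProducerConfig.METADATA_MAX_AGE_CONFIG,
      "request.required.acks"              -> ProducerConfig.ACKS_CONFIG,
      "request.timeout.ms"                 -> ProducerConfig.TIMEOUT_CONFIG,
      "message.send.max.retries"           -> ProducerConfig.RETRIES_CONFIG,
      "queue.buffering.max.ms"             -> ProducerConfig.LINGER_MS_CONFIG
    )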

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f7a9061/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index e4d1a86..12fa797 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,21 +17,23 @@
 
 package kafka.tools
 
-import joptsimple.OptionParser
 import kafka.utils.{Utils, CommandLineUtils, Logging}
-import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
-import scala.collection.JavaConversions._
-import java.util.concurrent.CountDownLatch
 import kafka.consumer._
 import kafka.serializer._
-import collection.mutable.ListBuffer
-import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel}
-import kafka.javaapi
+import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer}
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import scala.collection.mutable.ListBuffer
+import scala.collection.JavaConversions._
+
+import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch}
+
+import joptsimple.OptionParser
 
 object MirrorMaker extends Logging {
 
   private var connectors: Seq[ZookeeperConsumerConnector] = null
-  private var consumerThreads: Seq[MirrorMakerThread] = null
+  private var consumerThreads: Seq[ConsumerThread] = null
   private var producerThreads: ListBuffer[ProducerThread] = null
 
   def main(args: Array[String]) {
@@ -52,6 +54,9 @@ object MirrorMaker extends Logging {
       .describedAs("config file")
       .ofType(classOf[String])
 
+    val useNewProducerOpt = parser.accepts("new.producer",
+      "Use the new producer implementation.")
+
     val numProducersOpt = parser.accepts("num.producers",
       "Number of producer instances")
       .withRequiredArg()
@@ -70,7 +75,7 @@ object MirrorMaker extends Logging {
       .withRequiredArg()
       .describedAs("Queue size in terms of number of messages")
       .ofType(classOf[java.lang.Integer])
-      .defaultsTo(10000);
+      .defaultsTo(10000)
 
     val whitelistOpt = parser.accepts("whitelist",
       "Whitelist of topics to mirror.")
@@ -99,24 +104,35 @@ object MirrorMaker extends Logging {
       System.exit(1)
     }
 
-    val numStreams = options.valueOf(numStreamsOpt)
+    val numProducers = options.valueOf(numProducersOpt).intValue()
+    val numStreams = options.valueOf(numStreamsOpt).intValue()
     val bufferSize = options.valueOf(bufferSizeOpt).intValue()
 
-    val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
-      val props = Utils.loadProps(options.valueOf(producerConfigOpt))
-      val config = props.getProperty("partitioner.class") match {
-        case null =>
-          new ProducerConfig(props) {
-            override val partitionerClass = "kafka.producer.ByteArrayPartitioner"
-          }
-        case pClass : String =>
-          new ProducerConfig(props)
-      }
-      new Producer[Array[Byte], Array[Byte]](config)
-    })
+    val useNewProducer = options.has(useNewProducerOpt)
+    val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+
+    // create data channel
+    val mirrorDataChannel = new ArrayBlockingQueue[ProducerRecord](bufferSize)
+
+    // create producer threads
+    val producers = (1 to numProducers).map(_ => {
+        if (useNewProducer)
+          new NewShinyProducer(producerProps)
+        else
+          new OldProducer(producerProps)
+      })
 
+    producerThreads = new ListBuffer[ProducerThread]()
+    var producerIndex: Int = 1
+    for(producer <- producers) {
+      val producerThread = new ProducerThread(mirrorDataChannel, producer, producerIndex)
+      producerThreads += producerThread
+      producerIndex += 1
+    }
+
+    // create consumer streams
     connectors = options.valuesOf(consumerConfigOpt).toList
-            .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
+            .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
             .map(new ZookeeperConsumerConnector(_))
 
     val filterSpec = if (options.has(whitelistOpt))
@@ -126,18 +142,13 @@ object MirrorMaker extends Logging {
 
     var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil
     try {
-      streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
+      streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder())).flatten
     } catch {
       case t: Throwable =>
         fatal("Unable to create stream - shutting down mirror maker.")
         connectors.foreach(_.shutdown)
     }
-
-    val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
-
-    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, producers, streamAndIndex._2))
-
-    producerThreads = new ListBuffer[ProducerThread]()
+    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2))
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -145,15 +156,6 @@ object MirrorMaker extends Logging {
       }
     })
 
-    // create producer threads
-    var i: Int = 1
-    for(producer <- producers) {
-      val producerThread: KafkaMigrationTool.ProducerThread = new KafkaMigrationTool.ProducerThread(producerDataChannel,
-        new javaapi.producer.Producer[Array[Byte], Array[Byte]](producer), i)
-      producerThreads += producerThread
-      i += 1
-    }
-
     consumerThreads.foreach(_.start)
     producerThreads.foreach(_.start)
 
@@ -172,14 +174,14 @@ object MirrorMaker extends Logging {
     info("Kafka mirror maker shutdown successfully")
   }
 
-  class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
-                          producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]],
-                          producers: Seq[Producer[Array[Byte], Array[Byte]]],
+  class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
+                          mirrorDataChannel: BlockingQueue[ProducerRecord],
+                          producers: Seq[BaseProducer],
                           threadId: Int)
           extends Thread with Logging {
 
     private val shutdownLatch = new CountDownLatch(1)
-    private val threadName = "mirrormaker-" + threadId
+    private val threadName = "mirrormaker-consumer-" + threadId
     this.logIdent = "[%s] ".format(threadName)
 
     this.setName(threadName)
@@ -192,14 +194,13 @@ object MirrorMaker extends Logging {
           // Otherwise use a pre-assigned producer to send the message
           if (msgAndMetadata.key == null) {
             trace("Send the non-keyed message the producer channel.")
-            val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
-            producerDataChannel.sendRequest(pd)
+            val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message)
+            mirrorDataChannel.put(data)
           } else {
             val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size()
            trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId))
             val producer = producers(producerId)
-            val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
-            producer.send(pd)
+            producer.send(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
           }
         }
       } catch {
@@ -219,5 +220,62 @@ object MirrorMaker extends Logging {
       }
     }
   }
+
+  class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord],
+                        val producer: BaseProducer,
+                        val threadId: Int) extends Thread {
+    val threadName = "mirrormaker-producer-" + threadId
+    val logger = org.apache.log4j.Logger.getLogger(classOf[KafkaMigrationTool.ProducerThread].getName)
+    val shutdownComplete: CountDownLatch = new CountDownLatch(1)
+
+    private final val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
+
+    setName(threadName)
+
+    override def run {
+      try {
+        while (true) {
+          val data: ProducerRecord = dataChannel.take
+          logger.trace("Sending message with value size %d".format(data.value().size))
+
+          if(data eq shutdownMessage) {
+            logger.info("Producer thread " + threadName + " finished running")
+            return
+          }
+          producer.send(data.topic(), data.key(), data.value())
+        }
+      } catch {
+        case t: Throwable => {
+          logger.fatal("Producer thread failure due to ", t)
+        }
+      } finally {
+        shutdownComplete.countDown
+      }
+    }
+
+    def shutdown {
+      try {
+        logger.info("Producer thread " + threadName + " shutting down")
+        dataChannel.put(shutdownMessage)
+      }
+      catch {
+        case ie: InterruptedException => {
+          logger.warn("Interrupt during shutdown of ProducerThread", ie)
+        }
+      }
+    }
+
+    def awaitShutdown {
+      try {
+        shutdownComplete.await
+        producer.close
+        logger.info("Producer thread " + threadName + " shutdown complete")
+      } catch {
+        case ie: InterruptedException => {
+          logger.warn("Interrupt during shutdown of ProducerThread")
+        }
+      }
+    }
+  }
 }
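
Structurally, the reworked MirrorMaker is a bounded-queue pipeline: ConsumerThreads put non-keyed messages onto an ArrayBlockingQueue of ProducerRecords, keyed messages bypass the queue and go straight to a hash-picked producer so per-key ordering survives, and each ProducerThread drains the queue until it takes the sentinel shutdown record. A condensed sketch of the two sides, simplified from the hunks above (assumed, not verbatim):

    import java.util.concurrent.ArrayBlockingQueue
    import org.apache.kafka.clients.producer.ProducerRecord
    import kafka.producer.BaseProducer

    val channel = new ArrayBlockingQueue[ProducerRecord](10000)  // the buffer size default above

    // consumer side: non-keyed traffic funnels through the shared channel
    def enqueue(topic: String, message: Array[Byte]) {
      channel.put(new ProducerRecord(topic, message))  // blocks when the queue is full
    }

    // producer side: loop until the sentinel arrives (compared by reference,
    // as in ProducerThread.run above)
    def drain(producer: BaseProducer, sentinel: ProducerRecord) {
      var record = channel.take()
      while (record ne sentinel) {
        producer.send(record.topic(), record.key(), record.value())
        record = channel.take()
      }
    }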
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f7a9061/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
deleted file mode 100644
index a969a22..0000000
--- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools.newproducer
-
-import joptsimple.OptionParser
-import kafka.utils.{Utils, CommandLineUtils, Logging}
-import java.util.concurrent.CountDownLatch
-import kafka.consumer._
-import collection.mutable.ListBuffer
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
-import java.util.concurrent.atomic.AtomicInteger
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-
-
-object MirrorMaker extends Logging {
-
-  private var connector: ZookeeperConsumerConnector = null
-  private var mirroringThreads: Seq[MirrorMakerThread] = null
-  private var producerChannel: ProducerDataChannel = null
-
-  def main(args: Array[String]) {
-    info ("Starting mirror maker")
-    val parser = new OptionParser
-
-    val consumerConfigOpt = parser.accepts("consumer.config",
-      "Consumer config file to consume from a source cluster.")
-      .withRequiredArg()
-      .describedAs("config file")
-      .ofType(classOf[String])
-
-    val producerConfigOpt = parser.accepts("producer.config",
-      "Embedded producer config file for target cluster.")
-      .withRequiredArg()
-      .describedAs("config file")
-      .ofType(classOf[String])
-
-    val numStreamsOpt = parser.accepts("num.streams",
-      "Number of mirroring streams.")
-      .withRequiredArg()
-      .describedAs("Number of threads")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-
-    val whitelistOpt = parser.accepts("whitelist",
-      "Whitelist of topics to mirror.")
-      .withRequiredArg()
-      .describedAs("Java regex (String)")
-      .ofType(classOf[String])
-
-    val blacklistOpt = parser.accepts("blacklist",
-            "Blacklist of topics to mirror.")
-            .withRequiredArg()
-            .describedAs("Java regex (String)")
-            .ofType(classOf[String])
-
-    val helpOpt = parser.accepts("help", "Print this message.")
-    val options = parser.parse(args : _*)
-    if (options.has(helpOpt)) {
-      parser.printHelpOn(System.out)
-      System.exit(0)
-    }
-    CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
-    if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
-      fatal("Exactly one of whitelist or blacklist is required.")
-      System.exit(1)
-    }
-    val filterSpec = if (options.has(whitelistOpt))
-      new Whitelist(options.valueOf(whitelistOpt))
-    else
-      new Blacklist(options.valueOf(blacklistOpt))
-    val producerConfig = options.valueOf(producerConfigOpt)
-    val producerProps = Utils.loadProps(producerConfig)
-    producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
-    val consumerConfig = options.valueOf(consumerConfigOpt)
-    val numStreams = options.valueOf(numStreamsOpt)
-    producerChannel = new ProducerDataChannel()
-    connector = new ZookeeperConsumerConnector(new ConsumerConfig(Utils.loadProps(consumerConfig)))
-    var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
-    try {
-      streams = connector.createMessageStreamsByFilter(filterSpec, numStreams.intValue())
-      debug("%d consumer streams created".format(streams.size))
-    } catch {
-      case t: Throwable =>
-        fatal("Unable to create stream - shutting down mirror maker.")
-        connector.shutdown()
-        System.exit(1)
-    }
-    val streamIndex = new AtomicInteger()
-    streams.foreach(stream => producerChannel.addProducer(new KafkaProducer(producerProps)))
-    mirroringThreads = streams.map(stream => new MirrorMakerThread(stream, streamIndex.getAndIncrement))
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run() {
-        cleanShutdown()
-      }
-    })
-    // start the mirroring threads
-    mirroringThreads.foreach(_.start)
-    // in case the consumer threads hit a timeout/other exception
-    mirroringThreads.foreach(_.awaitShutdown)
-    cleanShutdown()
-  }
-
-  def cleanShutdown() {
-    if (connector != null) connector.shutdown()
-    if (mirroringThreads != null) mirroringThreads.foreach(_.awaitShutdown)
-    if (producerChannel != null) producerChannel.close()
-    info("Kafka mirror maker shutdown successfully")
-  }
-
-  class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
-                          threadId: Int)
-          extends Thread with Logging {
-
-    private val shutdownLatch = new CountDownLatch(1)
-    private val threadName = "mirrormaker-" + threadId
-    this.logIdent = "[%s] ".format(threadName)
-
-    this.setName(threadName)
-
-    override def run() {
-      info("Starting mirror maker thread " + threadName)
-      try {
-        for (msgAndMetadata <- stream) {
-          producerChannel.send(new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key(), msgAndMetadata.message()))
-        }
-      } catch {
-        case e: Throwable =>
-          fatal("Stream unexpectedly exited.", e)
-      } finally {
-        shutdownLatch.countDown()
-        info("Stopped thread.")
-      }
-    }
-
-    def awaitShutdown() {
-      try {
-        shutdownLatch.await()
-      } catch {
-        case e: InterruptedException => fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName))
-      }
-    }
-  }
-
-  class ProducerDataChannel extends Logging {
-    val producers = new ListBuffer[KafkaProducer]
-    var producerIndex = new AtomicInteger(0)
-
-    def addProducer(producer: KafkaProducer) {
-      producers += producer
-    }
-
-    def send(producerRecord: ProducerRecord) {
-      if(producerRecord.key() != null) {
-        val producerId = Utils.abs(java.util.Arrays.hashCode(producerRecord.key())) % producers.size
-        trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId))
-        val producer = producers(producerId)
-        producer.send(producerRecord,
-        producer.send(producerRecord,
-                      new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false))
-      } else {
-        val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size)
-        producers(producerId).send(producerRecord,
-        producers(producerId).send(producerRecord,
-                                   new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false))
-        trace("Sent message to producer " + producerId)
-      }
-    }
-
-    def close() {
-      producers.foreach(_.close())
-    }
-  }
-}
-
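
The deleted newproducer.MirrorMaker dispatched non-keyed records by rotating an index across its producers; the unified tool above replaces that per-record rotation with the shared blocking queue. For reference, a cleaner sketch of such a rotation (assumed; the deleted get()/getAndSet() pair above was not a single atomic step):

    import java.util.concurrent.atomic.AtomicInteger
    import kafka.utils.Utils

    class RoundRobin(n: Int) {
      private val idx = new AtomicInteger(0)
      // one atomic increment, wrapped into [0, n)
      def next(): Int = Utils.abs(idx.getAndIncrement()) % n
    }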

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f7a9061/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 1490bdb..00fa90b 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -17,19 +17,20 @@
 
 package kafka.perf
 
-import java.util.concurrent.{ CountDownLatch, Executors }
-import java.util.concurrent.atomic.AtomicLong
-import kafka.producer._
-import org.apache.log4j.Logger
+import kafka.metrics.KafkaMetricsReporter
+import kafka.producer.{OldProducer, NewShinyProducer}
+import kafka.utils.{VerifiableProperties, Logging}
 import kafka.message.CompressionCodec
-import java.text.SimpleDateFormat
 import kafka.serializer._
+
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
 import java.util._
-import collection.immutable.List
-import kafka.utils.{VerifiableProperties, Logging, Utils}
-import kafka.metrics.KafkaMetricsReporter
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import java.text.SimpleDateFormat
+import java.math.BigInteger
+import scala.collection.immutable.List
+
+import org.apache.log4j.Logger
 
 /**
  * Load test for the producer
@@ -170,67 +171,6 @@ object ProducerPerformance extends Logging {
     val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
   }
 
-  trait Producer {
-    def send(topic: String, partition: Long, bytes: Array[Byte])
-    def close()
-  }
-
-  class OldRustyProducer(config: ProducerPerfConfig) extends Producer {
-    val props = new Properties()
-    props.put("metadata.broker.list", config.brokerList)
-    props.put("compression.codec", config.compressionCodec.codec.toString)
-    props.put("send.buffer.bytes", (64 * 1024).toString)
-    if (!config.isSync) {
-      props.put("producer.type", "async")
-      props.put("batch.num.messages", config.batchSize.toString)
-      props.put("queue.enqueue.timeout.ms", "-1")
-    }
-    props.put("client.id", "perf-test")
-    props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
-    props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
-    props.put("message.send.max.retries", config.producerNumRetries.toString)
-    props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
-    props.put("serializer.class", classOf[DefaultEncoder].getName.toString)
-    props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString)
-    val producer = new kafka.producer.Producer[Long, Array[Byte]](new ProducerConfig(props))
-
-    def send(topic: String, partition: Long, bytes: Array[Byte]) {
-      this.producer.send(new KeyedMessage[Long, Array[Byte]](topic, partition, bytes))
-    }
-
-    def close() {
-      this.producer.close()
-    }
-  }
-
-  class NewShinyProducer(config: ProducerPerfConfig) extends Producer {
-    import org.apache.kafka.clients.producer.ProducerConfig
-    val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-    props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
-    props.put(ProducerConfig.CLIENT_ID_CONFIG, "perf-test")
-    props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
-    props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
-    props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
-    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
-    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
-    val producer = new KafkaProducer(props)
-
-    def send(topic: String, partition: Long, bytes: Array[Byte]) {
-      val part = partition % this.producer.partitionsFor(topic).size
-      if (config.isSync) {
-        this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get()
-      } else {
-        this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes),
-                           new ErrorLoggingCallback(topic, null, bytes, if (config.seqIdMode) true else false))
-      }
-    }
-
-    def close() {
-      this.producer.close()
-    }
-  }
-
   class ProducerThread(val threadId: Int,
     val config: ProducerPerfConfig,
     val totalBytesSent: AtomicLong,
@@ -241,11 +181,37 @@ object ProducerPerformance extends Logging {
 
     val messagesPerThread = config.numMessages / config.numThreads
     debug("Messages per thread = " + messagesPerThread)
+    val props = new Properties()
     val producer =
-      if (config.useNewProducer)
-        new NewShinyProducer(config)
-      else
-        new OldRustyProducer(config)
+      if (config.useNewProducer) {
+        import org.apache.kafka.clients.producer.ProducerConfig
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
+        props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
+        props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
+        props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
+        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
+        new NewShinyProducer(props)
+      } else {
+        props.put("metadata.broker.list", config.brokerList)
+        props.put("compression.codec", config.compressionCodec.codec.toString)
+        props.put("send.buffer.bytes", (64 * 1024).toString)
+        if (!config.isSync) {
+          props.put("producer.type", "async")
+          props.put("batch.num.messages", config.batchSize.toString)
+          props.put("queue.enqueue.timeout.ms", "-1")
+        }
+        props.put("client.id", "producer-performance")
+        props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
+        props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
+        props.put("message.send.max.retries", config.producerNumRetries.toString)
+        props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
+        props.put("serializer.class", classOf[DefaultEncoder].getName)
+        props.put("key.serializer.class", classOf[NullEncoder[Long]].getName)
+        new OldProducer(props)
+      }
 
     // generate the sequential message ID
     private val SEP = ":" // message field separator
@@ -288,15 +254,16 @@ object ProducerPerformance extends Logging {
     override def run {
       var bytesSent = 0L
       var nSends = 0
-      var j: Long = 0L
+      var i: Long = 0L
       var message: Array[Byte] = null
 
-      while (j < messagesPerThread) {
+      while (i < messagesPerThread) {
         try {
           config.topics.foreach(
             topic => {
-              message = generateProducerData(topic, j)
-              producer.send(topic, j, message)
+              message = generateProducerData(topic, i)
+              producer.send(topic, BigInteger.valueOf(i).toByteArray, message)
+              bytesSent += message.size
               nSends += 1
               if (config.messageSendGapMs > 0)
                 Thread.sleep(config.messageSendGapMs)
@@ -304,7 +271,7 @@ object ProducerPerformance extends Logging {
         } catch {
          case e: Throwable => error("Error when sending message " + new String(message), e)
         }
-        j += 1
+        i += 1
       }
       try {
         producer.close()

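Because BaseProducer.send takes a byte-array key instead of a partition id, ProducerPerformance now serializes the per-thread counter into the key and lets the partitioner hash it; the hunk also resumes counting bytesSent. The key encoding amounts to this (restated, assumed equivalent):

    import java.math.BigInteger

    def keyFor(i: Long): Array[Byte] = BigInteger.valueOf(i).toByteArray
    // keyFor(0L)   == Array[Byte](0)
    // keyFor(256L) == Array[Byte](1, 0)
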
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f7a9061/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 423b512..6917ea1 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -747,7 +747,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
             cmdList = ["ssh " + hostname,
                       "'JAVA_HOME=" + javaHome,
                       "JMX_PORT=" + jmxPort,
-                      kafkaHome + "/bin/kafka-run-class.sh kafka.tools.newproducer.MirrorMaker",
+                      kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker",
                       "--consumer.config " + configPathName + "/" + mmConsumerConfigFile,
                       "--producer.config " + configPathName + "/" + mmProducerConfigFile,
                       "--whitelist=\".*\" >> ",

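With kafka.tools.newproducer.MirrorMaker removed, the system test launches the unified tool instead. Combining the options from the MirrorMaker hunk above, an invocation might look like the following; this is illustrative only, and the config paths and counts are placeholders:

    bin/kafka-run-class.sh kafka.tools.MirrorMaker \
      --consumer.config config/mirror_consumer.properties \
      --producer.config config/mirror_producer.properties \
      --new.producer \
      --num.producers 2 \
      --num.streams 2 \
      --whitelist ".*"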
