kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject [1/2] KAFKA-1438 Migrate client tools out of perf; reviewed by Neha Narkhede
Date Thu, 05 Jun 2014 05:21:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk df449a24a -> 1363ed7c5


http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
new file mode 100644
index 0000000..95cfbc1
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.metrics.KafkaMetricsReporter
+import kafka.producer.{OldProducer, NewShinyProducer}
+import kafka.utils.{VerifiableProperties, Logging}
+import kafka.message.CompressionCodec
+import kafka.serializer._
+
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
+import java.util._
+import java.text.SimpleDateFormat
+import java.math.BigInteger
+import scala.collection.immutable.List
+
+import org.apache.log4j.Logger
+
+/**
+ * Load test for the producer
+ */
+object ProducerPerformance extends Logging {
+
+  def main(args: Array[String]) {
+
+    val logger = Logger.getLogger(getClass)
+    val config = new ProducerPerfConfig(args)
+    if (!config.isFixedSize)
+      logger.warn("Throughput will be slower due to changing message size per request")
+
+    val totalBytesSent = new AtomicLong(0)
+    val totalMessagesSent = new AtomicLong(0)
+    val executor = Executors.newFixedThreadPool(config.numThreads)
+    val allDone = new CountDownLatch(config.numThreads)
+    val startMs = System.currentTimeMillis
+    val rand = new java.util.Random
+
+    if (!config.hideHeader)
+      println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
+        "total.data.sent.in.nMsg, nMsg.sec")
+
+    for (i <- 0 until config.numThreads) {
+      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
+    }
+
+    allDone.await()
+    val endMs = System.currentTimeMillis
+    val elapsedSecs = (endMs - startMs) / 1000.0
+    val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
+    println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(
+      config.dateFormat.format(startMs), config.dateFormat.format(endMs),
+      config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
+      totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
+    System.exit(0)
+  }
+
+  class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port pairs to bootstrap from).")
+      .withRequiredArg
+      .describedAs("hostname:port,..,hostname:port")
+      .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to")
+      .withRequiredArg
+      .describedAs("topic1,topic2..")
+      .ofType(classOf[String])
+    val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3000)
+    val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The number of retries for the producer")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3)
+    val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100)
+    val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
+      "to complete")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
+    val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
+    val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
+    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
+      .withRequiredArg
+      .describedAs("number of threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    val initialMessageIdOpt = parser.accepts("initial-message-id", "This is used for generating test data. If set, messages will be tagged with an " +
+      "ID and sent by the producer starting from this ID sequentially. Message content will be of String type and " +
+      "in the form of 'Message:000...1:xxx...'")
+      .withRequiredArg()
+      .describedAs("initial message id")
+      .ofType(classOf[java.lang.Integer])
+    val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends")
+      .withRequiredArg()
+      .describedAs("message send time gap")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
+    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enabled is set, and this parameter is " +
+      "set, the csv metrics will be output here")
+      .withRequiredArg
+      .describedAs("metrics directory")
+      .ofType(classOf[java.lang.String])
+    val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
+
+    val options = parser.parse(args: _*)
+    for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
+      if (!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    val topicsStr = options.valueOf(topicsOpt)
+    val topics = topicsStr.split(",")
+    val numMessages = options.valueOf(numMessagesOpt).longValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val hideHeader = options.has(hideHeaderOpt)
+    val brokerList = options.valueOf(brokerListOpt)
+    val messageSize = options.valueOf(messageSizeOpt).intValue
+    var isFixedSize = !options.has(varyMessageSizeOpt)
+    var isSync = options.has(syncOpt)
+    var batchSize = options.valueOf(batchSizeOpt).intValue
+    var numThreads = options.valueOf(numThreadsOpt).intValue
+    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
+    val seqIdMode = options.has(initialMessageIdOpt)
+    var initialMessageId: Int = 0
+    if (seqIdMode)
+      initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
+    val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
+    val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
+    val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
+    val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
+    val useNewProducer = options.has(useNewProducerOpt)
+
+    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
+
+    if (csvMetricsReporterEnabled) {
+      val props = new Properties()
+      props.put("kafka.metrics.polling.interval.secs", "1")
+      props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
+      if (options.has(metricsDirectoryOpt))
+        props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
+      else
+        props.put("kafka.csv.metrics.dir", "kafka_metrics")
+      props.put("kafka.csv.metrics.reporter.enabled", "true")
+      val verifiableProps = new VerifiableProperties(props)
+      KafkaMetricsReporter.startReporters(verifiableProps)
+    }
+
+    val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
+  }
+
+  class ProducerThread(val threadId: Int,
+    val config: ProducerPerfConfig,
+    val totalBytesSent: AtomicLong,
+    val totalMessagesSent: AtomicLong,
+    val allDone: CountDownLatch,
+    val rand: Random) extends Runnable {
+    val seqIdNumDigit = 10 // no. of digits for max int value
+
+    val messagesPerThread = config.numMessages / config.numThreads
+    debug("Messages per thread = " + messagesPerThread)
+    val props = new Properties()
+    val producer =
+      if (config.useNewProducer) {
+        import org.apache.kafka.clients.producer.ProducerConfig
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
+        props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
+        props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
+        props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
+        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
+        new NewShinyProducer(props)
+      } else {
+        props.put("metadata.broker.list", config.brokerList)
+        props.put("compression.codec", config.compressionCodec.codec.toString)
+        props.put("send.buffer.bytes", (64 * 1024).toString)
+        if (!config.isSync) {
+          props.put("producer.type", "async")
+          props.put("batch.num.messages", config.batchSize.toString)
+          props.put("queue.enqueue.timeout.ms", "-1")
+        }
+        props.put("client.id", "producer-performance")
+        props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
+        props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
+        props.put("message.send.max.retries", config.producerNumRetries.toString)
+        props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
+        props.put("serializer.class", classOf[DefaultEncoder].getName)
+        props.put("key.serializer.class", classOf[NullEncoder[Long]].getName)
+        new OldProducer(props)
+      }
+
+    // generate the sequential message ID
+    private val SEP = ":" // message field separator
+    private val messageIdLabel = "MessageID"
+    private val threadIdLabel = "ThreadID"
+    private val topicLabel = "Topic"
+    private var leftPaddedSeqId: String = ""
+
+    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
+      // Each thread gets a unique range of sequential no. for its ids.
+      // Eg. 1000 msg in 10 threads => 100 msg per thread
+      // thread 0 IDs :   0 ~  99
+      // thread 1 IDs : 100 ~ 199
+      // thread 2 IDs : 200 ~ 299
+      // . . .
+      leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
+
+      val msgHeader = topicLabel + SEP +
+        topic + SEP +
+        threadIdLabel + SEP +
+        threadId + SEP +
+        messageIdLabel + SEP +
+        leftPaddedSeqId + SEP
+
+      val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
+      debug(seqMsgString)
+      return seqMsgString.getBytes()
+    }
+
+    private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
+      val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
+      if (config.seqIdMode) {
+        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
+        generateMessageWithSeqId(topic, seqId, msgSize)
+      } else {
+        new Array[Byte](msgSize)
+      }
+    }
+
+    override def run {
+      var bytesSent = 0L
+      var nSends = 0
+      var i: Long = 0L
+      var message: Array[Byte] = null
+
+      while (i < messagesPerThread) {
+        try {
+          config.topics.foreach(
+            topic => {
+              message = generateProducerData(topic, i)
+              producer.send(topic, BigInteger.valueOf(i).toByteArray, message)
+              bytesSent += message.size
+              nSends += 1
+              if (config.messageSendGapMs > 0)
+                Thread.sleep(config.messageSendGapMs)
+            })
+        } catch {
+          case e: Throwable => error("Error when sending message " + new String(message), e)
+        }
+        i += 1
+      }
+      try {
+        producer.close()
+      } catch {
+        case e: Throwable => error("Error when closing producer", e)
+      }
+      totalBytesSent.addAndGet(bytesSent)
+      totalMessagesSent.addAndGet(nSends)
+      allDone.countDown()
+    }
+  }
+}
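
With this patch the producer load-test tool is resolved as kafka.tools.ProducerPerformance instead of kafka.perf.ProducerPerformance. As a rough usage sketch only, it could be launched through the bin/kafka-run-class.sh wrapper that this patch also touches; the flag names come from ProducerPerfConfig above, while the broker address, topic name, and counts below are placeholder assumptions:

    # Hypothetical invocation of the relocated tool; substitute a real broker list and topic.
    bin/kafka-run-class.sh kafka.tools.ProducerPerformance \
        --broker-list localhost:9092 \
        --topics perf-test \
        --messages 100000 \
        --threads 4 \
        --message-size 100 \
        --new-producer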

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
new file mode 100644
index 0000000..8b8c472
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.net.URI
+import java.text.SimpleDateFormat
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
+import kafka.consumer.SimpleConsumer
+import kafka.utils._
+import org.apache.log4j.Logger
+import kafka.common.TopicAndPartition
+
+
+/**
+ * Performance test for the simple consumer
+ */
+object SimpleConsumerPerformance {
+
+  def main(args: Array[String]) {
+    val logger = Logger.getLogger(getClass)
+    val config = new ConsumerPerfConfig(args)
+
+    if(!config.hideHeader) {
+      if(!config.showDetailedStats)
+        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+      else
+        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+    }
+
+    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)
+
+    // reset to latest or smallest offset
+    val topicAndPartition = TopicAndPartition(config.topic, config.partition)
+    val request = OffsetRequest(Map(
+      topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1)
+      ))
+    var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+
+    val startMs = System.currentTimeMillis
+    var done = false
+    var totalBytesRead = 0L
+    var totalMessagesRead = 0L
+    var consumedInterval = 0
+    var lastReportTime: Long = startMs
+    var lastBytesRead = 0L
+    var lastMessagesRead = 0L
+    while(!done) {
+      // TODO: add in the maxWait and minBytes for performance
+      val request = new FetchRequestBuilder()
+        .clientId(config.clientId)
+        .addFetch(config.topic, config.partition, offset, config.fetchSize)
+        .build()
+      val fetchResponse = consumer.fetch(request)
+
+      var messagesRead = 0
+      var bytesRead = 0
+      val messageSet = fetchResponse.messageSet(config.topic, config.partition)
+      for (message <- messageSet) {
+        messagesRead += 1
+        bytesRead += message.message.payloadSize
+      }
+
+      if(messagesRead == 0 || totalMessagesRead > config.numMessages)
+        done = true
+      else
+        // we only did one fetch so we find the offset for the first (head) messageset
+        offset += messageSet.validBytes
+
+      totalBytesRead += bytesRead
+      totalMessagesRead += messagesRead
+      consumedInterval += messagesRead
+
+      if(consumedInterval > config.reportingInterval) {
+        if(config.showDetailedStats) {
+          val reportTime = System.currentTimeMillis
+          val elapsed = (reportTime - lastReportTime)/1000.0
+          val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024)
+          println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize,
+            (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed,
+            totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed))
+        }
+        lastReportTime = SystemTime.milliseconds
+        lastBytesRead = totalBytesRead
+        lastMessagesRead = totalMessagesRead
+        consumedInterval = 0
+      }
+    }
+    val reportTime = System.currentTimeMillis
+    val elapsed = (reportTime - startMs) / 1000.0
+
+    if(!config.showDetailedStats) {
+      val totalMBRead = (totalBytesRead*1.0)/(1024*1024)
+      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
+        config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed,
+        totalMessagesRead, totalMessagesRead/elapsed))
+    }
+    System.exit(0)
+  }
+
+  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+                           .withRequiredArg
+                           .describedAs("kafka://hostname:port")
+                           .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
+    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
+      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
+    val partitionOpt = parser.accepts("partition", "The topic partition to consume from.")
+                           .withRequiredArg
+                           .describedAs("partition")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(0)
+    val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.")
+                           .withRequiredArg
+                           .describedAs("bytes")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1024*1024)
+    val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
+                           .withRequiredArg
+                           .describedAs("clientId")
+                           .ofType(classOf[String])
+                           .defaultsTo("SimpleConsumerPerformanceClient")
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicOpt, urlOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    val url = new URI(options.valueOf(urlOpt))
+    val fetchSize = options.valueOf(fetchSizeOpt).intValue
+    val fromLatest = options.has(resetBeginningOffsetOpt)
+    val partition = options.valueOf(partitionOpt).intValue
+    val topic = options.valueOf(topicOpt)
+    val numMessages = options.valueOf(numMessagesOpt).longValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val showDetailedStats = options.has(showDetailedStatsOpt)
+    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val hideHeader = options.has(hideHeaderOpt)
+    val clientId = options.valueOf(clientIdOpt).toString
+  }
+}
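
Likewise, the simple consumer performance tool now lives at kafka.tools.SimpleConsumerPerformance. A hedged sketch of an invocation via bin/kafka-run-class.sh, with the flags taken from ConsumerPerfConfig above and the server URI, topic, and counts used here as placeholder assumptions:

    # Hypothetical invocation; --server expects a kafka://host:port URI per the option's description.
    bin/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance \
        --server kafka://localhost:9092 \
        --topic perf-test \
        --partition 0 \
        --fetch-size 1048576 \
        --messages 100000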

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/perf/config/log4j.properties
----------------------------------------------------------------------
diff --git a/perf/config/log4j.properties b/perf/config/log4j.properties
deleted file mode 100644
index 542b739..0000000
--- a/perf/config/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, fileAppender
-
-log4j.appender.fileAppender=org.apache.log4j.FileAppender
-log4j.appender.fileAppender.File=perf.log
-log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.fileAppender.layout.ConversionPattern=%m %n 
-
-# Turn on all our debugging info
-log4j.logger.kafka=INFO
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
deleted file mode 100644
index 4dde468..0000000
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.perf
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicLong
-import java.nio.channels.ClosedByInterruptException
-import org.apache.log4j.Logger
-import kafka.message.Message
-import kafka.utils.ZkUtils
-import java.util.{ Random, Properties }
-import kafka.consumer._
-import java.text.SimpleDateFormat
-
-/**
- * Performance test for the full zookeeper consumer
- */
-object ConsumerPerformance {
-  private val logger = Logger.getLogger(getClass())
-
-  def main(args: Array[String]): Unit = {
-
-    val config = new ConsumerPerfConfig(args)
-    logger.info("Starting consumer...")
-    var totalMessagesRead = new AtomicLong(0)
-    var totalBytesRead = new AtomicLong(0)
-
-    if (!config.hideHeader) {
-      if (!config.showDetailedStats)
-        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
-      else
-        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
-    }
-
-    // clean up zookeeper state for this group id for every perf run
-    ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId)
-
-    val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
-
-    val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
-    var threadList = List[ConsumerPerfThread]()
-    for ((topic, streamList) <- topicMessageStreams)
-      for (i <- 0 until streamList.length)
-        threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
-          totalMessagesRead, totalBytesRead)
-
-    logger.info("Sleeping for 1 second.")
-    Thread.sleep(1000)
-    logger.info("starting threads")
-    val startMs = System.currentTimeMillis
-    for (thread <- threadList)
-      thread.start
-
-    for (thread <- threadList)
-      thread.join
-
-    val endMs = System.currentTimeMillis
-    val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0
-    if (!config.showDetailedStats) {
-      val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
-      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
-        config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get,
-        totalMessagesRead.get / elapsedSecs))
-    }
-    System.exit(0)
-  }
-
-  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-      "Multiple URLS can be given to allow fail-over.")
-      .withRequiredArg
-      .describedAs("urls")
-      .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val groupIdOpt = parser.accepts("group", "The group id to consume on.")
-      .withRequiredArg
-      .describedAs("gid")
-      .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
-      .ofType(classOf[String])
-    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1024 * 1024)
-    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
-      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
-    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(2 * 1024 * 1024)
-    val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(10)
-    val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-
-    val options = parser.parse(args: _*)
-
-    for (arg <- List(topicOpt, zkConnectOpt)) {
-      if (!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val props = new Properties
-    props.put("group.id", options.valueOf(groupIdOpt))
-    props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
-    props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
-    props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
-    props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
-    props.put("consumer.timeout.ms", "5000")
-    props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
-    val consumerConfig = new ConsumerConfig(props)
-    val numThreads = options.valueOf(numThreadsOpt).intValue
-    val topic = options.valueOf(topicOpt)
-    val numMessages = options.valueOf(numMessagesOpt).longValue
-    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    val showDetailedStats = options.has(showDetailedStatsOpt)
-    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
-    val hideHeader = options.has(hideHeaderOpt)
-  }
-
-  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
-    config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
-    extends Thread(name) {
-
-    override def run() {
-      var bytesRead = 0L
-      var messagesRead = 0L
-      val startMs = System.currentTimeMillis
-      var lastReportTime: Long = startMs
-      var lastBytesRead = 0L
-      var lastMessagesRead = 0L
-
-      try {
-        val iter = stream.iterator
-        while (iter.hasNext && messagesRead < config.numMessages) {
-          val messageAndMetadata = iter.next
-          messagesRead += 1
-          bytesRead += messageAndMetadata.message.length
-
-          if (messagesRead % config.reportingInterval == 0) {
-            if (config.showDetailedStats)
-              printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis)
-            lastReportTime = System.currentTimeMillis
-            lastMessagesRead = messagesRead
-            lastBytesRead = bytesRead
-          }
-        }
-      } catch {
-        case _: InterruptedException =>
-        case _: ClosedByInterruptException =>
-        case _: ConsumerTimeoutException =>
-        case e: Throwable => e.printStackTrace()
-      }
-      totalMessagesRead.addAndGet(messagesRead)
-      totalBytesRead.addAndGet(bytesRead)
-      if (config.showDetailedStats)
-        printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis)
-    }
-
-    private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
-      startMs: Long, endMs: Long) = {
-      val elapsedMs = endMs - startMs
-      val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
-      val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
-      println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
-        config.consumerConfig.fetchMessageMaxBytes, totalMBRead,
-        1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/perf/src/main/scala/kafka/perf/PerfConfig.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala
deleted file mode 100644
index a8fc6b9..0000000
--- a/perf/src/main/scala/kafka/perf/PerfConfig.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.perf
-
-import joptsimple.OptionParser
-
-
-class PerfConfig(args: Array[String]) {
-  val parser = new OptionParser
-  val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume")
-    .withRequiredArg
-    .describedAs("count")
-    .ofType(classOf[java.lang.Long])
-    .defaultsTo(Long.MaxValue)
-  val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
-    .withRequiredArg
-    .describedAs("size")
-    .ofType(classOf[java.lang.Integer])
-    .defaultsTo(5000)
-  val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
-    "See java.text.SimpleDateFormat for options.")
-    .withRequiredArg
-    .describedAs("date format")
-    .ofType(classOf[String])
-    .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS")
-  val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
-    "interval as configured by reporting-interval")
-  val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ")
-  val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
-    .withRequiredArg
-    .describedAs("size")
-    .ofType(classOf[java.lang.Integer])
-    .defaultsTo(100)
-  val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.")
-    .withRequiredArg
-    .describedAs("size")
-    .ofType(classOf[java.lang.Integer])
-    .defaultsTo(200)
-  val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
-    .withRequiredArg
-    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
-    .ofType(classOf[java.lang.Integer])
-    .defaultsTo(0)
-  val helpOpt = parser.accepts("help", "Print usage.")
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
deleted file mode 100644
index 00fa90b..0000000
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.perf
-
-import kafka.metrics.KafkaMetricsReporter
-import kafka.producer.{OldProducer, NewShinyProducer}
-import kafka.utils.{VerifiableProperties, Logging}
-import kafka.message.CompressionCodec
-import kafka.serializer._
-
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-import java.util._
-import java.text.SimpleDateFormat
-import java.math.BigInteger
-import scala.collection.immutable.List
-
-import org.apache.log4j.Logger
-
-/**
- * Load test for the producer
- */
-object ProducerPerformance extends Logging {
-
-  def main(args: Array[String]) {
-
-    val logger = Logger.getLogger(getClass)
-    val config = new ProducerPerfConfig(args)
-    if (!config.isFixedSize)
-      logger.info("WARN: Throughput will be slower due to changing message size per request")
-
-    val totalBytesSent = new AtomicLong(0)
-    val totalMessagesSent = new AtomicLong(0)
-    val executor = Executors.newFixedThreadPool(config.numThreads)
-    val allDone = new CountDownLatch(config.numThreads)
-    val startMs = System.currentTimeMillis
-    val rand = new java.util.Random
-
-    if (!config.hideHeader)
-      println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
-        "total.data.sent.in.nMsg, nMsg.sec")
-
-    for (i <- 0 until config.numThreads) {
-      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
-    }
-
-    allDone.await()
-    val endMs = System.currentTimeMillis
-    val elapsedSecs = (endMs - startMs) / 1000.0
-    val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
-    println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(
-      config.dateFormat.format(startMs), config.dateFormat.format(endMs),
-      config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
-      totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
-    System.exit(0)
-  }
-
-  class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.")
-      .withRequiredArg
-      .describedAs("hostname:port,..,hostname:port")
-      .ofType(classOf[String])
-    val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to")
-      .withRequiredArg
-      .describedAs("topic1,topic2..")
-      .ofType(classOf[String])
-    val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3000)
-    val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3)
-    val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100)
-    val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
-      "to complete")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(-1)
-    val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
-    val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
-    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
-      .withRequiredArg
-      .describedAs("number of threads")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-    val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
-      "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
-      "in the form of 'Message:000...1:xxx...'")
-      .withRequiredArg()
-      .describedAs("initial message id")
-      .ofType(classOf[java.lang.Integer])
-    val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends")
-      .withRequiredArg()
-      .describedAs("message send time gap")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
-    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-      "set, the csv metrics will be outputed here")
-      .withRequiredArg
-      .describedAs("metrics dictory")
-      .ofType(classOf[java.lang.String])
-    val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
-
-    val options = parser.parse(args: _*)
-    for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
-      if (!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-    val topicsStr = options.valueOf(topicsOpt)
-    val topics = topicsStr.split(",")
-    val numMessages = options.valueOf(numMessagesOpt).longValue
-    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
-    val hideHeader = options.has(hideHeaderOpt)
-    val brokerList = options.valueOf(brokerListOpt)
-    val messageSize = options.valueOf(messageSizeOpt).intValue
-    var isFixedSize = !options.has(varyMessageSizeOpt)
-    var isSync = options.has(syncOpt)
-    var batchSize = options.valueOf(batchSizeOpt).intValue
-    var numThreads = options.valueOf(numThreadsOpt).intValue
-    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
-    val seqIdMode = options.has(initialMessageIdOpt)
-    var initialMessageId: Int = 0
-    if (seqIdMode)
-      initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
-    val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
-    val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
-    val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
-    val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
-    val useNewProducer = options.has(useNewProducerOpt)
-
-    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
-
-    if (csvMetricsReporterEnabled) {
-      val props = new Properties()
-      props.put("kafka.metrics.polling.interval.secs", "1")
-      props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
-      if (options.has(metricsDirectoryOpt))
-        props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
-      else
-        props.put("kafka.csv.metrics.dir", "kafka_metrics")
-      props.put("kafka.csv.metrics.reporter.enabled", "true")
-      val verifiableProps = new VerifiableProperties(props)
-      KafkaMetricsReporter.startReporters(verifiableProps)
-    }
-
-    val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
-  }
-
-  class ProducerThread(val threadId: Int,
-    val config: ProducerPerfConfig,
-    val totalBytesSent: AtomicLong,
-    val totalMessagesSent: AtomicLong,
-    val allDone: CountDownLatch,
-    val rand: Random) extends Runnable {
-    val seqIdNumDigit = 10 // no. of digits for max int value
-
-    val messagesPerThread = config.numMessages / config.numThreads
-    debug("Messages per thread = " + messagesPerThread)
-    val props = new Properties()
-    val producer =
-      if (config.useNewProducer) {
-        import org.apache.kafka.clients.producer.ProducerConfig
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-        props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
-        props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
-        props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
-        props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
-        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
-        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
-        new NewShinyProducer(props)
-      } else {
-        props.put("metadata.broker.list", config.brokerList)
-        props.put("compression.codec", config.compressionCodec.codec.toString)
-        props.put("send.buffer.bytes", (64 * 1024).toString)
-        if (!config.isSync) {
-          props.put("producer.type", "async")
-          props.put("batch.num.messages", config.batchSize.toString)
-          props.put("queue.enqueue.timeout.ms", "-1")
-        }
-        props.put("client.id", "producer-performance")
-        props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
-        props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
-        props.put("message.send.max.retries", config.producerNumRetries.toString)
-        props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
-        props.put("serializer.class", classOf[DefaultEncoder].getName)
-        props.put("key.serializer.class", classOf[NullEncoder[Long]].getName)
-        new OldProducer(props)
-      }
-
-    // generate the sequential message ID
-    private val SEP = ":" // message field separator
-    private val messageIdLabel = "MessageID"
-    private val threadIdLabel = "ThreadID"
-    private val topicLabel = "Topic"
-    private var leftPaddedSeqId: String = ""
-
-    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
-      // Each thread gets a unique range of sequential no. for its ids.
-      // Eg. 1000 msg in 10 threads => 100 msg per thread
-      // thread 0 IDs :   0 ~  99
-      // thread 1 IDs : 100 ~ 199
-      // thread 2 IDs : 200 ~ 299
-      // . . .
-      leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
-
-      val msgHeader = topicLabel + SEP +
-        topic + SEP +
-        threadIdLabel + SEP +
-        threadId + SEP +
-        messageIdLabel + SEP +
-        leftPaddedSeqId + SEP
-
-      val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
-      debug(seqMsgString)
-      return seqMsgString.getBytes()
-    }
-
-    private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
-      val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
-      if (config.seqIdMode) {
-        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
-        generateMessageWithSeqId(topic, seqId, msgSize)
-      } else {
-        new Array[Byte](msgSize)
-      }
-    }
-
-    override def run {
-      var bytesSent = 0L
-      var nSends = 0
-      var i: Long = 0L
-      var message: Array[Byte] = null
-
-      while (i < messagesPerThread) {
-        try {
-          config.topics.foreach(
-            topic => {
-              message = generateProducerData(topic, i)
-              producer.send(topic, BigInteger.valueOf(i).toByteArray, message)
-              bytesSent += message.size
-              nSends += 1
-              if (config.messageSendGapMs > 0)
-                Thread.sleep(config.messageSendGapMs)
-            })
-        } catch {
-          case e: Throwable => error("Error when sending message " + new String(message), e)
-        }
-        i += 1
-      }
-      try {
-        producer.close()
-      } catch {
-        case e: Throwable => error("Error when closing producer", e)
-      }
-      totalBytesSent.addAndGet(bytesSent)
-      totalMessagesSent.addAndGet(nSends)
-      allDone.countDown()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
deleted file mode 100644
index c52ada0..0000000
--- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.perf
-
-import java.net.URI
-import java.text.SimpleDateFormat
-import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
-import kafka.consumer.SimpleConsumer
-import kafka.utils._
-import org.apache.log4j.Logger
-import kafka.common.TopicAndPartition
-
-
-/**
- * Performance test for the simple consumer
- */
-object SimpleConsumerPerformance {
-
-  def main(args: Array[String]) {
-    val logger = Logger.getLogger(getClass)
-    val config = new ConsumerPerfConfig(args)
-
-    if(!config.hideHeader) {
-      if(!config.showDetailedStats)
-        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
-      else
-        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
-    }
-
-    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)
-
-    // reset to latest or smallest offset
-    val topicAndPartition = TopicAndPartition(config.topic, config.partition)
-    val request = OffsetRequest(Map(
-      topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1)
-      ))
-    var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-
-    val startMs = System.currentTimeMillis
-    var done = false
-    var totalBytesRead = 0L
-    var totalMessagesRead = 0L
-    var consumedInterval = 0
-    var lastReportTime: Long = startMs
-    var lastBytesRead = 0L
-    var lastMessagesRead = 0L
-    while(!done) {
-      // TODO: add in the maxWait and minBytes for performance
-      val request = new FetchRequestBuilder()
-        .clientId(config.clientId)
-        .addFetch(config.topic, config.partition, offset, config.fetchSize)
-        .build()
-      val fetchResponse = consumer.fetch(request)
-
-      var messagesRead = 0
-      var bytesRead = 0
-      val messageSet = fetchResponse.messageSet(config.topic, config.partition)
-      for (message <- messageSet) {
-        messagesRead += 1
-        bytesRead += message.message.payloadSize
-      }
-      
-      if(messagesRead == 0 || totalMessagesRead > config.numMessages)
-        done = true
-      else
-        // we only did one fetch so we find the offset for the first (head) messageset
-        offset += messageSet.validBytes
-      
-      totalBytesRead += bytesRead
-      totalMessagesRead += messagesRead
-      consumedInterval += messagesRead
-      
-      if(consumedInterval > config.reportingInterval) {
-        if(config.showDetailedStats) {
-          val reportTime = System.currentTimeMillis
-          val elapsed = (reportTime - lastReportTime)/1000.0
-          val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024)
-          println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize,
-            (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed,
-            totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed))
-        }
-        lastReportTime = SystemTime.milliseconds
-        lastBytesRead = totalBytesRead
-        lastMessagesRead = totalMessagesRead
-        consumedInterval = 0
-      }
-    }
-    val reportTime = System.currentTimeMillis
-    val elapsed = (reportTime - startMs) / 1000.0
-
-    if(!config.showDetailedStats) {
-      val totalMBRead = (totalBytesRead*1.0)/(1024*1024)
-      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
-        config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed,
-        totalMessagesRead, totalMessagesRead/elapsed))
-    }
-    System.exit(0)
-  }
-
-  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
-                           .withRequiredArg
-                           .describedAs("kafka://hostname:port")
-                           .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
-      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
-    val partitionOpt = parser.accepts("partition", "The topic partition to consume from.")
-                           .withRequiredArg
-                           .describedAs("partition")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(0)
-    val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.")
-                           .withRequiredArg
-                           .describedAs("bytes")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1024*1024)
-    val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
-                           .withRequiredArg
-                           .describedAs("clientId")
-                           .ofType(classOf[String])
-                           .defaultsTo("SimpleConsumerPerformanceClient")
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(topicOpt, urlOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-    val url = new URI(options.valueOf(urlOpt))
-    val fetchSize = options.valueOf(fetchSizeOpt).intValue
-    val fromLatest = options.has(resetBeginningOffsetOpt)
-    val partition = options.valueOf(partitionOpt).intValue
-    val topic = options.valueOf(topicOpt)
-    val numMessages = options.valueOf(numMessagesOpt).longValue
-    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    val showDetailedStats = options.has(showDetailedStatsOpt)
-    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
-    val hideHeader = options.has(hideHeaderOpt)
-    val clientId = options.valueOf(clientIdOpt).toString
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/system_test/broker_failure/bin/run-test.sh
----------------------------------------------------------------------
diff --git a/system_test/broker_failure/bin/run-test.sh b/system_test/broker_failure/bin/run-test.sh
index 1f11180..549cd1f 100755
--- a/system_test/broker_failure/bin/run-test.sh
+++ b/system_test/broker_failure/bin/run-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,7 +17,7 @@
 # ===========
 # run-test.sh
 # ===========
- 
+
 # ====================================
 # Do not change the followings
 # (keep this section at the beginning
@@ -52,9 +52,9 @@ readonly source_console_consumer_grp=source
 readonly target_console_consumer_grp=target
 readonly message_size=100
 readonly console_consumer_timeout_ms=15000
-readonly num_kafka_source_server=4                   # requires same no. of property files such as: 
+readonly num_kafka_source_server=4                   # requires same no. of property files such as:
                                                      # $base_dir/config/server_source{1..4}.properties
-readonly num_kafka_target_server=3                   # requires same no. of property files such as: 
+readonly num_kafka_target_server=3                   # requires same no. of property files such as:
                                                      # $base_dir/config/server_target{1..3}.properties
 readonly num_kafka_mirror_maker=3                    # any values greater than 0
 readonly wait_time_after_killing_broker=0            # wait after broker is stopped but before starting again
@@ -65,8 +65,8 @@ readonly wait_time_after_restarting_broker=10
 # ====================================
 num_msg_per_batch=500                                # no. of msg produced in each calling of ProducerPerformance
 num_producer_threads=5                               # no. of producer threads to send msg
-producer_sleep_min=5                                 # min & max sleep time (in sec) between each 
-producer_sleep_max=5                                 # batch of messages sent from producer 
+producer_sleep_min=5                                 # min & max sleep time (in sec) between each
+producer_sleep_max=5                                 # batch of messages sent from producer
 
 # ====================================
 # zookeeper
@@ -255,7 +255,7 @@ create_topic() {
         --topic $this_topic_to_create \
         --zookeeper $this_zk_conn_str \
         --replica $this_replica_factor \
-        2> $kafka_topic_creation_log_file 
+        2> $kafka_topic_creation_log_file
 }
 
 # =========================================
@@ -281,7 +281,7 @@ start_zk() {
 start_source_servers_cluster() {
     info "starting source cluster"
 
-    for ((i=1; i<=$num_kafka_source_server; i++)) 
+    for ((i=1; i<=$num_kafka_source_server; i++))
     do
         start_source_server $i
     done
@@ -367,13 +367,13 @@ start_console_consumer() {
 
     info "starting console consumers for $this_consumer_grp"
 
-    $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
+    $base_dir/bin/kafka-run-class.sh kafka.tools.ConsoleConsumer \
         --zookeeper localhost:$this_consumer_zk_port \
         --topic $test_topic \
         --group $this_consumer_grp \
         --from-beginning \
         --consumer-timeout-ms $console_consumer_timeout_ms \
-        --formatter "kafka.consumer.ConsoleConsumer\$${this_msg_formatter}" \
+        --formatter "kafka.tools.ConsoleConsumer\$${this_msg_formatter}" \
         2>&1 > ${this_consumer_log} &
     console_consumer_pid=$!
 
@@ -448,7 +448,7 @@ start_background_producer() {
 
         info "producing $num_msg_per_batch messages on topic '$topic'"
         $base_dir/bin/kafka-run-class.sh \
-            kafka.perf.ProducerPerformance \
+            kafka.tools.ProducerPerformance \
             --brokerinfo zk.connect=localhost:2181 \
             --topics $topic \
             --messages $num_msg_per_batch \
@@ -499,7 +499,7 @@ cmp_checksum() {
 
     crc_only_in_producer=`comm -23 $producer_performance_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log`
 
-    duplicate_mirror_mid=`comm -23 $console_consumer_target_mid_sorted_log $console_consumer_target_mid_sorted_uniq_log` 
+    duplicate_mirror_mid=`comm -23 $console_consumer_target_mid_sorted_log $console_consumer_target_mid_sorted_uniq_log`
     no_of_duplicate_msg=$(( $msg_count_from_mirror_consumer - $uniq_msg_count_from_mirror_consumer \
                           + $msg_count_from_source_consumer - $uniq_msg_count_from_source_consumer - \
                           2*$duplicate_msg_in_producer ))
@@ -521,19 +521,19 @@ cmp_checksum() {
     echo ""
 
     echo "========================================================" >> $checksum_diff_log
-    echo "crc only in producer"                                     >> $checksum_diff_log 
+    echo "crc only in producer"                                     >> $checksum_diff_log
     echo "========================================================" >> $checksum_diff_log
-    echo "${crc_only_in_producer}"                                  >> $checksum_diff_log 
+    echo "${crc_only_in_producer}"                                  >> $checksum_diff_log
     echo ""                                                         >> $checksum_diff_log
     echo "========================================================" >> $checksum_diff_log
-    echo "crc only in source consumer"                              >> $checksum_diff_log 
+    echo "crc only in source consumer"                              >> $checksum_diff_log
     echo "========================================================" >> $checksum_diff_log
-    echo "${crc_only_in_source_consumer}"                           >> $checksum_diff_log 
+    echo "${crc_only_in_source_consumer}"                           >> $checksum_diff_log
     echo ""                                                         >> $checksum_diff_log
     echo "========================================================" >> $checksum_diff_log
     echo "crc only in mirror consumer"                              >> $checksum_diff_log
     echo "========================================================" >> $checksum_diff_log
-    echo "${crc_only_in_mirror_consumer}"                           >> $checksum_diff_log   
+    echo "${crc_only_in_mirror_consumer}"                           >> $checksum_diff_log
     echo ""                                                         >> $checksum_diff_log
     echo "========================================================" >> $checksum_diff_log
     echo "duplicate crc in mirror consumer"                         >> $checksum_diff_log
@@ -583,8 +583,8 @@ start_test() {
 
     info "Started background producer pid [${background_producer_pid}]"
     sleep 5
-   
-    # loop for no. of iterations specified in $num_iterations 
+
+    # loop for no. of iterations specified in $num_iterations
     while [ $num_iterations -ge $iter ]
     do
         # if $svr_to_bounce is '0', it means no bouncing

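Aside from whitespace cleanup, the functional part of this script change is the two hunks that switch the console consumer and the producer perf tool to their kafka.tools class names. A minimal before/after sketch of the affected invocations, with arguments trimmed to those shown in the diff and the topic name and counts taken from the script's own variables (num_msg_per_batch=500, num_producer_threads=5, console_consumer_timeout_ms=15000; the topic name is a placeholder):

    # Before the migration (old package names):
    #   bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer --zookeeper localhost:2181 --topic test_1 ...
    #   bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topics test_1 ...
    # After the migration (new package names):
    bin/kafka-run-class.sh kafka.tools.ConsoleConsumer \
        --zookeeper localhost:2181 \
        --topic test_1 \
        --from-beginning \
        --consumer-timeout-ms 15000
    bin/kafka-run-class.sh kafka.tools.ProducerPerformance \
        --brokerinfo zk.connect=localhost:2181 \
        --topics test_1 \
        --messages 500 \
        --threads 5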
http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/system_test/producer_perf/bin/run-compression-test.sh
----------------------------------------------------------------------
diff --git a/system_test/producer_perf/bin/run-compression-test.sh b/system_test/producer_perf/bin/run-compression-test.sh
index ea20f0d..5297d1f 100755
--- a/system_test/producer_perf/bin/run-compression-test.sh
+++ b/system_test/producer_perf/bin/run-compression-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,9 +28,9 @@ $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&
 
 sleep 4
 echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --compression-codec 1 
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --compression-codec 1
 
-echo "wait for data to be persisted" 
+echo "wait for data to be persisted"
 cur_offset="-1"
 quit=0
 while [ $quit -eq 0 ]
@@ -59,4 +59,3 @@ fi
 ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
 sleep 2
 ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
-

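The only functional difference between this compression test and the plain run-test.sh below is the --compression-codec flag. In Kafka's CompressionCodec mapping, 0 means no compression, 1 is GZIP and 2 is Snappy, so the command above produces GZIP-compressed batches. A sketch of the same (now kafka.tools) command switched to Snappy, with everything else as in the script:

    # Same producer perf run, but with Snappy (codec id 2) instead of GZIP (codec id 1)
    $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance \
        --brokerinfo broker.list=0:localhost:9092 \
        --topics test01 \
        --messages $num_messages \
        --message-size $message_size \
        --batch-size 200 \
        --threads 1 \
        --async \
        --compression-codec 2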
http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/system_test/producer_perf/bin/run-test.sh
----------------------------------------------------------------------
diff --git a/system_test/producer_perf/bin/run-test.sh b/system_test/producer_perf/bin/run-test.sh
index bb60817..9a3b885 100755
--- a/system_test/producer_perf/bin/run-test.sh
+++ b/system_test/producer_perf/bin/run-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,9 +28,9 @@ $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>&
 
 sleep 4
 echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async
+$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async
 
-echo "wait for data to be persisted" 
+echo "wait for data to be persisted"
 cur_offset="-1"
 quit=0
 while [ $quit -eq 0 ]
@@ -59,4 +59,3 @@ fi
 ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
 sleep 2
 ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 8cde3c4..6edd64a 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -117,7 +117,7 @@ def generate_testcase_log_dirs(systemTestEnv, testcaseEnv):
         # create the role directory under dashboards
         dashboardsRoleDir = dashboardsPathName + "/" + role
         if not os.path.exists(dashboardsRoleDir) : os.makedirs(dashboardsRoleDir)
-        
+
 
 def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
     anonLogger.info("================================================")
@@ -212,7 +212,7 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
     logger.debug("executing command [" + cmdStr + "]", extra=d)
     system_test_utils.sys_call(cmdStr)
 
- 
+
 def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
     testCaseBaseDir = testcaseEnv.testCaseBaseDir
 
@@ -432,9 +432,9 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv
                             sys.exit(1)
 
                     addedCSVConfig = {}
-                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") 
-                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
-                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
+                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics")
+                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5"
+                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter"
                     addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true"
 
                     if brokerVersion == "0.7":
@@ -466,7 +466,7 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv
                     tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
                     copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties",
                         cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None)
-                
+
                 else:
                     logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d)
 
@@ -495,7 +495,7 @@ def scp_file_to_remote_host(clusterEntityConfigDictList, testcaseEnv):
 def start_zookeepers(systemTestEnv, testcaseEnv):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
-    zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+    zkEntityIdList = system_test_utils.get_data_from_list_of_dicts(
         clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
 
     for zkEntityId in zkEntityIdList:
@@ -534,7 +534,7 @@ def start_zookeepers(systemTestEnv, testcaseEnv):
 def start_brokers(systemTestEnv, testcaseEnv):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
-    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
         clusterEntityConfigDictList, "role", "broker", "entity_id")
 
     for brokerEntityId in brokerEntityIdList:
@@ -558,7 +558,7 @@ def start_mirror_makers(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
         start_entity_in_background(systemTestEnv, testcaseEnv, onlyThisEntityId)
     else:
         clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-        brokerEntityIdList          = system_test_utils.get_data_from_list_of_dicts( 
+        brokerEntityIdList          = system_test_utils.get_data_from_list_of_dicts(
                                       clusterEntityConfigDictList, "role", "mirror_maker", "entity_id")
 
         for brokerEntityId in brokerEntityIdList:
@@ -571,17 +571,17 @@ def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDic
 
     # keep track of broker related data in this dict such as broker id,
     # entity id and timestamp and return it to the caller function
-    shutdownBrokerDict = {} 
+    shutdownBrokerDict = {}
 
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
                              clusterEntityConfigDictList, "role", "broker", "entity_id")
 
     for brokerEntityId in brokerEntityIdList:
 
-        hostname   = system_test_utils.get_data_by_lookup_keyval( 
+        hostname   = system_test_utils.get_data_by_lookup_keyval(
                          clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
-        logFile    = system_test_utils.get_data_by_lookup_keyval( 
+        logFile    = system_test_utils.get_data_by_lookup_keyval(
                          testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
 
         logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
@@ -629,7 +629,7 @@ def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict
 
     # keep track of leader related data in this dict such as broker id,
     # entity id and timestamp and return it to the caller function
-    leaderDict = {} 
+    leaderDict = {}
 
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
     brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
@@ -754,7 +754,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                       "--whitelist=\".*\" >> ",
                       logPathName + "/" + logFile + " & echo pid:$! > ",
                       logPathName + "/entity_" + entityId + "_pid'"]
-        else:       
+        else:
             cmdList = ["ssh " + hostname,
                       "'JAVA_HOME=" + javaHome,
                       "JMX_PORT=" + jmxPort,
@@ -821,7 +821,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
         cmdList = ["ssh " + hostname,
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
-                   kafkaHome + "/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer",
+                   kafkaHome + "/bin/kafka-run-class.sh kafka.tools.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
                    "--consumer.config /tmp/consumer.properties",
@@ -866,9 +866,9 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
     for consumerConfig in consumerConfigList:
         host              = consumerConfig["hostname"]
         entityId          = consumerConfig["entity_id"]
-        jmxPort           = consumerConfig["jmx_port"] 
+        jmxPort           = consumerConfig["jmx_port"]
         role              = consumerConfig["role"]
-        clusterName       = consumerConfig["cluster_name"] 
+        clusterName       = consumerConfig["cluster_name"]
         kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home")
         javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home")
         jmxPort           = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "jmx_port")
@@ -940,7 +940,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
-                   kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
+                   kafkaRunClassBin + " kafka.tools.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
                    "--consumer.config /tmp/consumer.properties",
@@ -986,8 +986,8 @@ def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client):
     for producerConfig in producerConfigList:
         host              = producerConfig["hostname"]
         entityId          = producerConfig["entity_id"]
-        jmxPort           = producerConfig["jmx_port"] 
-        role              = producerConfig["role"] 
+        jmxPort           = producerConfig["jmx_port"]
+        role              = producerConfig["role"]
 
         thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, kafka07Client))
         logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
@@ -1029,7 +1029,7 @@ def generate_topics_string(topicPrefix, numOfTopics):
 def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client):
     host              = producerConfig["hostname"]
     entityId          = producerConfig["entity_id"]
-    jmxPort           = producerConfig["jmx_port"] 
+    jmxPort           = producerConfig["jmx_port"]
     role              = producerConfig["role"]
     clusterName       = producerConfig["cluster_name"]
     kafkaHome         = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "kafka_home")
@@ -1121,7 +1121,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
                        "'JAVA_HOME=" + javaHome,
                        "JMX_PORT=" + jmxPort,
                        "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome,
-                       kafkaRunClassBin + " kafka.perf.ProducerPerformance",
+                       kafkaRunClassBin + " kafka.tools.ProducerPerformance",
                        "--broker-list " + brokerListStr,
                        "--initial-message-id " + str(initMsgId),
                        "--messages " + noMsgPerBatch,
@@ -1157,7 +1157,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
                        "'JAVA_HOME=" + javaHome,
                        "JMX_PORT=" + jmxPort,
                        "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome,
-                       kafkaRunClassBin + " kafka.perf.ProducerPerformance",
+                       kafkaRunClassBin + " kafka.tools.ProducerPerformance",
                        "--brokerinfo " + brokerInfoStr,
                        "--initial-message-id " + str(initMsgId),
                        "--messages " + noMsgPerBatch,
@@ -1267,7 +1267,7 @@ def create_topic_for_producer_performance(systemTestEnv, testcaseEnv):
             testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome)
 
         for topic in topicsList:
-            logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) 
+            logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d)
             cmdList = ["ssh " + zkHost,
                        "'JAVA_HOME=" + javaHome,
                        createTopicBin,
@@ -1276,7 +1276,7 @@ def create_topic_for_producer_performance(systemTestEnv, testcaseEnv):
                        " --replication-factor "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
                        " --partitions " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
                        testcaseBaseDir + "/logs/create_source_cluster_topic.log'"]
-    
+
             cmdStr = " ".join(cmdList)
             logger.debug("executing command: [" + cmdStr + "]", extra=d)
             subproc = system_test_utils.sys_call_return_subproc(cmdStr)
@@ -1568,7 +1568,7 @@ def ps_grep_terminate_running_entity(systemTestEnv):
         cmdStr = " ".join(cmdList)
         logger.debug("executing command [" + cmdStr + "]", extra=d)
 
-        system_test_utils.sys_call(cmdStr) 
+        system_test_utils.sys_call(cmdStr)
 
 def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttributesDict):
     leaderEntityId = None
@@ -1625,7 +1625,7 @@ def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttribu
     if shutdownTimestamp > 0:
         leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownTimestamp)
         logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
- 
+
     return leaderReElectionLatency
 
 
@@ -1661,7 +1661,7 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
                 break
             logger.debug("calling testcaseEnv.lock.release()", extra=d)
             testcaseEnv.lock.release()
-    
+
         testcaseEnv.producerHostParentPidDict.clear()
 
     for hostname, consumerPPid in testcaseEnv.consumerHostParentPidDict.items():
@@ -1696,8 +1696,8 @@ def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
         if onlyThisEntityId is None or entityId == onlyThisEntityId:
 
             host              = migrationToolConfig["hostname"]
-            jmxPort           = migrationToolConfig["jmx_port"] 
-            role              = migrationToolConfig["role"] 
+            jmxPort           = migrationToolConfig["jmx_port"]
+            role              = migrationToolConfig["role"]
             kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home")
             javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "java_home")
             jmxPort           = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "jmx_port")
@@ -1763,7 +1763,7 @@ def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
         producerEntityId = prodPerfCfg["entity_id"]
         topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
 
-        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
                                clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
 
         matchingConsumerEntityId = None
@@ -1777,7 +1777,7 @@ def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
         if matchingConsumerEntityId is None:
             break
 
-        msgChecksumMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( 
+        msgChecksumMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname(
                                                   testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
                                                   + "/msg_checksum_missing_in_consumer.log"
         producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
@@ -1862,7 +1862,7 @@ def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName
         #        |- 00000000000000000020.log
         #        |- . . .
 
-        # loop through all topicPartition directories such as : test_1-0, test_1-1, ... 
+        # loop through all topicPartition directories such as : test_1-0, test_1-1, ...
         for topicPartition in os.listdir(localLogSegmentPath):
             # found a topic-partition directory
             if os.path.isdir(localLogSegmentPath + "/" + topicPartition):
@@ -1915,7 +1915,7 @@ def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName
     #   'test_2-0' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
     #   'test_2-1' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e']
     # }
-  
+
     for brokerTopicPartitionKey, md5Checksum in brokerLogCksumDict.items():
         tokens = brokerTopicPartitionKey.split(":")
         brokerKey      = tokens[0]
@@ -1941,7 +1941,7 @@ def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName
             logger.debug("merged log segment checksum in " + topicPartition + " matched", extra=d)
         else:
             logger.error("unexpected error in " + topicPartition, extra=d)
-            
+
     if failureCount == 0:
         validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "PASSED"
     else:
@@ -1954,8 +1954,8 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None
     for consumerConfig in consumerConfigList:
         host              = consumerConfig["hostname"]
         entityId          = consumerConfig["entity_id"]
-        jmxPort           = consumerConfig["jmx_port"] 
-        clusterName       = consumerConfig["cluster_name"] 
+        jmxPort           = consumerConfig["jmx_port"]
+        clusterName       = consumerConfig["cluster_name"]
         kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home")
         javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home")
         kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
@@ -2019,16 +2019,16 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None
                            "--no-wait-at-logend ",
                            " > " + outputFilePathName,
                            " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"]
-    
+
                 cmdStr = " ".join(cmdList)
-    
+
                 logger.debug("executing command: [" + cmdStr + "]", extra=d)
                 subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr)
                 # dummy for-loop to wait until the process is completed
                 for line in subproc_1.stdout.readlines():
-                    pass 
+                    pass
                 time.sleep(1)
-   
+
                 partitionId += 1
             replicaIndex += 1
 
@@ -2037,7 +2037,7 @@ def get_controller_attributes(systemTestEnv, testcaseEnv):
     logger.info("Querying Zookeeper for Controller info ...", extra=d)
 
     # keep track of controller data in this dict such as broker id & entity id
-    controllerDict = {} 
+    controllerDict = {}
 
     clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
     tcConfigsList      = testcaseEnv.testcaseConfigsList
@@ -2092,7 +2092,7 @@ def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source")
         logPathName              = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
         localLogSegmentPath      = logPathName + "/" + remoteLogSegmentDir
 
-        # loop through all topicPartition directories such as : test_1-0, test_1-1, ... 
+        # loop through all topicPartition directories such as : test_1-0, test_1-1, ...
         for topicPartition in sorted(os.listdir(localLogSegmentPath)):
             # found a topic-partition directory
             if os.path.isdir(localLogSegmentPath + "/" + topicPartition):
@@ -2131,7 +2131,7 @@ def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source")
     #  u'3:test_2-0': '0',
     #  u'3:test_2-1': '0'}
 
-    # loop through brokerLogStartOffsetDict to get the min common starting offset for each topic-partition    
+    # loop through brokerLogStartOffsetDict to get the min common starting offset for each topic-partition
     for brokerTopicPartition in sorted(brokerLogStartOffsetDict.iterkeys()):
         topicPartition = brokerTopicPartition.split(':')[1]
 
@@ -2446,7 +2446,7 @@ def get_leader_attributes(systemTestEnv, testcaseEnv):
     logger.info("Querying Zookeeper for leader info ...", extra=d)
 
     # keep track of leader data in this dict such as broker id & entity id
-    leaderDict = {} 
+    leaderDict = {}
 
     clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
     tcConfigsList      = testcaseEnv.testcaseConfigsList
@@ -2495,7 +2495,6 @@ def get_leader_attributes(systemTestEnv, testcaseEnv):
     print leaderDict
     return leaderDict
 
-
 def write_consumer_properties(consumerProperties):
     import tempfile
     props_file_path = tempfile.gettempdir() + "/consumer.properties"


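The Python changes above are mostly trailing-whitespace cleanup; the substantive edits swap the same two class names inside the ssh command strings that the system test framework assembles. Joining the cmdList pieces shown in the diff, the remote invocations take roughly the following shape -- host names, ports and paths here are placeholders, not values from this patch:

    # Rough shape of the remote commands built by start_console_consumer and
    # start_producer_in_thread after the kafka.tools rename (placeholder hosts/paths).
    ssh consumer-host-1 'JAVA_HOME=/usr/lib/jvm/java-7 JMX_PORT=9999 \
        /opt/kafka/bin/kafka-run-class.sh kafka.tools.ConsoleConsumer \
        --zookeeper zk-host-1:2181 \
        --topic test_1 \
        --consumer.config /tmp/consumer.properties'

    ssh producer-host-1 'JAVA_HOME=/usr/lib/jvm/java-7 JMX_PORT=9998 \
        KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/test-log4j.properties \
        /opt/kafka/bin/kafka-run-class.sh kafka.tools.ProducerPerformance \
        --broker-list broker-host-1:9092 \
        --messages 500 \
        --topics test_1'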