kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject [2/2] git commit: KAFKA-1438 Migrate client tools out of perf; reviewed by Neha Narkhede
Date Thu, 05 Jun 2014 05:21:04 GMT
KAFKA-1438 Migrate client tools out of perf; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1363ed7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1363ed7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1363ed7c

Branch: refs/heads/trunk
Commit: 1363ed7c5268546e657d2d8d9e9e3aeac2df04d8
Parents: df449a2
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Wed Jun 4 22:20:50 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Jun 4 22:20:55 2014 -0700

----------------------------------------------------------------------
 bin/kafka-console-consumer.sh                   |   6 +-
 bin/kafka-console-producer.sh                   |   6 +-
 bin/kafka-consumer-perf-test.sh                 |   6 +-
 bin/kafka-producer-perf-test.sh                 |   6 +-
 bin/kafka-run-class.sh                          |  12 +-
 bin/kafka-simple-consumer-perf-test.sh          |   6 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala  | 284 ------------------
 .../scala/kafka/producer/ConsoleProducer.scala  | 299 ------------------
 .../main/scala/kafka/server/OffsetManager.scala |   5 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     | 284 ++++++++++++++++++
 .../scala/kafka/tools/ConsoleProducer.scala     | 300 +++++++++++++++++++
 .../scala/kafka/tools/ConsumerPerformance.scala | 199 ++++++++++++
 .../src/main/scala/kafka/tools/PerfConfig.scala |  60 ++++
 .../scala/kafka/tools/ProducerPerformance.scala | 286 ++++++++++++++++++
 .../kafka/tools/SimpleConsumerPerformance.scala | 163 ++++++++++
 perf/config/log4j.properties                    |  24 --
 .../scala/kafka/perf/ConsumerPerformance.scala  | 199 ------------
 perf/src/main/scala/kafka/perf/PerfConfig.scala |  60 ----
 .../scala/kafka/perf/ProducerPerformance.scala  | 286 ------------------
 .../kafka/perf/SimpleConsumerPerformance.scala  | 163 ----------
 system_test/broker_failure/bin/run-test.sh      |  40 +--
 .../producer_perf/bin/run-compression-test.sh   |   9 +-
 system_test/producer_perf/bin/run-test.sh       |   9 +-
 system_test/utils/kafka_system_test_utils.py    |  93 +++---
 24 files changed, 1385 insertions(+), 1420 deletions(-)
----------------------------------------------------------------------
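
The change is a pure relocation: each tool keeps its implementation and only its fully qualified class name moves, as the closely matched insertion/deletion counts above suggest. A minimal Scala sketch of the old-to-new mapping, derived from the file list (the Map itself is illustrative, not code from the commit):

    // Old fully qualified tool classes and their new homes under kafka.tools,
    // as reflected in the bin/*.sh wrappers below. Illustrative only.
    val movedTools = Map(
      "kafka.consumer.ConsoleConsumer"       -> "kafka.tools.ConsoleConsumer",
      "kafka.producer.ConsoleProducer"       -> "kafka.tools.ConsoleProducer",
      "kafka.perf.ConsumerPerformance"       -> "kafka.tools.ConsumerPerformance",
      "kafka.perf.ProducerPerformance"       -> "kafka.tools.ProducerPerformance",
      "kafka.perf.SimpleConsumerPerformance" -> "kafka.tools.SimpleConsumerPerformance",
      "kafka.perf.PerfConfig"                -> "kafka.tools.PerfConfig"
    )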


http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/bin/kafka-console-consumer.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-console-consumer.sh b/bin/kafka-console-consumer.sh
index e410dde..07c90a9 100755
--- a/bin/kafka-console-consumer.sh
+++ b/bin/kafka-console-consumer.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.consumer.ConsoleConsumer $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/bin/kafka-console-producer.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-console-producer.sh b/bin/kafka-console-producer.sh
index cd8ce62..ccca66d 100755
--- a/bin/kafka-console-producer.sh
+++ b/bin/kafka-console-producer.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh kafka.producer.ConsoleProducer $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/bin/kafka-consumer-perf-test.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-consumer-perf-test.sh b/bin/kafka-consumer-perf-test.sh
index 4ed3ed9..ebc513a 100755
--- a/bin/kafka-consumer-perf-test.sh
+++ b/bin/kafka-consumer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.ConsumerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/bin/kafka-producer-perf-test.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-producer-perf-test.sh b/bin/kafka-producer-perf-test.sh
index b4efc29..84ac949 100755
--- a/bin/kafka-producer-perf-test.sh
+++ b/bin/kafka-producer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.ProducerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index d2fc8c0..5d5021d 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,11 +41,6 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/perf/build/libs//kafka-perf_${SCALA_VERSION}*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
 for file in $base_dir/examples/build/libs//kafka-examples*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
@@ -155,6 +150,3 @@ if [ "x$DAEMON_MODE" = "xtrue" ]; then
 else
   exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
 fi
-
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/bin/kafka-simple-consumer-perf-test.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-simple-consumer-perf-test.sh b/bin/kafka-simple-consumer-perf-test.sh
index 2d3e3d3..b1a5cfc 100755
--- a/bin/kafka-simple-consumer-perf-test.sh
+++ b/bin/kafka-simple-consumer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.SimpleConsumerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
deleted file mode 100644
index 1a16c69..0000000
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.consumer
-
-import scala.collection.JavaConversions._
-import org.I0Itec.zkclient._
-import joptsimple._
-import java.util.Properties
-import java.util.Random
-import java.io.PrintStream
-import kafka.message._
-import kafka.serializer._
-import kafka.utils._
-import kafka.metrics.KafkaMetricsReporter
-
-
-/**
- * Consumer that dumps messages out to standard out.
- *
- */
-object ConsoleConsumer extends Logging {
-
-  def main(args: Array[String]) {
-    val parser = new OptionParser
-    val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
-            .withRequiredArg
-            .describedAs("topic")
-            .ofType(classOf[String])
-    val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
-            .withRequiredArg
-            .describedAs("whitelist")
-            .ofType(classOf[String])
-    val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
-            .withRequiredArg
-            .describedAs("blacklist")
-            .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-            "Multiple URLS can be given to allow fail-over.")
-            .withRequiredArg
-            .describedAs("urls")
-            .ofType(classOf[String])
-
-    val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
-            .withRequiredArg
-            .describedAs("config file")
-            .ofType(classOf[String])
-    val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
-            .withRequiredArg
-            .describedAs("class")
-            .ofType(classOf[String])
-            .defaultsTo(classOf[DefaultMessageFormatter].getName)
-    val messageFormatterArgOpt = parser.accepts("property")
-            .withRequiredArg
-            .describedAs("prop")
-            .ofType(classOf[String])
-    val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
-    val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
-            "start with the earliest message present in the log rather than the latest message.")
-    val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
-            .withRequiredArg
-            .describedAs("num_messages")
-            .ofType(classOf[java.lang.Integer])
-    val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
-            "skip it instead of halt.")
-    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
-    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-            "set, the csv metrics will be outputed here")
-      .withRequiredArg
-      .describedAs("metrics dictory")
-      .ofType(classOf[java.lang.String])
-
-    var groupIdPassed = true
-    val options: OptionSet = tryParse(parser, args)
-    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
-    val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
-    if (topicOrFilterOpt.size != 1) {
-      error("Exactly one of whitelist/blacklist/topic is required.")
-      parser.printHelpOn(System.err)
-      System.exit(1)
-    }
-    val topicArg = options.valueOf(topicOrFilterOpt.head)
-    val filterSpec = if (options.has(blacklistOpt))
-      new Blacklist(topicArg)
-    else
-      new Whitelist(topicArg)
-
-    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
-    if (csvMetricsReporterEnabled) {
-      val csvReporterProps = new Properties()
-      csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
-      csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
-      if (options.has(metricsDirectoryOpt))
-        csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
-      else
-        csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics")
-      csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true")
-      val verifiableProps = new VerifiableProperties(csvReporterProps)
-      KafkaMetricsReporter.startReporters(verifiableProps)
-    }
-
-
-
-    val consumerProps = if (options.has(consumerConfigOpt))
-      Utils.loadProps(options.valueOf(consumerConfigOpt))
-    else
-      new Properties()
-
-    if(!consumerProps.containsKey("group.id")) {
-      consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
-      groupIdPassed=false
-    }
-    consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
-    consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt))
-    if(!consumerProps.containsKey("dual.commit.enabled"))
-      consumerProps.put("dual.commit.enabled","false")
-    if(!consumerProps.containsKey("offsets.storage"))
-      consumerProps.put("offsets.storage","zookeeper")
-
-    if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) &&
-       checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) {
-      System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id")
-        +". Please use --delete-consumer-offsets to delete previous offsets metadata")
-      System.exit(1)
-    }
-
-    if(options.has(deleteConsumerOffsetsOpt))
-      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id"))
-
-    val config = new ConsumerConfig(consumerProps)
-    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
-    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
-    val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
-    val connector = Consumer.create(config)
-
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run() {
-        connector.shutdown()
-        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
-        if(!groupIdPassed)
-          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
-      }
-    })
-
-    var numMessages = 0L
-    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
-    formatter.init(formatterArgs)
-    try {
-      val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
-      val iter = if(maxMessages >= 0)
-        stream.slice(0, maxMessages)
-      else
-        stream
-
-      for(messageAndTopic <- iter) {
-        try {
-          formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
-          numMessages += 1
-        } catch {
-          case e: Throwable =>
-            if (skipMessageOnError)
-              error("Error processing message, skipping this message: ", e)
-            else
-              throw e
-        }
-        if(System.out.checkError()) {
-          // This means no one is listening to our output stream any more, time to shutdown
-          System.err.println("Unable to write to standard out, closing consumer.")
-          System.err.println("Consumed %d messages".format(numMessages))
-          formatter.close()
-          connector.shutdown()
-          System.exit(1)
-        }
-      }
-    } catch {
-      case e: Throwable => error("Error processing message, stopping consumer: ", e)
-    }
-    System.err.println("Consumed %d messages".format(numMessages))
-    System.out.flush()
-    formatter.close()
-    connector.shutdown()
-  }
-
-  def tryParse(parser: OptionParser, args: Array[String]) = {
-    try {
-      parser.parse(args : _*)
-    } catch {
-      case e: OptionException => {
-        Utils.croak(e.getMessage)
-        null
-      }
-    }
-  }
-
-  def checkZkPathExists(zkUrl: String, path: String): Boolean = {
-    try {
-      val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer);
-      zk.exists(path)
-    } catch {
-      case _: Throwable => false
-    }
-  }
-}
-
-object MessageFormatter {
-  def tryParseFormatterArgs(args: Iterable[String]): Properties = {
-    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
-    if(!splits.forall(_.length == 2)) {
-      System.err.println("Invalid parser arguments: " + args.mkString(" "))
-      System.exit(1)
-    }
-    val props = new Properties
-    for(a <- splits)
-      props.put(a(0), a(1))
-    props
-  }
-}
-
-trait MessageFormatter {
-  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
-  def init(props: Properties) {}
-  def close() {}
-}
-
-class DefaultMessageFormatter extends MessageFormatter {
-  var printKey = false
-  var keySeparator = "\t".getBytes
-  var lineSeparator = "\n".getBytes
-
-  override def init(props: Properties) {
-    if(props.containsKey("print.key"))
-      printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
-    if(props.containsKey("key.separator"))
-      keySeparator = props.getProperty("key.separator").getBytes
-    if(props.containsKey("line.separator"))
-      lineSeparator = props.getProperty("line.separator").getBytes
-  }
-
-  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
-    if(printKey) {
-      output.write(if (key == null) "null".getBytes() else key)
-      output.write(keySeparator)
-    }
-    output.write(if (value == null) "null".getBytes() else value)
-    output.write(lineSeparator)
-  }
-}
-
-class NoOpMessageFormatter extends MessageFormatter {
-  override def init(props: Properties) {}
-  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
-}
-
-class ChecksumMessageFormatter extends MessageFormatter {
-  private var topicStr: String = _
-
-  override def init(props: Properties) {
-    topicStr = props.getProperty("topic")
-    if (topicStr != null)
-      topicStr = topicStr + ":"
-    else
-      topicStr = ""
-  }
-
-  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
-    val chksum = new Message(value, key).checksum
-    output.println(topicStr + "checksum:" + chksum)
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
deleted file mode 100644
index a2af988..0000000
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ /dev/null
@@ -1,299 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.producer
-
-import kafka.common._
-import kafka.message._
-import kafka.serializer._
-import kafka.utils.CommandLineUtils
-
-import java.util.Properties
-import java.io._
-
-import joptsimple._
-
-object ConsoleProducer {
-
-  def main(args: Array[String]) { 
-
-    val config = new ProducerConfig(args)
-    val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
-    val props = new Properties
-    props.put("topic", config.topic)
-    props.putAll(config.cmdLineProps)
-    reader.init(System.in, props)
-
-    try {
-        val producer =
-          if(config.useNewProducer) {
-            import org.apache.kafka.clients.producer.ProducerConfig
-
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
-            props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
-            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
-            props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
-            props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
-            props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
-            props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
-            props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
-            props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
-            if(config.queueEnqueueTimeoutMs != -1)
-              props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
-            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
-            props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
-
-            new NewShinyProducer(props)
-          } else {
-            props.put("metadata.broker.list", config.brokerList)
-            props.put("compression.codec", config.compressionCodec)
-            props.put("producer.type", if(config.sync) "sync" else "async")
-            props.put("batch.num.messages", config.batchSize.toString)
-            props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
-            props.put("retry.backoff.ms", config.retryBackoffMs.toString)
-            props.put("queue.buffering.max.ms", config.sendTimeout.toString)
-            props.put("queue.buffering.max.messages", config.queueSize.toString)
-            props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
-            props.put("request.required.acks", config.requestRequiredAcks.toString)
-            props.put("request.timeout.ms", config.requestTimeoutMs.toString)
-            props.put("key.serializer.class", config.keyEncoderClass)
-            props.put("serializer.class", config.valueEncoderClass)
-            props.put("send.buffer.bytes", config.socketBuffer.toString)
-            props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
-            props.put("client.id", "console-producer")
-
-            new OldProducer(props)
-          }
-
-        Runtime.getRuntime.addShutdownHook(new Thread() {
-          override def run() {
-            producer.close()
-          }
-        })
-
-        var message: KeyedMessage[Array[Byte], Array[Byte]] = null
-        do {
-          message = reader.readMessage()
-          if(message != null)
-            producer.send(message.topic, message.key, message.message)
-        } while(message != null)
-    } catch {
-      case e: Exception =>
-        e.printStackTrace
-        System.exit(1)
-    }
-    System.exit(0)
-  }
-
-  class ProducerConfig(args: Array[String]) {
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
-      .withRequiredArg
-      .describedAs("broker-list")
-      .ofType(classOf[String])
-    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-    val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." +
-                                                                  "If specified without value, than it defaults to 'gzip'")
-                                    .withOptionalArg()
-                                    .describedAs("compression-codec")
-                                    .ofType(classOf[String])
-    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
-    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.")
-      .withRequiredArg
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3)
-    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
-      .withRequiredArg
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(100)
-    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
-      " a message will queue awaiting suffient batch size. The value is given in ms.")
-      .withRequiredArg
-      .describedAs("timeout_ms")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(1000)
-    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
-      " messages will queue awaiting suffient batch size.")
-      .withRequiredArg
-      .describedAs("queue_size")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(10000)
-    val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
-      .withRequiredArg
-      .describedAs("queue enqueuetimeout ms")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(Int.MaxValue)
-    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
-      .withRequiredArg
-      .describedAs("request required acks")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-    val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
-      .withRequiredArg
-      .describedAs("request timeout ms")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1500)
-    val metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
-      "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes.")
-      .withRequiredArg
-      .describedAs("metadata expiration interval")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(5*60*1000L)
-    val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms",
-      "The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.")
-      .withRequiredArg
-      .describedAs("metadata fetch timeout")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(60*1000L)
-    val maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
-      "The total memory used by the producer to buffer records waiting to be sent to the server.")
-      .withRequiredArg
-      .describedAs("total memory in bytes")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(32 * 1024 * 1024L)
-    val maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
-      "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
-        "will attempt to optimistically group them together until this size is reached.")
-      .withRequiredArg
-      .describedAs("memory in bytes per partition")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(16 * 1024L)
-    val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
-      .withRequiredArg
-      .describedAs("encoder_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[DefaultEncoder].getName)
-    val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
-      .withRequiredArg
-      .describedAs("encoder_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[DefaultEncoder].getName)
-    val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
-      "By default each line is read as a separate message.")
-      .withRequiredArg
-      .describedAs("reader_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[LineMessageReader].getName)
-    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1024*100)
-    val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
-      "This allows custom configuration for a user-defined message reader.")
-      .withRequiredArg
-      .describedAs("prop")
-      .ofType(classOf[String])
-    val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
-
-    val options = parser.parse(args : _*)
-    for(arg <- List(topicOpt, brokerListOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    import scala.collection.JavaConversions._
-    val useNewProducer = options.has(useNewProducerOpt)
-    val topic = options.valueOf(topicOpt)
-    val brokerList = options.valueOf(brokerListOpt)
-    val sync = options.has(syncOpt)
-    val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
-    val compressionCodec = if (options.has(compressionCodecOpt))
-                             if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty)
-                               DefaultCompressionCodec.name
-                             else compressionCodecOptionValue
-                           else NoCompressionCodec.name
-    val batchSize = options.valueOf(batchSizeOpt)
-    val sendTimeout = options.valueOf(sendTimeoutOpt)
-    val queueSize = options.valueOf(queueSizeOpt)
-    val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
-    val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
-    val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
-    val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt)
-    val retryBackoffMs = options.valueOf(retryBackoffMsOpt)
-    val keyEncoderClass = options.valueOf(keyEncoderOpt)
-    val valueEncoderClass = options.valueOf(valueEncoderOpt)
-    val readerClass = options.valueOf(messageReaderOpt)
-    val socketBuffer = options.valueOf(socketBufferSizeOpt)
-    val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
-    /* new producer related configs */
-    val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
-    val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
-    val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt)
-    val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt)
-  }
-
-  trait MessageReader {
-    def init(inputStream: InputStream, props: Properties) {}
-    def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
-    def close() {}
-  }
-
-  class LineMessageReader extends MessageReader {
-    var topic: String = null
-    var reader: BufferedReader = null
-    var parseKey = false
-    var keySeparator = "\t"
-    var ignoreError = false
-    var lineNumber = 0
-
-    override def init(inputStream: InputStream, props: Properties) {
-      topic = props.getProperty("topic")
-      if(props.containsKey("parse.key"))
-        parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
-      if(props.containsKey("key.separator"))
-        keySeparator = props.getProperty("key.separator")
-      if(props.containsKey("ignore.error"))
-        ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
-      reader = new BufferedReader(new InputStreamReader(inputStream))
-    }
-
-    override def readMessage() = {
-      lineNumber += 1
-      (reader.readLine(), parseKey) match {
-        case (null, _) => null
-        case (line, true) =>
-          line.indexOf(keySeparator) match {
-            case -1 =>
-              if(ignoreError)
-                new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
-              else
-                throw new KafkaException("No key found on line " + lineNumber + ": " + line)
-            case n =>
-              new KeyedMessage[Array[Byte], Array[Byte]](topic,
-                             line.substring(0, n).getBytes,
-                             (if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
-          }
-        case (line, false) =>
-          new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
-      }
-    }
-  }
-}
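
The MessageReader plug-in point above moves to kafka.tools unchanged (the new 300-line copy adds one import line for the producer classes it now references from outside the package). A hedged sketch of a custom reader against that trait; NumberedLineReader is hypothetical, not part of the commit:

    import java.io.{BufferedReader, InputStream, InputStreamReader}
    import java.util.Properties
    import kafka.producer.KeyedMessage
    import kafka.tools.ConsoleProducer.MessageReader

    // Hypothetical reader: prefixes each stdin line with its line number
    // before handing it to the producer. Illustrative only.
    class NumberedLineReader extends MessageReader {
      private var topic: String = null
      private var reader: BufferedReader = null
      private var lineNumber = 0

      override def init(inputStream: InputStream, props: Properties) {
        topic = props.getProperty("topic")
        reader = new BufferedReader(new InputStreamReader(inputStream))
      }

      override def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] = {
        lineNumber += 1
        reader.readLine() match {
          case null => null // end of input
          case line => new KeyedMessage[Array[Byte], Array[Byte]](topic, (lineNumber + ": " + line).getBytes)
        }
      }
    }

Such a reader would be selected through the existing --line-reader option by fully qualified class name.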

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 5417628..0e22897 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -30,7 +30,7 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import scala.Some
 import kafka.common.TopicAndPartition
-import kafka.consumer.MessageFormatter
+import kafka.tools.MessageFormatter
 import java.io.PrintStream
 import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
 import org.apache.kafka.common.protocol.types.Type.STRING
@@ -247,7 +247,7 @@ class OffsetManager(val config: OffsetManagerConfig,
    * Asynchronously read the partition from the offsets topic and populate the cache
    */
   def loadOffsetsFromLog(offsetsPartition: Int) {
-    
+
     val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
 
     loadingPartitions synchronized {
@@ -477,4 +477,3 @@ case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition)
     "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
 
 }
-
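
For callers outside the consumer package, such as OffsetManager above, the migration reduces to a one-line import change; a hedged before/after sketch (the surrounding caller is hypothetical):

    // Before this commit a formatter user imported the trait from its old home:
    //   import kafka.consumer.MessageFormatter
    // After it, only the import line changes:
    import kafka.tools.MessageFormatter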

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
new file mode 100644
index 0000000..f6bc2f1
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import scala.collection.JavaConversions._
+import org.I0Itec.zkclient._
+import joptsimple._
+import java.util.Properties
+import java.util.Random
+import java.io.PrintStream
+import kafka.message._
+import kafka.serializer._
+import kafka.utils._
+import kafka.metrics.KafkaMetricsReporter
+import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer}
+
+/**
+ * Consumer that dumps messages out to standard out.
+ *
+ */
+object ConsoleConsumer extends Logging {
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
+            .withRequiredArg
+            .describedAs("topic")
+            .ofType(classOf[String])
+    val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
+            .withRequiredArg
+            .describedAs("whitelist")
+            .ofType(classOf[String])
+    val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
+            .withRequiredArg
+            .describedAs("blacklist")
+            .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+            "Multiple URLS can be given to allow fail-over.")
+            .withRequiredArg
+            .describedAs("urls")
+            .ofType(classOf[String])
+
+    val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
+            .withRequiredArg
+            .describedAs("config file")
+            .ofType(classOf[String])
+    val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
+            .withRequiredArg
+            .describedAs("class")
+            .ofType(classOf[String])
+            .defaultsTo(classOf[DefaultMessageFormatter].getName)
+    val messageFormatterArgOpt = parser.accepts("property")
+            .withRequiredArg
+            .describedAs("prop")
+            .ofType(classOf[String])
+    val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
+    val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
+            "start with the earliest message present in the log rather than the latest message.")
+    val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
+            .withRequiredArg
+            .describedAs("num_messages")
+            .ofType(classOf[java.lang.Integer])
+    val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
+            "skip it instead of halt.")
+    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
+    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
+            "set, the csv metrics will be outputed here")
+      .withRequiredArg
+      .describedAs("metrics dictory")
+      .ofType(classOf[java.lang.String])
+
+    var groupIdPassed = true
+    val options: OptionSet = tryParse(parser, args)
+    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+    val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
+    if (topicOrFilterOpt.size != 1) {
+      error("Exactly one of whitelist/blacklist/topic is required.")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+    val topicArg = options.valueOf(topicOrFilterOpt.head)
+    val filterSpec = if (options.has(blacklistOpt))
+      new Blacklist(topicArg)
+    else
+      new Whitelist(topicArg)
+
+    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
+    if (csvMetricsReporterEnabled) {
+      val csvReporterProps = new Properties()
+      csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
+      csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
+      if (options.has(metricsDirectoryOpt))
+        csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
+      else
+        csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics")
+      csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true")
+      val verifiableProps = new VerifiableProperties(csvReporterProps)
+      KafkaMetricsReporter.startReporters(verifiableProps)
+    }
+
+
+
+    val consumerProps = if (options.has(consumerConfigOpt))
+      Utils.loadProps(options.valueOf(consumerConfigOpt))
+    else
+      new Properties()
+
+    if(!consumerProps.containsKey("group.id")) {
+      consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
+      groupIdPassed=false
+    }
+    consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
+    consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt))
+    if(!consumerProps.containsKey("dual.commit.enabled"))
+      consumerProps.put("dual.commit.enabled","false")
+    if(!consumerProps.containsKey("offsets.storage"))
+      consumerProps.put("offsets.storage","zookeeper")
+
+    if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) &&
+       checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) {
+      System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id")
+        +". Please use --delete-consumer-offsets to delete previous offsets metadata")
+      System.exit(1)
+    }
+
+    if(options.has(deleteConsumerOffsetsOpt))
+      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id"))
+
+    val config = new ConsumerConfig(consumerProps)
+    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
+    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
+    val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
+    val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
+    val connector = Consumer.create(config)
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        connector.shutdown()
+        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
+        if(!groupIdPassed)
+          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
+      }
+    })
+
+    var numMessages = 0L
+    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    formatter.init(formatterArgs)
+    try {
+      val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
+      val iter = if(maxMessages >= 0)
+        stream.slice(0, maxMessages)
+      else
+        stream
+
+      for(messageAndTopic <- iter) {
+        try {
+          formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
+          numMessages += 1
+        } catch {
+          case e: Throwable =>
+            if (skipMessageOnError)
+              error("Error processing message, skipping this message: ", e)
+            else
+              throw e
+        }
+        if(System.out.checkError()) {
+          // This means no one is listening to our output stream any more, time to shutdown
+          System.err.println("Unable to write to standard out, closing consumer.")
+          System.err.println("Consumed %d messages".format(numMessages))
+          formatter.close()
+          connector.shutdown()
+          System.exit(1)
+        }
+      }
+    } catch {
+      case e: Throwable => error("Error processing message, stopping consumer: ", e)
+    }
+    System.err.println("Consumed %d messages".format(numMessages))
+    System.out.flush()
+    formatter.close()
+    connector.shutdown()
+  }
+
+  def tryParse(parser: OptionParser, args: Array[String]) = {
+    try {
+      parser.parse(args : _*)
+    } catch {
+      case e: OptionException => {
+        Utils.croak(e.getMessage)
+        null
+      }
+    }
+  }
+
+  def checkZkPathExists(zkUrl: String, path: String): Boolean = {
+    try {
+      val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer);
+      zk.exists(path)
+    } catch {
+      case _: Throwable => false
+    }
+  }
+}
+
+object MessageFormatter {
+  def tryParseFormatterArgs(args: Iterable[String]): Properties = {
+    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
+    if(!splits.forall(_.length == 2)) {
+      System.err.println("Invalid parser arguments: " + args.mkString(" "))
+      System.exit(1)
+    }
+    val props = new Properties
+    for(a <- splits)
+      props.put(a(0), a(1))
+    props
+  }
+}
+
+trait MessageFormatter {
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
+  def init(props: Properties) {}
+  def close() {}
+}
+
+class DefaultMessageFormatter extends MessageFormatter {
+  var printKey = false
+  var keySeparator = "\t".getBytes
+  var lineSeparator = "\n".getBytes
+
+  override def init(props: Properties) {
+    if(props.containsKey("print.key"))
+      printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
+    if(props.containsKey("key.separator"))
+      keySeparator = props.getProperty("key.separator").getBytes
+    if(props.containsKey("line.separator"))
+      lineSeparator = props.getProperty("line.separator").getBytes
+  }
+
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    if(printKey) {
+      output.write(if (key == null) "null".getBytes() else key)
+      output.write(keySeparator)
+    }
+    output.write(if (value == null) "null".getBytes() else value)
+    output.write(lineSeparator)
+  }
+}
+
+class NoOpMessageFormatter extends MessageFormatter {
+  override def init(props: Properties) {}
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
+}
+
+class ChecksumMessageFormatter extends MessageFormatter {
+  private var topicStr: String = _
+
+  override def init(props: Properties) {
+    topicStr = props.getProperty("topic")
+    if (topicStr != null)
+      topicStr = topicStr + ":"
+    else
+      topicStr = ""
+  }
+
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    val chksum = new Message(value, key).checksum
+    output.println(topicStr + "checksum:" + chksum)
+  }
+}
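
Since the MessageFormatter trait and its three built-in implementations move verbatim, custom formatters keep working once their import is updated. A hedged sketch of a user-defined formatter against the trait above; UpperCaseFormatter is hypothetical, not part of the commit:

    import java.io.PrintStream
    import java.util.Properties
    import kafka.tools.MessageFormatter

    // Hypothetical formatter: prints each message value upper-cased, one per line.
    class UpperCaseFormatter extends MessageFormatter {
      def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
        output.println(if (value == null) "null" else new String(value, "UTF-8").toUpperCase)
      }
    }

It would be wired in through the existing --formatter option, again by fully qualified class name.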

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
new file mode 100644
index 0000000..f4e07d4
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import kafka.common._
+import kafka.message._
+import kafka.serializer._
+import kafka.utils.CommandLineUtils
+import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage}
+
+import java.util.Properties
+import java.io._
+
+import joptsimple._
+
+object ConsoleProducer {
+
+  def main(args: Array[String]) {
+
+    val config = new ProducerConfig(args)
+    val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
+    val props = new Properties
+    props.put("topic", config.topic)
+    props.putAll(config.cmdLineProps)
+    reader.init(System.in, props)
+
+    try {
+        val producer =
+          if(config.useNewProducer) {
+            import org.apache.kafka.clients.producer.ProducerConfig
+
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
+            props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
+            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
+            props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
+            props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
+            props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
+            props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
+            props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
+            props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
+            if(config.queueEnqueueTimeoutMs != -1)
+              props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
+            props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+
+            new NewShinyProducer(props)
+          } else {
+            props.put("metadata.broker.list", config.brokerList)
+            props.put("compression.codec", config.compressionCodec)
+            props.put("producer.type", if(config.sync) "sync" else "async")
+            props.put("batch.num.messages", config.batchSize.toString)
+            props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
+            props.put("retry.backoff.ms", config.retryBackoffMs.toString)
+            props.put("queue.buffering.max.ms", config.sendTimeout.toString)
+            props.put("queue.buffering.max.messages", config.queueSize.toString)
+            props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
+            props.put("request.required.acks", config.requestRequiredAcks.toString)
+            props.put("request.timeout.ms", config.requestTimeoutMs.toString)
+            props.put("key.serializer.class", config.keyEncoderClass)
+            props.put("serializer.class", config.valueEncoderClass)
+            props.put("send.buffer.bytes", config.socketBuffer.toString)
+            props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
+            props.put("client.id", "console-producer")
+
+            new OldProducer(props)
+          }
+
+        Runtime.getRuntime.addShutdownHook(new Thread() {
+          override def run() {
+            producer.close()
+          }
+        })
+
+        var message: KeyedMessage[Array[Byte], Array[Byte]] = null
+        do {
+          message = reader.readMessage()
+          if(message != null)
+            producer.send(message.topic, message.key, message.message)
+        } while(message != null)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace
+        System.exit(1)
+    }
+    System.exit(0)
+  }
+
+  class ProducerConfig(args: Array[String]) {
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+      .withRequiredArg
+      .describedAs("broker-list")
+      .ofType(classOf[String])
+    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
+    val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'. " +
+                                                                  "If specified without a value, it defaults to 'gzip'.")
+                                    .withOptionalArg()
+                                    .describedAs("compression-codec")
+                                    .ofType(classOf[String])
+    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail to receive a message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer gives up and drops the message.")
+      .withRequiredArg
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3)
+    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
+      .withRequiredArg
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(100)
+    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+      " a message will queue awaiting a sufficient batch size. The value is given in ms.")
+      .withRequiredArg
+      .describedAs("timeout_ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(1000)
+    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum number of" +
+      " messages that will queue awaiting a sufficient batch size.")
+      .withRequiredArg
+      .describedAs("queue_size")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(10000)
+    val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue: 0 drops the message if the queue is full, -1 blocks indefinitely")
+      .withRequiredArg
+      .describedAs("queue enqueuetimeout ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(Int.MaxValue)
+    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
+      .withRequiredArg
+      .describedAs("request required acks")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+    val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. The value must be positive")
+      .withRequiredArg
+      .describedAs("request timeout ms")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1500)
+    val metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+      "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes.")
+      .withRequiredArg
+      .describedAs("metadata expiration interval")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(5*60*1000L)
+    val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms",
+      "The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.")
+      .withRequiredArg
+      .describedAs("metadata fetch timeout")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(60*1000L)
+    val maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+      "The total memory used by the producer to buffer records waiting to be sent to the server.")
+      .withRequiredArg
+      .describedAs("total memory in bytes")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(32 * 1024 * 1024L)
+    val maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
+      "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
+        "will attempt to optimistically group them together until this size is reached.")
+      .withRequiredArg
+      .describedAs("memory in bytes per partition")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(16 * 1024L)
+    val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
+      .withRequiredArg
+      .describedAs("encoder_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[DefaultEncoder].getName)
+    val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
+      .withRequiredArg
+      .describedAs("encoder_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[DefaultEncoder].getName)
+    val messageReaderOpt = parser.accepts("line-reader", "The class name of the MessageReader implementation to use for reading lines from standard in. " +
+      "By default each line is read as a separate message.")
+      .withRequiredArg
+      .describedAs("reader_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[LineMessageReader].getName)
+    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP receive buffer.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1024*100)
+    val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
+      "This allows custom configuration for a user-defined message reader.")
+      .withRequiredArg
+      .describedAs("prop")
+      .ofType(classOf[String])
+    val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
+
+    val options = parser.parse(args : _*)
+    for(arg <- List(topicOpt, brokerListOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    import scala.collection.JavaConversions._
+    val useNewProducer = options.has(useNewProducerOpt)
+    val topic = options.valueOf(topicOpt)
+    val brokerList = options.valueOf(brokerListOpt)
+    val sync = options.has(syncOpt)
+    val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
+    val compressionCodec = if (options.has(compressionCodecOpt))
+                             if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty)
+                               DefaultCompressionCodec.name
+                             else compressionCodecOptionValue
+                           else NoCompressionCodec.name
+    val batchSize = options.valueOf(batchSizeOpt)
+    val sendTimeout = options.valueOf(sendTimeoutOpt)
+    val queueSize = options.valueOf(queueSizeOpt)
+    val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
+    val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
+    val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
+    val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt)
+    val retryBackoffMs = options.valueOf(retryBackoffMsOpt)
+    val keyEncoderClass = options.valueOf(keyEncoderOpt)
+    val valueEncoderClass = options.valueOf(valueEncoderOpt)
+    val readerClass = options.valueOf(messageReaderOpt)
+    val socketBuffer = options.valueOf(socketBufferSizeOpt)
+    val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
+    /* new producer related configs */
+    val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
+    val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
+    val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt)
+    val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt)
+  }
+
+  trait MessageReader {
+    def init(inputStream: InputStream, props: Properties) {}
+    def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
+    def close() {}
+  }
+
+  class LineMessageReader extends MessageReader {
+    var topic: String = null
+    var reader: BufferedReader = null
+    var parseKey = false
+    var keySeparator = "\t"
+    var ignoreError = false
+    var lineNumber = 0
+
+    override def init(inputStream: InputStream, props: Properties) {
+      topic = props.getProperty("topic")
+      if(props.containsKey("parse.key"))
+        parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
+      if(props.containsKey("key.separator"))
+        keySeparator = props.getProperty("key.separator")
+      if(props.containsKey("ignore.error"))
+        ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
+      reader = new BufferedReader(new InputStreamReader(inputStream))
+    }
+
+    override def readMessage() = {
+      lineNumber += 1
+      (reader.readLine(), parseKey) match {
+        case (null, _) => null
+        case (line, true) =>
+          line.indexOf(keySeparator) match {
+            case -1 =>
+              if(ignoreError)
+                new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
+              else
+                throw new KafkaException("No key found on line " + lineNumber + ": " + line)
+            case n =>
+              new KeyedMessage[Array[Byte], Array[Byte]](topic,
+                             line.substring(0, n).getBytes,
+                             (if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
+          }
+        case (line, false) =>
+          new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
+      }
+    }
+  }
+}
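
For reference, a hypothetical invocation of the relocated console producer, exercising the parse.key and key.separator properties that LineMessageReader reads above (broker address, topic name, and separator are illustrative values):

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test \
        --property parse.key=true --property key.separator=:

With these properties each stdin line is split on the first ':' into a message key and value; without parse.key, the entire line becomes the value and the key is null.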

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
new file mode 100644
index 0000000..4688349
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicLong
+import java.nio.channels.ClosedByInterruptException
+import org.apache.log4j.Logger
+import kafka.message.Message
+import kafka.utils.ZkUtils
+import java.util.{ Random, Properties }
+import kafka.consumer._
+import java.text.SimpleDateFormat
+
+/**
+ * Performance test for the full zookeeper consumer
+ */
+object ConsumerPerformance {
+  private val logger = Logger.getLogger(getClass())
+
+  def main(args: Array[String]): Unit = {
+
+    val config = new ConsumerPerfConfig(args)
+    logger.info("Starting consumer...")
+    var totalMessagesRead = new AtomicLong(0)
+    var totalBytesRead = new AtomicLong(0)
+
+    if (!config.hideHeader) {
+      if (!config.showDetailedStats)
+        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+      else
+        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+    }
+
+    // clean up zookeeper state for this group id for every perf run
+    ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId)
+
+    val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
+
+    val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
+    var threadList = List[ConsumerPerfThread]()
+    for ((topic, streamList) <- topicMessageStreams)
+      for (i <- 0 until streamList.length)
+        threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
+          totalMessagesRead, totalBytesRead)
+
+    logger.info("Sleeping for 1 second.")
+    Thread.sleep(1000)
+    logger.info("starting threads")
+    val startMs = System.currentTimeMillis
+    for (thread <- threadList)
+      thread.start
+
+    for (thread <- threadList)
+      thread.join
+
+    val endMs = System.currentTimeMillis
+    val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0
+    if (!config.showDetailedStats) {
+      val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
+      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
+        config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get,
+        totalMessagesRead.get / elapsedSecs))
+    }
+    System.exit(0)
+  }
+
+  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+      "Multiple URLs can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
+    val groupIdOpt = parser.accepts("group", "The group id to consume on.")
+      .withRequiredArg
+      .describedAs("gid")
+      .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
+      .ofType(classOf[String])
+    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1024 * 1024)
+    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
+      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
+    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP receive buffer.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(2 * 1024 * 1024)
+    val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(10)
+    val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+
+    val options = parser.parse(args: _*)
+
+    for (arg <- List(topicOpt, zkConnectOpt)) {
+      if (!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val props = new Properties
+    props.put("group.id", options.valueOf(groupIdOpt))
+    props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
+    props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
+    props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
+    props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
+    props.put("consumer.timeout.ms", "5000")
+    props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
+    val consumerConfig = new ConsumerConfig(props)
+    val numThreads = options.valueOf(numThreadsOpt).intValue
+    val topic = options.valueOf(topicOpt)
+    val numMessages = options.valueOf(numMessagesOpt).longValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val showDetailedStats = options.has(showDetailedStatsOpt)
+    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val hideHeader = options.has(hideHeaderOpt)
+  }
+
+  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
+    config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
+    extends Thread(name) {
+
+    override def run() {
+      var bytesRead = 0L
+      var messagesRead = 0L
+      val startMs = System.currentTimeMillis
+      var lastReportTime: Long = startMs
+      var lastBytesRead = 0L
+      var lastMessagesRead = 0L
+
+      try {
+        val iter = stream.iterator
+        while (iter.hasNext && messagesRead < config.numMessages) {
+          val messageAndMetadata = iter.next
+          messagesRead += 1
+          bytesRead += messageAndMetadata.message.length
+
+          if (messagesRead % config.reportingInterval == 0) {
+            if (config.showDetailedStats)
+              printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis)
+            lastReportTime = System.currentTimeMillis
+            lastMessagesRead = messagesRead
+            lastBytesRead = bytesRead
+          }
+        }
+      } catch {
+        case _: InterruptedException =>
+        case _: ClosedByInterruptException =>
+        case _: ConsumerTimeoutException =>
+        case e: Throwable => e.printStackTrace()
+      }
+      totalMessagesRead.addAndGet(messagesRead)
+      totalBytesRead.addAndGet(bytesRead)
+      if (config.showDetailedStats)
+        printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis)
+    }
+
+    private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
+      startMs: Long, endMs: Long) = {
+      val elapsedMs = endMs - startMs
+      val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
+      val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
+      println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
+        config.consumerConfig.fetchMessageMaxBytes, totalMBRead,
+        1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
+    }
+  }
+
+}
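
As an illustration, the migrated consumer perf tool might be run as follows (ZooKeeper address, topic, and message count are assumed values):

    bin/kafka-consumer-perf-test.sh --zookeeper localhost:2181 --topic test \
        --group perf-group --threads 4 --messages 500000

Each of the 4 threads consumes one stream until it has read --messages messages or the hard-coded 5000 ms consumer timeout fires, after which the aggregate MB.sec and nMsg.sec summary line is printed.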

http://git-wip-us.apache.org/repos/asf/kafka/blob/1363ed7c/core/src/main/scala/kafka/tools/PerfConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala
new file mode 100644
index 0000000..129cc01
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.tools
+
+import joptsimple.OptionParser
+
+
+class PerfConfig(args: Array[String]) {
+  val parser = new OptionParser
+  val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume")
+    .withRequiredArg
+    .describedAs("count")
+    .ofType(classOf[java.lang.Long])
+    .defaultsTo(Long.MaxValue)
+  val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval (in messages) at which to print progress info.")
+    .withRequiredArg
+    .describedAs("interval")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(5000)
+  val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
+    "See java.text.SimpleDateFormat for options.")
+    .withRequiredArg
+    .describedAs("date format")
+    .ofType(classOf[String])
+    .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS")
+  val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
+    "interval as configured by reporting-interval")
+  val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats.")
+  val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(100)
+  val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(200)
+  val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
+    .withRequiredArg
+    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(0)
+  val helpOpt = parser.accepts("help", "Print usage.")
+}
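
The options above are shared by the perf tools via PerfConfig; for example, appended to the consumer test (all values illustrative):

    bin/kafka-consumer-perf-test.sh --zookeeper localhost:2181 --topic test \
        --messages 1000000 --show-detailed-stats --reporting-interval 10000 --hide-header

With --show-detailed-stats a per-thread stats line is printed every 10000 messages; --hide-header suppresses the column header row.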

