kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2015: Enable ConsoleConsumer to use new consumer
Date Thu, 20 Aug 2015 19:59:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e4fc456ab -> 767a8a762


KAFKA-2015: Enable ConsoleConsumer to use new consumer

This extends the original patch done by GZ to provide console access to both the new and old
consumer APIs. The code follows a pattern similar to that already used in ConsoleProducer.

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Jason Gustafson, Ismael Juma, Guozhang Wang

Closes #144 from benstopford/KAFKA-2015 and squashes the following commits:

5058a7b [Ben Stopford] Patch for KAFKA-2015: removed unused imports
6e08bf4 [Ben Stopford] Patch for KAFKA-2015: fixed formatting error
739457c [Ben Stopford] Patch for KAFKA-2015: switched to blocking poll + typo + fixed to match
style guide
883a626 [Ben Stopford] Patch for KAFKA-2015: incorporating comments to date.
0660629 [Ben Stopford] Merge remote-tracking branch 'upstream/trunk' into KAFKA-2015
e102ff6 [Ben Stopford] KAFKA-2015 - ported patch from jira in and altered to match existing
ConsoleProducer template


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/767a8a76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/767a8a76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/767a8a76

Branch: refs/heads/trunk
Commit: 767a8a7628bd936ee3da1f03c27639657cd0dd5e
Parents: e4fc456
Author: Ben Stopford <benstopford@gmail.com>
Authored: Thu Aug 20 13:00:58 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Aug 20 13:00:58 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/BaseConsumer.scala     |  72 ++++
 .../scala/kafka/tools/ConsoleConsumer.scala     | 335 +++++++++++--------
 .../unit/kafka/tools/ConsoleConsumerTest.scala  |  87 +++++
 3 files changed, 350 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/767a8a76/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
new file mode 100644
index 0000000..5017c95
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.Properties
+
+/**
+ * A base consumer used to abstract both old and new consumer;
+ * this class (along with BaseProducer) should be removed
+ * once we deprecate the old consumer
+ */
+trait BaseConsumer {
+  def receive(): BaseConsumerRecord
+  def close()
+}
+
+case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
+
+class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseConsumer {
+  import org.apache.kafka.clients.consumer.KafkaConsumer
+
+  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
+  consumer.subscribe(topic)
+  var recordIter = consumer.poll(0).iterator
+
+  override def receive(): BaseConsumerRecord = {
+    while (!recordIter.hasNext)
+      recordIter = consumer.poll(Long.MaxValue).iterator
+
+    val record = recordIter.next
+    BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
+  }
+
+  override def close() {
+    this.consumer.close()
+  }
+}
+
+class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends BaseConsumer {
+  import kafka.serializer.DefaultDecoder
+
+  val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
+  val stream: KafkaStream[Array[Byte], Array[Byte]] =
+    consumerConnector.createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder()).head
+  val iter = stream.iterator
+
+  override def receive(): BaseConsumerRecord = {
+    // we do not need to check hasNext for KafkaStream iterator
+    val messageAndMetadata = iter.next
+    BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
+  }
+
+  override def close() {
+    this.consumerConnector.shutdown()
+  }
+}
+
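
The BaseConsumer trait above gives the tool a single receive/close surface over both clients. NewShinyConsumer also shows the blocking-poll pattern mentioned in the squashed commits: an initial poll(0) to trigger the group join, then poll(Long.MaxValue) whenever the buffered iterator is exhausted. A minimal sketch of driving the abstraction directly, assuming a reachable broker; the properties, topic, and object name here are illustrative, not part of the patch:

    import java.util.Properties
    import kafka.consumer.{BaseConsumer, NewShinyConsumer}

    object BaseConsumerExample {
      def main(args: Array[String]) {
        // illustrative settings, not from the patch
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("group.id", "console-consumer-example")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

        val consumer: BaseConsumer = new NewShinyConsumer("test", props)
        try {
          // receive() blocks until a record arrives, mirroring ConsoleConsumer.process below
          for (_ <- 1 to 10) {
            val record = consumer.receive()
            println(s"${record.topic}-${record.partition}@${record.offset}")
          }
        } finally consumer.close()
      }
    }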

http://git-wip-us.apache.org/repos/asf/kafka/blob/767a8a76/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a3bee58..22c63b3 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,91 +17,211 @@
 
 package kafka.tools
 
-import scala.collection.JavaConversions._
-import org.I0Itec.zkclient._
-import joptsimple._
-import java.util.Properties
-import java.util.Random
 import java.io.PrintStream
+import java.util.{Properties, Random}
+import joptsimple._
+import kafka.consumer._
 import kafka.message._
-import kafka.serializer._
-import kafka.utils._
 import kafka.metrics.KafkaMetricsReporter
-import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer}
+import kafka.utils._
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.JavaConversions._
+
 /**
- * Consumer that dumps messages out to standard out.
- *
+ * Consumer that dumps messages to standard out.
  */
 object ConsoleConsumer extends Logging {
 
   def main(args: Array[String]) {
+    val conf = new ConsumerConfig(args)
+    run(conf)
+  }
+
+  def run(conf: ConsumerConfig) {
+
+    val consumer =
+      if (conf.useNewConsumer) {
+        new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf))
+      } else {
+        checkZk(conf)
+        new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
+      }
+
+    addShutdownHook(consumer, conf)
+
+    process(conf.maxMessages, conf.formatter, consumer)
+  }
+
+  def checkZk(config: ConsumerConfig) {
+    if (!checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/brokers/ids")) {
+      System.err.println("No brokers found in ZK.")
+      System.exit(1)
+    }
+
+    if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
+      checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id") + "/offsets")) {
+      System.err.println("Found previous offset information for this group " + config.consumerProps.getProperty("group.id")
+        + ". Please use --delete-consumer-offsets to delete previous offsets metadata")
+      System.exit(1)
+    }
+  }
+
+  def addShutdownHook(consumer: BaseConsumer, conf: ConsumerConfig) {
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        consumer.close()
+
+        // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
+        if (!conf.groupIdPassed)
+          ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id"))
+      }
+    })
+  }
+
+  def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer) {
+    var messageCount = 0
+    while (messageCount < maxMessages || maxMessages == -1) {
+      messageCount += 1
+      val msg: BaseConsumerRecord = consumer.receive()
+      formatter.writeTo(msg.key, msg.value, System.out)
+      checkErr(formatter)
+    }
+    println(s"Processed a total of $messageCount messages")
+  }
+
+  def checkErr(formatter: MessageFormatter) {
+    if (System.out.checkError()) {
+      // This means no one is listening to our output stream any more, time to shutdown
+      System.err.println("Unable to write to standard out, closing consumer.")
+      formatter.close()
+      System.exit(1)
+    }
+  }
+
+  def getOldConsumerProps(config: ConsumerConfig): Properties = {
+    val props = new Properties
+
+    props.putAll(config.consumerProps)
+    props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
+    props.put("zookeeper.connect", config.zkConnectionStr)
+
+    if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
+      checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + props.getProperty("group.id") + "/offsets")) {
+      System.err.println("Found previous offset information for this group " + props.getProperty("group.id")
+        + ". Please use --delete-consumer-offsets to delete previous offsets metadata")
+      System.exit(1)
+    }
+
+    if (config.options.has(config.deleteConsumerOffsetsOpt))
+      ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id"))
+
+    props
+  }
+
+  def getNewConsumerProps(config: ConsumerConfig): Properties = {
+    val props = new Properties
+
+    props.putAll(config.consumerProps)
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.StringDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, if (config.valueDeserializer != null) config.valueDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+
+    props
+  }
+
+  class ConsumerConfig(args: Array[String]) {
     val parser = new OptionParser
     val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
-            .withRequiredArg
-            .describedAs("topic")
-            .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
-            .withRequiredArg
-            .describedAs("whitelist")
-            .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("whitelist")
+      .ofType(classOf[String])
     val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
-            .withRequiredArg
-            .describedAs("blacklist")
-            .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("blacklist")
+      .ofType(classOf[String])
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-            "Multiple URLS can be given to allow fail-over.")
-            .withRequiredArg
-            .describedAs("urls")
-            .ofType(classOf[String])
-
+      "Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
     val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
-            .withRequiredArg
-            .describedAs("config file")
-            .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("config file")
+      .ofType(classOf[String])
     val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
-            .withRequiredArg
-            .describedAs("class")
-            .ofType(classOf[String])
-            .defaultsTo(classOf[DefaultMessageFormatter].getName)
+      .withRequiredArg
+      .describedAs("class")
+      .ofType(classOf[String])
+      .defaultsTo(classOf[DefaultMessageFormatter].getName)
     val messageFormatterArgOpt = parser.accepts("property")
-            .withRequiredArg
-            .describedAs("prop")
-            .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("prop")
+      .ofType(classOf[String])
     val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up")
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
-            "start with the earliest message present in the log rather than the latest message.")
+      "start with the earliest message present in the log rather than the latest message.")
     val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
-            .withRequiredArg
-            .describedAs("num_messages")
-            .ofType(classOf[java.lang.Integer])
+      .withRequiredArg
+      .describedAs("num_messages")
+      .ofType(classOf[java.lang.Integer])
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
-            "skip it instead of halt.")
+      "skip it instead of halt.")
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-            "set, the csv metrics will be outputed here")
+      "set, the csv metrics will be outputed here")
       .withRequiredArg
-      .describedAs("metrics dictory")
+      .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
+    val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server")
+      .withRequiredArg
+      .describedAs("server to connect to")
+      .ofType(classOf[String])
+    val keyDeserializerOpt = parser.accepts("key-deserializer")
+      .withRequiredArg
+      .describedAs("deserializer for key")
+      .ofType(classOf[String])
+    val valueDeserializerOpt = parser.accepts("value-deserializer")
+      .withRequiredArg
+      .describedAs("deserializer for values")
+      .ofType(classOf[String])
 
-    if(args.length == 0)
+    if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
-      
+
     var groupIdPassed = true
     val options: OptionSet = tryParse(parser, args)
-    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+    val useNewConsumer = options.has(useNewConsumerOpt)
+    val filterOpt = List(whitelistOpt, blacklistOpt).filter(options.has)
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
-    if (topicOrFilterOpt.size != 1)
-      CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
     val topicArg = options.valueOf(topicOrFilterOpt.head)
-    val filterSpec = if (options.has(blacklistOpt))
-      new Blacklist(topicArg)
-    else
-      new Whitelist(topicArg)
+    val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
+    val consumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerConfigOpt))
+    val zkConnectionStr = options.valueOf(zkConnectOpt)
+    val fromBeginning = options.has(resetBeginningOpt)
+    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
+    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
+    val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
+    val bootstrapServer = options.valueOf(bootstrapServerOpt)
+    val keyDeserializer = options.valueOf(keyDeserializerOpt)
+    val valueDeserializer = options.valueOf(valueDeserializerOpt)
+    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    formatter.init(formatterArgs)
+
+    CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)
+
+    if (!useNewConsumer && topicOrFilterOpt.size != 1)
+      CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
 
-    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
-    if (csvMetricsReporterEnabled) {
+    if (options.has(csvMetricsReporterEnabledOpt)) {
       val csvReporterProps = new Properties()
       csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
       csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
@@ -114,102 +234,26 @@ object ConsoleConsumer extends Logging {
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
-    val consumerProps = if (options.has(consumerConfigOpt))
-      Utils.loadProps(options.valueOf(consumerConfigOpt))
-    else
-      new Properties()
-
-    if(!consumerProps.containsKey("group.id")) {
-      consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
+    //Provide the consumer with a randomly assigned group id
+    if(!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"console-consumer-" + new Random().nextInt(100000))
       groupIdPassed=false
     }
-    consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest"
else "largest")
-    consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt))
-
-    if (!checkZkPathExists(options.valueOf(zkConnectOpt),"/brokers/ids")) {
-      System.err.println("No brokers found.")
-      System.exit(1)
-    }
-
-    if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) &&
-       checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) {
-      System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id")
-        +". Please use --delete-consumer-offsets to delete previous offsets metadata")
-      System.exit(1)
-    }
-
-    if(options.has(deleteConsumerOffsetsOpt))
-      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id"))
-
-    val config = new ConsumerConfig(consumerProps)
-    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
-    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
-    val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
-    val connector = Consumer.create(config)
-
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run() {
-        connector.shutdown()
-        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
-        if(!groupIdPassed)
-          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
-      }
-    })
-
-    var numMessages = 0L
-    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
-    formatter.init(formatterArgs)
-    try {
-      val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
-      val iter = if(maxMessages >= 0)
-        stream.slice(0, maxMessages)
-      else
-        stream
-
-      for(messageAndTopic <- iter) {
-        try {
-          formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
-          numMessages += 1
-        } catch {
-          case e: Throwable =>
-            if (skipMessageOnError)
-              error("Error processing message, skipping this message: ", e)
-            else
-              throw e
-        }
-        if(System.out.checkError()) {
-          // This means no one is listening to our output stream any more, time to shutdown
-          System.err.println("Unable to write to standard out, closing consumer.")
-          System.err.println("Consumed %d messages".format(numMessages))
-          formatter.close()
-          connector.shutdown()
-          System.exit(1)
-        }
-      }
-    } catch {
-      case e: Throwable => error("Error processing message, stopping consumer: ", e)
-    }
-    System.err.println("Consumed %d messages".format(numMessages))
-    System.out.flush()
-    formatter.close()
-    connector.shutdown()
-  }
 
-  def tryParse(parser: OptionParser, args: Array[String]) = {
-    try {
-      parser.parse(args : _*)
-    } catch {
-      case e: OptionException => {
-        Utils.croak(e.getMessage)
-        null
+    def tryParse(parser: OptionParser, args: Array[String]) = {
+      try
+        parser.parse(args: _*)
+      catch {
+        case e: OptionException =>
+          Utils.croak(e.getMessage)
+          null
       }
     }
   }
 
   def checkZkPathExists(zkUrl: String, path: String): Boolean = {
     try {
-      val zk = ZkUtils.createZkClient(zkUrl, 30*1000,30*1000);
+      val zk = ZkUtils.createZkClient(zkUrl, 30 * 1000, 30 * 1000)
       zk.exists(path)
     } catch {
       case _: Throwable => false
@@ -219,7 +263,9 @@ object ConsoleConsumer extends Logging {
 
 trait MessageFormatter {
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
+
   def init(props: Properties) {}
+
   def close() {}
 }
 
@@ -229,16 +275,16 @@ class DefaultMessageFormatter extends MessageFormatter {
   var lineSeparator = "\n".getBytes
 
   override def init(props: Properties) {
-    if(props.containsKey("print.key"))
+    if (props.containsKey("print.key"))
       printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
-    if(props.containsKey("key.separator"))
+    if (props.containsKey("key.separator"))
       keySeparator = props.getProperty("key.separator").getBytes
-    if(props.containsKey("line.separator"))
+    if (props.containsKey("line.separator"))
       lineSeparator = props.getProperty("line.separator").getBytes
   }
 
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
-    if(printKey) {
+    if (printKey) {
       output.write(if (key == null) "null".getBytes() else key)
       output.write(keySeparator)
     }
@@ -249,6 +295,7 @@ class DefaultMessageFormatter extends MessageFormatter {
 
 class NoOpMessageFormatter extends MessageFormatter {
   override def init(props: Properties) {}
+
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
 }
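
Because MessageFormatter stays a small public trait, output can be customized without touching the tool itself; the implementation class is named via the existing --formatter option. A sketch of a hypothetical formatter that prints each value as hex (the package and class name are invented for illustration):

    package example

    import java.io.PrintStream
    import java.util.Properties
    import kafka.tools.MessageFormatter

    // Hypothetical formatter: prints each message value as lowercase hex, one record per line
    class HexMessageFormatter extends MessageFormatter {
      def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
        val bytes = if (value == null) Array[Byte]() else value
        output.println(bytes.map("%02x".format(_)).mkString)
      }
    }

It would then be selected with --formatter example.HexMessageFormatter, assuming the class is on the classpath.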
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/767a8a76/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
new file mode 100644
index 0000000..254ba7b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.tools
+
+import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
+import kafka.tools.{ConsoleConsumer, MessageFormatter}
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+class ConsoleConsumerTest extends JUnitSuite {
+
+  @Test
+  def shouldLimitReadsToMaxMessageLimit() {
+    //Mocks
+    val consumer = EasyMock.createNiceMock(classOf[BaseConsumer])
+    val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
+
+    //Stubs
+    val record = new BaseConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())
+
+    //Expectations
+    val messageLimit: Int = 10
+    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
+    EasyMock.expect(consumer.receive()).andReturn(record).times(messageLimit)
+
+    EasyMock.replay(consumer)
+    EasyMock.replay(formatter)
+
+    //Test
+    ConsoleConsumer.process(messageLimit, formatter, consumer)
+  }
+
+  @Test
+  def shouldParseValidOldConsumerValidConfig() {
+    //Given
+    val args: Array[String] = Array(
+      "--zookeeper", "localhost:2181",
+      "--topic", "test",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertFalse(config.useNewConsumer)
+    assertEquals("localhost:2181", config.zkConnectionStr)
+    assertEquals("test", config.topicArg)
+    assertEquals(true, config.fromBeginning)
+  }
+
+  @Test
+  def shouldParseValidNewConsumerValidConfig() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--from-beginning",
+      "--new-consumer") //new
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertTrue(config.useNewConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(true, config.fromBeginning)
+  }
+
+}
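
With this in place, both code paths are reachable from the existing bin/kafka-console-consumer.sh wrapper. Illustrative invocations using only the options defined above (hosts, ports, and topic are placeholders):

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

The first uses the old ZooKeeper-based consumer (still the default); the second opts into the new consumer.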

