kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2467: Fix changes to behavior in ConsoleConsumer: properly parse consumer.config option, handle exceptions during message processing, and print number of processed messages to stderr.
Date Thu, 27 Aug 2015 01:36:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 35eaef7bb -> 8c88d198a


KAFKA-2467: Fix changes to behavior in ConsoleConsumer: properly parse consumer.config option,
handle exceptions during message processing, and print number of processed messages to stderr.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Guozhang Wang

Closes #166 from ewencp/kafka-2467-fix-console-consumer-behavior-regressions and squashes
the following commits:

8b5e30c [Ewen Cheslack-Postava] KAFKA-2466: Fix ConsoleConsumer exit process for new consumer
to avoid ConcurrentModificationException.
c6abe38 [Ewen Cheslack-Postava] Fix missing parameter in test.
a6961ee [Ewen Cheslack-Postava] KAFKA-2467: Fix changes to behavior in ConsoleConsumer: properly
parse consumer.config option, handle exceptions during message processing, and print number
of processed messages to stderr.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c88d198
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c88d198
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c88d198

Branch: refs/heads/trunk
Commit: 8c88d198a0bcd0292a6b7ec3b08ae138e41fcc6e
Parents: 35eaef7
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Wed Aug 26 18:38:43 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Aug 26 18:38:43 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/BaseConsumer.scala     | 15 ++++--
 .../scala/kafka/tools/ConsoleConsumer.scala     | 54 +++++++++++++++-----
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 22 +++++++-
 3 files changed, 75 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c88d198/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 95325b0..4e956bb 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -26,7 +26,8 @@ import java.util.Properties
  */
 trait BaseConsumer {
   def receive(): BaseConsumerRecord
-  def close()
+  def stop()
+  def cleanup()
 }
 
 case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte],
value: Array[Byte])
@@ -47,7 +48,11 @@ class NewShinyConsumer(topic: String, consumerProps: Properties) extends
BaseCon
     BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
   }
 
-  override def close() {
+  override def stop() {
+    this.consumer.wakeup()
+  }
+
+  override def cleanup() {
     this.consumer.close()
   }
 }
@@ -66,7 +71,11 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties)
extends B
     BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset,
messageAndMetadata.key, messageAndMetadata.message)
   }
 
-  override def close() {
+  override def stop() {
+    this.consumerConnector.shutdown()
+  }
+
+  override def cleanup() {
     this.consumerConnector.shutdown()
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c88d198/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 22c63b3..a9c5427 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -34,6 +34,8 @@ import scala.collection.JavaConversions._
  */
 object ConsoleConsumer extends Logging {
 
+  var messageCount = 0
+
   def main(args: Array[String]) {
     val conf = new ConsumerConfig(args)
     run(conf)
@@ -51,7 +53,16 @@ object ConsoleConsumer extends Logging {
 
     addShutdownHook(consumer, conf)
 
-    process(conf.maxMessages, conf.formatter, consumer)
+    try {
+      process(conf.maxMessages, conf.formatter, consumer, conf.skipMessageOnError)
+    } finally {
+      consumer.cleanup()
+      reportRecordCount()
+
+      // if we generated a random group id (as none specified explicitly) then avoid polluting
zookeeper with persistent group data, this is a hack
+      if (!conf.groupIdPassed)
+        ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" +
conf.consumerProps.get("group.id"))
+    }
   }
 
   def checkZk(config: ConsumerConfig) {
@@ -71,24 +82,40 @@ object ConsoleConsumer extends Logging {
   def addShutdownHook(consumer: BaseConsumer, conf: ConsumerConfig) {
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
-        consumer.close()
-
-        // if we generated a random group id (as none specified explicitly) then avoid polluting
zookeeper with persistent group data, this is a hack
-        if (!conf.groupIdPassed)
-          ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/"
+ conf.consumerProps.get("group.id"))
+        consumer.stop()
       }
     })
   }
 
-  def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer)
{
-    var messageCount = 0
+  def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer,
skipMessageOnError: Boolean) {
     while (messageCount < maxMessages || maxMessages == -1) {
       messageCount += 1
-      val msg: BaseConsumerRecord = consumer.receive()
-      formatter.writeTo(msg.key, msg.value, System.out)
+      val msg: BaseConsumerRecord = try {
+        consumer.receive()
+      } catch {
+        case e: Throwable => {
+          error("Error processing message, stopping consumer: ", e)
+          consumer.stop()
+          return
+        }
+      }
+      try {
+        formatter.writeTo(msg.key, msg.value, System.out)
+      } catch {
+        case e: Throwable =>
+          if (skipMessageOnError) {
+            error("Error processing message, skipping this message: ", e)
+          } else {
+            consumer.stop()
+            throw e
+          }
+      }
       checkErr(formatter)
     }
-    println(s"Processed a total of $messageCount messages")
+  }
+
+  def reportRecordCount() {
+    System.err.println(s"Processed a total of $messageCount messages")
   }
 
   def checkErr(formatter: MessageFormatter) {
@@ -203,7 +230,10 @@ object ConsoleConsumer extends Logging {
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
     val topicArg = options.valueOf(topicOrFilterOpt.head)
     val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
-    val consumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerConfigOpt))
+    val consumerProps = if (options.has(consumerConfigOpt))
+      Utils.loadProps(options.valueOf(consumerConfigOpt))
+    else
+      new Properties()
     val zkConnectionStr = options.valueOf(zkConnectOpt)
     val fromBeginning = options.has(resetBeginningOpt)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c88d198/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 254ba7b..7c08e09 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -17,8 +17,11 @@
 
 package unit.kafka.tools
 
+import java.io.FileOutputStream
+
 import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
 import kafka.tools.{ConsoleConsumer, MessageFormatter}
+import kafka.utils.TestUtils
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
@@ -44,7 +47,7 @@ class ConsoleConsumerTest extends JUnitSuite {
     EasyMock.replay(formatter)
 
     //Test
-    ConsoleConsumer.process(messageLimit, formatter, consumer)
+    ConsoleConsumer.process(messageLimit, formatter, consumer, true)
   }
 
   @Test
@@ -84,4 +87,21 @@ class ConsoleConsumerTest extends JUnitSuite {
     assertEquals(true, config.fromBeginning)
   }
 
+
+  @Test
+  def shouldParseConfigsFromFile() {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = new FileOutputStream(propsFile)
+    propsStream.write("consumer.timeout.ms=1000".getBytes())
+    propsStream.close()
+    val args: Array[String] = Array(
+      "--zookeeper", "localhost:2181",
+      "--topic", "test",
+      "--consumer.config", propsFile.getAbsolutePath
+    )
+
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    assertEquals("1000", config.consumerProps.getProperty("consumer.timeout.ms"))
+  }
 }


Mime
View raw message