kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2819; catch NoSuchElementException in ConsoleConsumer
Date Fri, 13 Nov 2015 05:56:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 e8f92d620 -> b06ef94e2


KAFKA-2819; catch NoSuchElementException in ConsoleConsumer

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #516 from guozhangwang/K2819


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b06ef94e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b06ef94e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b06ef94e

Branch: refs/heads/0.9.0
Commit: b06ef94e22f34783e917f42d1659fd33891fabf2
Parents: e8f92d6
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Nov 12 21:23:28 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Nov 12 21:56:05 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/common/StreamEndException.scala | 7 +++++++
 core/src/main/scala/kafka/consumer/BaseConsumer.scala     | 5 ++++-
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala     | 8 ++++++--
 3 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b06ef94e/core/src/main/scala/kafka/common/StreamEndException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/StreamEndException.scala b/core/src/main/scala/kafka/common/StreamEndException.scala
new file mode 100644
index 0000000..2d814f7
--- /dev/null
+++ b/core/src/main/scala/kafka/common/StreamEndException.scala
@@ -0,0 +1,7 @@
+package kafka.common
+
+/**
+ * An exception that indicates KafkaStream has ended.
+ */
+class StreamEndException() extends RuntimeException {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b06ef94e/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 52cd5fa..7d3d7bc 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -18,6 +18,7 @@
 package kafka.consumer
 
 import java.util.Properties
+import kafka.common.StreamEndException
 
 /**
  * A base consumer used to abstract both old and new consumer
@@ -75,7 +76,9 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
   val iter = stream.iterator
 
   override def receive(): BaseConsumerRecord = {
-    // we do not need to check hasNext for KafkaStream iterator
+    if (!iter.hasNext())
+      throw new StreamEndException
+
     val messageAndMetadata = iter.next
     BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b06ef94e/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 6504925..a0c00b5 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -20,6 +20,7 @@ package kafka.tools
 import java.io.PrintStream
 import java.util.{Properties, Random}
 import joptsimple._
+import kafka.common.StreamEndException
 import kafka.consumer._
 import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
@@ -100,11 +101,14 @@ object ConsoleConsumer extends Logging {
       val msg: BaseConsumerRecord = try {
         consumer.receive()
       } catch {
-        case e: Throwable => {
+        case nse: StreamEndException =>
+          trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.")
+          // Consumer is already closed
+          return
+        case e: Throwable =>
           error("Error processing message, terminating consumer process: ", e)
           // Consumer will be closed
           return
-        }
       }
       try {
         formatter.writeTo(msg.key, msg.value, System.out)


Mime
View raw message