kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: MINOR: fix shutdownHook in ConsoleConsumer
Date Wed, 18 Nov 2015 02:00:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 fda91fcc9 -> 45d6fb717


MINOR: fix shutdownHook in ConsoleConsumer

Author: Confluent <confluent@Confluents-MacBook-Pro.local>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #548 from guozhangwang/HFConsoleConsumer

(cherry picked from commit 36b1c1dae292b9a43f56c385de13b89dfd03cad8)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45d6fb71
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45d6fb71
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45d6fb71

Branch: refs/heads/0.9.0
Commit: 45d6fb717d3607f0e1a05a429f4feb38f7b50e40
Parents: fda91fc
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue Nov 17 18:00:09 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Nov 17 18:00:16 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45d6fb71/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a0c00b5..0760336 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -18,6 +18,7 @@
 package kafka.tools
 
 import java.io.PrintStream
+import java.util.concurrent.CountDownLatch
 import java.util.{Properties, Random}
 import joptsimple._
 import kafka.common.StreamEndException
@@ -26,6 +27,7 @@ import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
 import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConversions._
@@ -37,6 +39,8 @@ object ConsoleConsumer extends Logging {
 
   var messageCount = 0
 
+  private val shutdownLatch = new CountDownLatch(1)
+
   def main(args: Array[String]) {
     val conf = new ConsumerConfig(args)
     try {
@@ -70,6 +74,8 @@ object ConsoleConsumer extends Logging {
       // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
       if (!conf.groupIdPassed)
         ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id"))
+
+      shutdownLatch.countDown()
     }
   }
 
@@ -91,6 +97,8 @@ object ConsoleConsumer extends Logging {
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
         consumer.stop()
+
+        shutdownLatch.await()
       }
     })
   }
@@ -105,6 +113,10 @@ object ConsoleConsumer extends Logging {
           trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.")
           // Consumer is already closed
           return
+        case nse: WakeupException =>
+          trace("Caught WakeupException because consumer is shutdown, ignore and terminate.")
+          // Consumer will be closed
+          return
         case e: Throwable =>
           error("Error processing message, terminating consumer process: ", e)
           // Consumer will be closed


Mime
View raw message