kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1245295 - /incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Date Fri, 17 Feb 2012 02:14:25 GMT
Author: junrao
Date: Fri Feb 17 02:14:25 2012
New Revision: 1245295

URL: http://svn.apache.org/viewvc?rev=1245295&view=rev
Log:
shutdown watch executor thread properly; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-265

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1245295&r1=1245294&r2=1245295&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
(original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Fri Feb 17 02:14:25 2012
@@ -379,17 +379,20 @@ private[kafka] class ZookeeperConsumerCo
     private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor")
{
       override def run() {
         info("starting watcher executor thread for consumer " + consumerIdString)
+        var doRebalance = false
         while (!isShuttingDown.get) {
           try {
             lock.lock()
             try {
               if (!isWatcherTriggered)
-                cond.await()
+                cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it
can check the shutdown flag
             } finally {
+              doRebalance = isWatcherTriggered
               isWatcherTriggered = false
               lock.unlock()
             }
-            syncedRebalance
+            if (doRebalance)
+              syncedRebalance
           } catch {
             case t => error("error during syncedRebalance", t)
           }



Mime
View raw message