kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: kafka-937; delta patch; ConsumerFetcherThread can deadlock; patched by Jun Rao; reviewed by Joel Koshy
Date Wed, 04 Sep 2013 03:51:04 GMT
Updated Branches:
  refs/heads/0.8 f89ddced1 -> 20953b525


kafka-937; delta patch; ConsumerFetcherThread can deadlock; patched by Jun Rao; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/20953b52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/20953b52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/20953b52

Branch: refs/heads/0.8
Commit: 20953b52558935ba210eaee18e9504bf16bfec27
Parents: f89ddce
Author: Jun Rao <junrao@gmail.com>
Authored: Tue Sep 3 20:50:45 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Sep 3 20:50:45 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala      | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/20953b52/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b286312..fa6b213 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -92,7 +92,20 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       leaderForPartitionsMap.foreach {
         case(topicAndPartition, leaderBroker) =>
           val pti = partitionMap(topicAndPartition)
-          addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+          try {
+            addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+          } catch {
+            case t => {
+                if (!isRunning.get())
+                  throw t /* If this thread is stopped, propagate this exception to kill the thread. */
+                else {
+                  warn("Failed to add leader for partition %s; will retry".format(topicAndPartition), t)
+                  lock.lock()
+                  noLeaderPartitionSet += topicAndPartition
+                  lock.unlock()
+                }
+              }
+          }
       }
 
       shutdownIdleFetcherThreads()
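
The retry bookkeeping introduced by this hunk can be sketched in isolation roughly as follows. This is a minimal illustration, not the actual ConsumerFetcherManager: the class name RetryingFetcherRegistry, the methods addAll and pendingRetries, and the local TopicAndPartition case class are hypothetical stand-ins, and the addFetcher function is injected so the failure path can be exercised. Unlike the patch, the sketch releases the lock in a finally block, which is an assumption about preferred style rather than something the commit does.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical stand-in for Kafka's TopicAndPartition.
case class TopicAndPartition(topic: String, partition: Int)

// Hypothetical, simplified model of the patch's retry bookkeeping.
// addFetcher is injected so a failing call can be simulated.
class RetryingFetcherRegistry(addFetcher: TopicAndPartition => Unit) {
  val isRunning = new AtomicBoolean(true)
  private val lock = new ReentrantLock()
  private val noLeaderPartitionSet = mutable.HashSet.empty[TopicAndPartition]

  def addAll(partitions: Iterable[TopicAndPartition]): Unit =
    partitions.foreach { tp =>
      try {
        addFetcher(tp)
      } catch {
        case t: Throwable =>
          if (!isRunning.get()) {
            // Stopped: propagate the exception so the calling thread dies.
            throw t
          } else {
            // Still running: remember the partition so it is retried later.
            lock.lock()
            try {
              noLeaderPartitionSet += tp
            } finally {
              lock.unlock()
            }
          }
      }
    }

  // Snapshot of partitions waiting for another attempt.
  def pendingRetries: Set[TopicAndPartition] = {
    lock.lock()
    try {
      noLeaderPartitionSet.toSet
    } finally {
      lock.unlock()
    }
  }
}
```

In this sketch, a partition whose addFetcher call throws while the registry is running ends up in pendingRetries, and the remaining partitions are still processed. Once isRunning is set to false, the same failure propagates to the caller instead.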

