kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-916; Break deadlock between fetcher shutdown and error partition handling; reviewed by Jun Rao.
Date Tue, 28 May 2013 16:55:48 GMT
Updated Branches:
  refs/heads/0.8 59599cc12 -> 517af4779


KAFKA-916; Break deadlock between fetcher shutdown and error partition handling; reviewed by Jun Rao.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/517af477
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/517af477
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/517af477

Branch: refs/heads/0.8
Commit: 517af4779aed73efa9889b89d99a9c789f091b06
Parents: 59599cc
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Tue May 28 09:55:39 2013 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue May 28 09:55:39 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala    |    1 -
 .../kafka/consumer/ConsumerFetcherThread.scala     |    1 +
 2 files changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/517af477/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index db104f1..96bd886 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -167,7 +167,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     lock.lock()
     try {
       if (partitionMap != null) {
-        partitionList.foreach(tp => removeFetcher(tp.topic, tp.partition))
         noLeaderPartitionSet ++= partitionList
         cond.signalAll()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/517af477/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 1270e92..dda0a8f 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -66,6 +66,7 @@ class ConsumerFetcherThread(name: String,
 
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
+    partitions.foreach(tap => removePartition(tap.topic, tap.partition))
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
 }


Mime
View raw message