kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1010; Concurrency issue in getCluster() causes rebalance failure and dead consumer; patched by Sam Meder; reviewed by Jun Rao
Date Fri, 16 Aug 2013 17:09:16 GMT
Updated Branches:
  refs/heads/0.8 1db824ed2 -> 7a9faa49e


kafka-1010; Concurrency issue in getCluster() causes rebalance failure and dead consumer;
patched by Sam Meder; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a9faa49
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a9faa49
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a9faa49

Branch: refs/heads/0.8
Commit: 7a9faa49ed5c7581cb2bd6c86b68df06a8879fec
Parents: 1db824e
Author: Sam Meder <sam.meder@gmail.com>
Authored: Fri Aug 16 10:13:30 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Aug 16 10:13:30 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala    | 3 ++-
 core/src/main/scala/kafka/tools/ImportZkOffsets.scala             | 3 ---
 2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 17977e7..c2b9b9a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -399,8 +399,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           for (i <- 0 until config.rebalanceMaxRetries) {
             info("begin rebalancing consumer " + consumerIdString + " try #" + i)
             var done = false
-            val cluster = getCluster(zkClient)
+            var cluster: Cluster = null
             try {
+              cluster = getCluster(zkClient)
               done = rebalance(cluster)
             } catch {
               case e =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 63519e1..55709b5 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -96,9 +96,6 @@ object ImportZkOffsets extends Logging {
   }
   
   private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]):
Unit = {
-    val cluster = ZkUtils.getCluster(zkClient)
-    var partitions: List[String] = Nil
-
     for ((partition, offset) <- partitionOffsets) {
       debug("updating [" + partition + "] with offset [" + offset + "]")
       


Mime
View raw message