Updated Branches:
refs/heads/0.8 1db824ed2 -> 7a9faa49e
KAFKA-1010; Concurrency issue in getCluster() causes rebalance failure and dead consumer;
patched by Sam Meder; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a9faa49
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a9faa49
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a9faa49
Branch: refs/heads/0.8
Commit: 7a9faa49ed5c7581cb2bd6c86b68df06a8879fec
Parents: 1db824e
Author: Sam Meder <sam.meder@gmail.com>
Authored: Fri Aug 16 10:13:30 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Aug 16 10:13:30 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 3 ++-
core/src/main/scala/kafka/tools/ImportZkOffsets.scala | 3 ---
2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 17977e7..c2b9b9a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -399,8 +399,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (i <- 0 until config.rebalanceMaxRetries) {
info("begin rebalancing consumer " + consumerIdString + " try #" + i)
var done = false
- val cluster = getCluster(zkClient)
+ var cluster: Cluster = null
try {
+ cluster = getCluster(zkClient)
done = rebalance(cluster)
} catch {
case e =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 63519e1..55709b5 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -96,9 +96,6 @@ object ImportZkOffsets extends Logging {
}
private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]):
Unit = {
- val cluster = ZkUtils.getCluster(zkClient)
- var partitions: List[String] = Nil
-
for ((partition, offset) <- partitionOffsets) {
debug("updating [" + partition + "] with offset [" + offset + "]")
|