KAFKA-1075; Consumer will not rebalance upon topic partition change; reviewed by Neha Narkhede
and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cb21757
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cb21757
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cb21757
Branch: refs/heads/trunk
Commit: 1cb217579bbb1f9d4df24a3d32bf88644697b7fe
Parents: 40efe0a
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Mon Oct 7 14:05:40 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Oct 7 14:05:40 2013 -0700
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 38 ++++++++++++++++----
1 file changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cb21757/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 08b4b72..36b167b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -25,7 +25,7 @@ import kafka.cluster._
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import java.net.InetAddress
-import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
import java.util.UUID
import kafka.serializer._
@@ -90,6 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val messageStreamCreated = new AtomicBoolean(false)
private var sessionExpirationListener: ZKSessionExpireListener = null
+ private var topicPartitionChangeListenner: ZKTopicPartitionChangeListener = null
private var loadBalancerListener: ZKRebalancerListener = null
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
@@ -268,8 +269,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
-
-
class ZKSessionExpireListener(val dirs: ZKGroupDirs,
val consumerIdString: String,
val topicCount: TopicCount,
@@ -306,6 +305,29 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
+ class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener)
+ extends IZkDataListener {
+
+ def handleDataChange(dataPath : String, data: Object) {
+ try {
+ info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering
rebalance")
+ // explicitly trigger load balancing for this consumer
+ loadBalancerListener.syncedRebalance()
+
+ // There is no need to re-subscribe the watcher since it will be automatically
+ // re-registered upon firing of this event by zkClient
+ } catch {
+ case e: Throwable => error("Error while handling topic partition change for data
path " + dataPath, e )
+ }
+ }
+
+ @throws(classOf[Exception])
+ def handleDataDeleted(dataPath : String) {
+ // TODO: This need to be implemented when we support delete topic
+ warn("Topic for path " + dataPath + " gets deleted, which should not happen at this
time")
+ }
+ }
+
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
@@ -626,11 +648,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String,
List[KafkaStream[_,_]]]])
}
- // register listener for session expired event
+ // create listener for session expired event if not exist yet
if (sessionExpirationListener == null)
sessionExpirationListener = new ZKSessionExpireListener(
dirs, consumerIdString, topicCount, loadBalancerListener)
+ // create listener for topic partition change event if not exist yet
+ if (topicPartitionChangeListenner == null)
+ topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener)
+
val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
// map of {topic -> Set(thread-1, thread-2, ...)}
@@ -686,8 +712,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicStreamsMap.foreach { topicAndStreams =>
// register on broker partition path changes
- val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
- zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+ val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
+ zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListenner)
}
// explicitly trigger load balancing for this consumer
|