kafka-commits mailing list archives

From jun...@apache.org
Subject [13/14] git commit: KAFKA-1075; Consumer will not rebalance upon topic partition change; reviewed by Neha Narkhede and Jun Rao
Date Mon, 07 Oct 2013 22:05:34 GMT
KAFKA-1075; Consumer will not rebalance upon topic partition change; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cb21757
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cb21757
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cb21757

Branch: refs/heads/trunk
Commit: 1cb217579bbb1f9d4df24a3d32bf88644697b7fe
Parents: 40efe0a
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Mon Oct 7 14:05:40 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Oct 7 14:05:40 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 38 ++++++++++++++++----
 1 file changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cb21757/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 08b4b72..36b167b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -25,7 +25,7 @@ import kafka.cluster._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import java.net.InetAddress
-import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.util.UUID
 import kafka.serializer._
@@ -90,6 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val messageStreamCreated = new AtomicBoolean(false)
 
   private var sessionExpirationListener: ZKSessionExpireListener = null
+  private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
   private var loadBalancerListener: ZKRebalancerListener = null
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
@@ -268,8 +269,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-
-
   class ZKSessionExpireListener(val dirs: ZKGroupDirs,
                                  val consumerIdString: String,
                                  val topicCount: TopicCount,
@@ -306,6 +305,29 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   }
 
+  class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener)
+    extends IZkDataListener {
+
+    def handleDataChange(dataPath : String, data: Object) {
+      try {
+        info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering
rebalance")
+        // explicitly trigger load balancing for this consumer
+        loadBalancerListener.syncedRebalance()
+
+        // There is no need to re-subscribe the watcher since it will be automatically
+        // re-registered upon firing of this event by zkClient
+      } catch {
+        case e: Throwable => error("Error while handling topic partition change for data path " + dataPath, e )
+      }
+    }
+
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath : String) {
+      // TODO: This needs to be implemented when we support delete topic
+      warn("Topic for path " + dataPath + " gets deleted, which should not happen at this
time")
+    }
+  }
+
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
@@ -626,11 +648,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
     }
 
-    // register listener for session expired event
+    // create a listener for the session expired event if one does not exist yet
     if (sessionExpirationListener == null)
       sessionExpirationListener = new ZKSessionExpireListener(
         dirs, consumerIdString, topicCount, loadBalancerListener)
 
+    // create a listener for the topic partition change event if one does not exist yet
+    if (topicPartitionChangeListener == null)
+      topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
+
     val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
 
     // map of {topic -> Set(thread-1, thread-2, ...)}
@@ -686,8 +712,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     topicStreamsMap.foreach { topicAndStreams =>
       // register on broker partition path changes
-      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
-      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+      val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
+      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
     }
 
     // explicitly trigger load balancing for this consumer
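
----------------------------------------------------------------------
The substance of the patch is the last hunk: the consumer now subscribes a data-change listener on each topic's znode under /brokers/topics instead of a child-change listener, since adding partitions updates the znode's data (the partition assignment) rather than its direct children, which appears to be why the old watch never fired. Below is a minimal, self-contained sketch of that zkclient pattern, not code from the commit; the connect string localhost:2181, the path /brokers/topics/my-topic, the BytesPushThroughSerializer choice, and the rebalance() stub are illustrative assumptions standing in for the consumer's ZKRebalancerListener.syncedRebalance().

import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer

object TopicPartitionWatchSketch extends App {
  // placeholder standing in for ZKRebalancerListener.syncedRebalance()
  def rebalance(): Unit =
    println("re-running partition assignment for this consumer")

  // Same shape as the ZKTopicPartitionChangeListener added by this patch:
  // handleDataChange fires when the topic znode's data changes, and zkclient
  // automatically re-registers the watch after each event, so there is no
  // need to re-subscribe inside the callback.
  val listener = new IZkDataListener {
    def handleDataChange(dataPath: String, data: Object) {
      val payload = data match {
        case bytes: Array[Byte] => new String(bytes, "UTF-8")
        case other              => String.valueOf(other)
      }
      println("Topic info for path " + dataPath + " changed to " + payload + ", triggering rebalance")
      rebalance()
    }
    def handleDataDeleted(dataPath: String) {
      println("Topic at path " + dataPath + " was deleted")
    }
  }

  // The connect string and topic path are illustrative placeholders.
  val zkClient = new ZkClient("localhost:2181", 30000, 30000, new BytesPushThroughSerializer())
  zkClient.subscribeDataChanges("/brokers/topics/my-topic", listener)

  Thread.sleep(60000)  // keep the process alive long enough to observe a change
  zkClient.close()
}
----------------------------------------------------------------------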

