kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5388; Replace ZkClient.subscribe*Changes methods with equivalent ZkUtils methods
Date Fri, 04 Aug 2017 13:19:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c06fa5ada -> c1690da07


KAFKA-5388; Replace ZkClient.subscribe*Changes methods with equivalent ZkUtils methods

Author: Balint Molnar <balintmolnar91@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3281 from baluchicken/KAFKA-5388


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1690da0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1690da0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1690da0

Branch: refs/heads/trunk
Commit: c1690da071c00ef8d6cca7b3bed50a0e5e2c3332
Parents: c06fa5a
Author: Balint Molnar <balintmolnar91@gmail.com>
Authored: Fri Aug 4 13:35:35 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Aug 4 13:46:08 2017 +0100

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      |  6 ++--
 .../consumer/ZookeeperConsumerConnector.scala   |  8 ++---
 .../consumer/ZookeeperTopicEventWatcher.scala   | 14 ++++----
 .../kafka/controller/KafkaController.scala      | 38 ++++++++++----------
 .../scala/kafka/server/KafkaHealthcheck.scala   |  2 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 20 ++++++++++-
 7 files changed, 54 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c1690da0/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 450707b..99b5103 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -40,7 +40,7 @@ trait NotificationHandler {
  * notificationHandler's processNotification() method with the child's data as argument. As part of processing these changes it also
  * purges any children with currentTime - createTime > changeExpirationMs.
  *
- * The caller/user of this class should ensure that they use zkClient.subscribeStateChanges and call processAllNotifications
+ * The caller/user of this class should ensure that they use zkUtils.subscribeStateChanges and call processAllNotifications
  * method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary to ensure that if zk session
  * is terminated and reestablished any missed notification will be processed immediately.
  * @param zkUtils
@@ -64,8 +64,8 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
    */
   def init() {
     zkUtils.makeSurePersistentPathExists(seqNodeRoot)
-    zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
-    zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
+    zkUtils.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
+    zkUtils.subscribeStateChanges(ZkStateChangeListener)
     processAllNotifications()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1690da0/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index cdf730f..d1928b4 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -679,7 +679,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         // We log a warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
         // are up.
         warn("no brokers found when trying to rebalance.")
-        zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
+        zkUtils.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
         true
       }
       else {
@@ -954,14 +954,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     })
 
     // listener to consumer and partition changes
-    zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
+    zkUtils.subscribeStateChanges(sessionExpirationListener)
 
-    zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+    zkUtils.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
 
     topicStreamsMap.foreach { topicAndStreams =>
       // register on broker partition path changes
       val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
-      zkUtils.zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
+      zkUtils.subscribeDataChanges(topicPath, topicPartitionChangeListener)
     }
 
     // explicitly trigger load balancing for this consumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1690da0/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index 7de1980..8ce204e 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -34,17 +34,17 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
     val topicEventListener = new ZkTopicEventListener()
     zkUtils.makeSurePersistentPathExists(ZkUtils.BrokerTopicsPath)
 
-    zkUtils.zkClient.subscribeStateChanges(
-      new ZkSessionExpireListener(topicEventListener))
+    zkUtils.subscribeStateChanges(new ZkSessionExpireListener(topicEventListener))
 
-    val topics = zkUtils.zkClient.subscribeChildChanges(
-      ZkUtils.BrokerTopicsPath, topicEventListener)
+    val topics = zkUtils.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener).getOrElse {
+      throw new AssertionError(s"Expected ${ZkUtils.BrokerTopicsPath} to exist, but it does not. ")
+    }
 
     // call to bootstrap topic list
-    topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics)
+    topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics.asJava)
   }
 
-  private def stopWatchingTopicEvents() { zkUtils.zkClient.unsubscribeAll() }
+  private def stopWatchingTopicEvents() { zkUtils.unsubscribeAll() }
 
   def shutdown() {
     lock.synchronized {
@@ -90,7 +90,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
       lock.synchronized {
         if (zkUtils != null) {
           info("ZK expired: resubscribing topic event listener to topic registry")
-          zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener)
+          zkUtils.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1690da0/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 1de04d8..780ae52 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -592,7 +592,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       reassignedReplicas.toSet)
     reassignedPartitionContext.isrChangeListener = isrChangeListener
     // register listener on the leader and isr path to wait until they catch up with the current leader
-    zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
+    zkUtils.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
   }
 
   def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@@ -694,11 +694,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   private def registerSessionExpirationListener() = {
-    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
+    zkUtils.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
   }
 
   private def registerControllerChangeListener() = {
-    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
+    zkUtils.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
   }
 
   private def initializeControllerContext() {
@@ -894,70 +894,70 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   private def registerBrokerChangeListener() = {
-    zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+    zkUtils.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
   }
 
   private def deregisterBrokerChangeListener() = {
-    zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+    zkUtils.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
   }
 
   private def registerTopicChangeListener() = {
-    zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
+    zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
   }
 
   private def deregisterTopicChangeListener() = {
-    zkUtils.zkClient.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener)
+    zkUtils.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener)
   }
 
   def registerPartitionModificationsListener(topic: String) = {
     partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, eventManager, topic))
-    zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
+    zkUtils.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
   }
 
   def deregisterPartitionModificationsListener(topic: String) = {
-    zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
+    zkUtils.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
     partitionModificationsListeners.remove(topic)
   }
 
   private def registerTopicDeletionListener() = {
-    zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
+    zkUtils.subscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
   }
 
   private def deregisterTopicDeletionListener() = {
-    zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
+    zkUtils.unsubscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
   }
 
   private def registerPartitionReassignmentListener() = {
-    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
+    zkUtils.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
   }
 
   private def deregisterPartitionReassignmentListener() = {
-    zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
+    zkUtils.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
   }
 
   private def registerIsrChangeNotificationListener() = {
     debug("Registering IsrChangeNotificationListener")
-    zkUtils.zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
+    zkUtils.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
   }
 
   private def deregisterIsrChangeNotificationListener() = {
     debug("De-registering IsrChangeNotificationListener")
-    zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
+    zkUtils.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
   }
 
   private def registerPreferredReplicaElectionListener() {
-    zkUtils.zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+    zkUtils.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
   }
 
   private def deregisterPreferredReplicaElectionListener() {
-    zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+    zkUtils.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
   }
 
   private def deregisterPartitionReassignmentIsrChangeListeners() {
     controllerContext.partitionsBeingReassigned.foreach {
       case (topicAndPartition, reassignedPartitionsContext) =>
         val zkPartitionPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
-        zkUtils.zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
+        zkUtils.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
     }
   }
 
@@ -984,7 +984,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
     if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
       // stop watching the ISR changes for this partition
-      zkUtils.zkClient.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+      zkUtils.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
         controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
     }
     // read the current list of reassigned partitions from zookeeper

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1690da0/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 0b62fca..b108bf6 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -46,7 +46,7 @@ class KafkaHealthcheck(brokerId: Int,
   private[server] val sessionExpireListener = new SessionExpireListener
 
   def startup() {
-    zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
+    zkUtils.subscribeStateChanges(sessionExpireListener)
     register()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1690da0/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a167e36..1270e2f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -336,7 +336,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                                               secureAclsEnabled)
       zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
       info(s"Created zookeeper path $chroot")
-      zkClientForChrootCreation.zkClient.close()
+      zkClientForChrootCreation.close()
     }
 
     val zkUtils = ZkUtils(config.zkConnect,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1690da0/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 6953507..cb20b31 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -29,7 +29,7 @@ import kafka.server.ConfigType
 import kafka.utils.ZkUtils._
 import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
-import org.I0Itec.zkclient.{ZkClient, ZkConnection}
+import org.I0Itec.zkclient.{ZkClient, ZkConnection, IZkDataListener, IZkChildListener, IZkStateListener}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
@@ -614,6 +614,24 @@ class ZkUtils(val zkClient: ZkClient,
     zkClient.deleteRecursive(path)
   }
 
+  def subscribeDataChanges(path: String, listener: IZkDataListener): Unit =
+    zkClient.subscribeDataChanges(path, listener)
+
+  def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit =
+    zkClient.unsubscribeDataChanges(path, dataListener)
+
+  def subscribeStateChanges(listener: IZkStateListener): Unit =
+    zkClient.subscribeStateChanges(listener)
+
+  def subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]] =
+    Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala)
+
+  def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit =
+    zkClient.unsubscribeChildChanges(path, childListener)
+
+  def unsubscribeAll(): Unit =
+    zkClient.unsubscribeAll()
+
   def readData(path: String): (String, Stat) = {
     val stat: Stat = new Stat()
     val dataStr: String = zkClient.readData(path, stat)


Mime
View raw message