kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-5879; Controller should read the latest IsrChangeNotification znodes when handling IsrChangeNotification event
Date Thu, 05 Oct 2017 00:37:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 20d9adb17 -> b5b266eee


KAFKA-5879; Controller should read the latest IsrChangeNotification znodes when handling IsrChangeNotification event

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #3840 from lindong28/KAFKA-5879


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5b266ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5b266ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5b266ee

Branch: refs/heads/trunk
Commit: b5b266eeeccc18c5cfbd37fa59ab5d9cf3676e34
Parents: 20d9adb
Author: Dong Lin <lindong28@gmail.com>
Authored: Wed Oct 4 17:37:35 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 4 17:37:35 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/controller/KafkaController.scala    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b5b266ee/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4ea61ab..811ff67 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -993,7 +993,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     // remove this partition from that list
     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
     // write the new list to zookeeper
-    zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
+    if (updatedPartitionsBeingReassigned.size < partitionsBeingReassigned.size)
+      zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
     // update the cache. NO-OP if the partition's reassignment was never started
     controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
   }
@@ -1366,16 +1367,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     def state = ControllerState.IsrChange
 
     override def process(): Unit = {
+      // Read the current isr change notification znodes from ZK again instead of using sequenceNumbers
+      // to increase the odds of processing recent isr changes in a single ControllerEvent
+      // and to reduce the odds of trying to access znodes that have already been deleted (KAFKA-5879).
+      val currentSequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.IsrChangeNotificationPath)
       if (!isActive) return
       try {
-        val topicAndPartitions = sequenceNumbers.flatMap(getTopicAndPartition).toSet
+        val topicAndPartitions = currentSequenceNumbers.flatMap(getTopicAndPartition).toSet
         if (topicAndPartitions.nonEmpty) {
           updateLeaderAndIsrCache(topicAndPartitions)
           processUpdateNotifications(topicAndPartitions)
         }
       } finally {
         // delete the notifications
-        sequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
+        currentSequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
       }
     }
 


Mime
View raw message