kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-5879; Controller should read the latest IsrChangeNotification znodes when handling IsrChangeNotification event
Date Thu, 05 Oct 2017 00:37:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0.0 0222a35db -> 5dab884d7


KAFKA-5879; Controller should read the latest IsrChangeNotification znodes when handling IsrChangeNotification event

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #3840 from lindong28/KAFKA-5879

(cherry picked from commit b5b266eeeccc18c5cfbd37fa59ab5d9cf3676e34)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5dab884d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5dab884d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5dab884d

Branch: refs/heads/1.0.0
Commit: 5dab884d74183ef0409c8efab465d4223827a604
Parents: 0222a35
Author: Dong Lin <lindong28@gmail.com>
Authored: Wed Oct 4 17:37:35 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 4 17:37:46 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/controller/KafkaController.scala    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5dab884d/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4ea61ab..811ff67 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -993,7 +993,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     // remove this partition from that list
     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
     // write the new list to zookeeper
-    zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
+    if (updatedPartitionsBeingReassigned.size < partitionsBeingReassigned.size)
+      zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
     // update the cache. NO-OP if the partition's reassignment was never started
     controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
   }
@@ -1366,16 +1367,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     def state = ControllerState.IsrChange
 
     override def process(): Unit = {
+      // Read the current isr change notification znodes from ZK again instead of using sequenceNumbers
+      // to increase the odds of processing recent isr changes in a single ControllerEvent
+      // and to reduce the odds of trying to access znodes that have already been deleted (KAFKA-5879).
+      val currentSequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.IsrChangeNotificationPath)
       if (!isActive) return
       try {
-        val topicAndPartitions = sequenceNumbers.flatMap(getTopicAndPartition).toSet
+        val topicAndPartitions = currentSequenceNumbers.flatMap(getTopicAndPartition).toSet
         if (topicAndPartitions.nonEmpty) {
           updateLeaderAndIsrCache(topicAndPartitions)
           processUpdateNotifications(topicAndPartitions)
         }
       } finally {
         // delete the notifications
-        sequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
+        currentSequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
       }
     }
 


Mime
View raw message