kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-5310; reset ControllerContext during resignation
Date Tue, 23 May 2017 23:30:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1bf648331 -> beeddc25d


KAFKA-5310; reset ControllerContext during resignation

This ticket is all about ControllerContext initialization and teardown. The key points are:
1. We should tear down ControllerContext during resignation instead of waiting for the next
election to fix it up. A heap dump shows that the former controller keeps nearly all of its
ControllerContext state lying around.
2. We don't properly tear down/reset ControllerContext.partitionsBeingReassigned. This can
cause problems if the former controller is later re-elected as controller.
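A minimal sketch of the idea in point 1, assuming a hypothetical, heavily simplified
`ToyControllerContext` (not Kafka's real `ControllerContext`) that holds only a few of the fields
this commit resets, with simplified types. Resignation clears every cached field in one place, so
nothing survives into the next controller term.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ControllerContext. Field names mirror the ones reset
// in this commit; the types are simplified assumptions, not Kafka's real types.
final class ToyControllerContext {
  var epoch: Int = 0
  var epochZkVersion: Int = 0
  var allTopics: Set[String] = Set.empty
  var liveBrokers: Set[Int] = Set.empty
  val shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
  // Partition (as a "topic-partition" string) -> replica list.
  val partitionReplicaAssignment: mutable.Map[String, Seq[Int]] = mutable.Map.empty
  val partitionsBeingReassigned: mutable.Map[String, Seq[Int]] = mutable.Map.empty

  // Called on resignation: drop all cached state instead of waiting for the next election.
  def reset(): Unit = {
    epoch = 0
    epochZkVersion = 0
    allTopics = Set.empty
    liveBrokers = Set.empty
    shuttingDownBrokerIds.clear()
    partitionReplicaAssignment.clear()
    partitionsBeingReassigned.clear()
  }

  // True when no cached state remains.
  def isEmpty: Boolean =
    epoch == 0 && epochZkVersion == 0 && allTopics.isEmpty && liveBrokers.isEmpty &&
      shuttingDownBrokerIds.isEmpty && partitionReplicaAssignment.isEmpty &&
      partitionsBeingReassigned.isEmpty
}
```

After `reset()`, the resigned controller holds no topic, broker, or reassignment state, so that
memory can be reclaimed and no stale entry can leak into a later term.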

Suppose a partition's assignment is initially R0. Now suppose a reassignment R1 gets stuck while
C0 is the controller, and an admin tries to "undo" R1 by deleting /admin/reassign_partitions,
deleting /controller, and submitting another reassignment specifying R0. The new controller C1 may
succeed with R0. If the controller then moves back to C0, C0 will reattempt R1 even though that
reassignment was cleared from ZooKeeper before control shifted back to C0. As a result, the
partition's actual assignment in ZooKeeper is unexpectedly changed back to R1.
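The sequence above can be replayed with a toy model. This is a sketch, not Kafka code: it assumes,
as a simplification, that a newly elected controller merges what it reads from
/admin/reassign_partitions into its cached reassignment map (`++=`) rather than replacing it, and
that "reattempting" a reassignment just overwrites the partition's assignment. ZooKeeper is modeled
as plain mutable maps; `ToyZk`, `ToyController`, and `Scenario` are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical ZooKeeper model: the reassignment znode plus the actual assignment.
final class ToyZk {
  val reassignPartitions: mutable.Map[String, Seq[Int]] = mutable.Map.empty
  val assignment: mutable.Map[String, Seq[Int]] = mutable.Map.empty
}

// Hypothetical controller whose reassignment cache outlives its term unless reset.
final class ToyController(zk: ToyZk, resetOnResign: Boolean) {
  val partitionsBeingReassigned: mutable.Map[String, Seq[Int]] = mutable.Map.empty

  def elect(): Unit = {
    // Assumed merge (not replace): stale cached entries survive the re-read.
    partitionsBeingReassigned ++= zk.reassignPartitions
    // Simplified "reattempt": write each cached target as the actual assignment.
    partitionsBeingReassigned.foreach { case (p, replicas) => zk.assignment(p) = replicas }
  }

  def resign(): Unit = if (resetOnResign) partitionsBeingReassigned.clear()
}

object Scenario {
  val R0: Seq[Int] = Seq(1, 2, 3)
  val R1: Seq[Int] = Seq(4, 5, 6)

  // Returns the final assignment of partition "t-0" after the admin's "undo".
  def run(resetOnResign: Boolean): Seq[Int] = {
    val zk = new ToyZk
    zk.assignment("t-0") = R0
    val c0 = new ToyController(zk, resetOnResign)
    c0.elect()
    // R1 is submitted while C0 is controller; C0 caches it, but the move gets stuck.
    zk.reassignPartitions("t-0") = R1
    c0.partitionsBeingReassigned("t-0") = R1
    // Admin deletes the znode and /controller, so C0 resigns.
    zk.reassignPartitions.clear()
    c0.resign()
    // C1 takes over and completes a reassignment back to R0.
    val c1 = new ToyController(zk, resetOnResign)
    zk.reassignPartitions("t-0") = R0
    c1.elect()
    zk.reassignPartitions.clear() // a completed reassignment is removed from the znode
    c1.resign()
    // Control moves back to C0.
    c0.elect()
    zk.assignment("t-0")
  }
}
```

With `resetOnResign = false`, C0's stale cache still contains R1 when it is re-elected, so the
partition ends up back on R1; clearing the cache on resignation, as this commit does for
partitionsBeingReassigned, leaves it on R0.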

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3122 from onurkaraman/KAFKA-5310


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/beeddc25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/beeddc25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/beeddc25

Branch: refs/heads/trunk
Commit: beeddc25d6b443ed344658a2562a162fb03048ef
Parents: 1bf6483
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Tue May 23 16:29:55 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue May 23 16:29:55 2017 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 23 ++++++++++++++------
 1 file changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/beeddc25/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 41a88d9..69669cd 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -315,14 +315,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     replicaStateMachine.shutdown()
     deregisterBrokerChangeListener()
 
-    // shutdown controller channel manager
-    if(controllerContext.controllerChannelManager != null) {
-      controllerContext.controllerChannelManager.shutdown()
-      controllerContext.controllerChannelManager = null
-    }
     // reset controller context
-    controllerContext.epoch=0
-    controllerContext.epochZkVersion=0
+    resetControllerContext()
     brokerState.newState(RunningAsBroker)
 
     info("Broker %d resigned as the controller".format(config.brokerId))
@@ -690,6 +684,21 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     pendingPreferredReplicaElections
   }
 
+  private def resetControllerContext(): Unit = {
+    if(controllerContext.controllerChannelManager != null) {
+      controllerContext.controllerChannelManager.shutdown()
+      controllerContext.controllerChannelManager = null
+    }
+    controllerContext.shuttingDownBrokerIds.clear()
+    controllerContext.epoch = 0
+    controllerContext.epochZkVersion = 0
+    controllerContext.allTopics = Set.empty
+    controllerContext.partitionReplicaAssignment.clear()
+    controllerContext.partitionLeadershipInfo.clear()
+    controllerContext.partitionsBeingReassigned.clear()
+    controllerContext.liveBrokers = Set.empty
+  }
+
   private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
     val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()

