kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2300: follow-up to clear the controller state upon resignment
Date Mon, 21 Sep 2015 18:56:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9dbeb71ab -> 9ae98685e


KAFKA-2300: follow-up to clear the controller state upon resignment

I have reopened this issue because the controller isn't cleaning up the state upon an exception
and the test case was legitimately failing for me every now and then. I'm proposing a change
to fix this.

Author: fpj <fpj@apache.org>
Author: flavio junqueira <fpj@apache.org>

Reviewers: Ismael Juma, Gwen Shapira, Guozhang Wang

Closes #212 from fpj/2300


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ae98685
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ae98685
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ae98685

Branch: refs/heads/trunk
Commit: 9ae98685ec7882598aee80d11af41d7c0828e1b9
Parents: 9dbeb71
Author: Flavio Junqueira <fpj@apache.org>
Authored: Mon Sep 21 11:59:59 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Sep 21 11:59:59 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/controller/ControllerChannelManager.scala | 6 ++++++
 core/src/main/scala/kafka/controller/KafkaController.scala     | 3 +++
 2 files changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9ae98685/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b1cf668..03234dc 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -254,6 +254,12 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends
 Logging
         "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
   }
 
+  def clear() {
+    leaderAndIsrRequestMap.clear()
+    stopReplicaRequestMap.clear()
+    updateMetadataRequestMap.clear()
+  }
+
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                        replicas: Seq[Int], callback: AbstractRequestResponse
=> Unit = null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ae98685/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 284fa23..29448b1 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -276,6 +276,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
brokerSt
                     case e : IllegalStateException => {
                       // Resign if the controller is in an illegal state
                       error("Forcing the controller to resign")
+                      brokerRequestBatch.clear()
                       controllerElector.resign()
 
                       throw e
@@ -910,6 +911,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
brokerSt
           case e : IllegalStateException => {
             // Resign if the controller is in an illegal state
             error("Forcing the controller to resign")
+            brokerRequestBatch.clear()
             controllerElector.resign()
 
             throw e
@@ -1029,6 +1031,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerSt
       case e : IllegalStateException => {
         // Resign if the controller is in an illegal state
         error("Forcing the controller to resign")
+        brokerRequestBatch.clear()
         controllerElector.resign()
 
         throw e


Mime
View raw message