kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject [32/37] git commit: kafka-1451; Broker stuck due to leader election race; patched by Manikumar Reddy; reviewed by Jun Rao
Date Tue, 05 Aug 2014 23:00:29 GMT
kafka-1451; Broker stuck due to leader election race; patched by Manikumar Reddy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a01a101e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a01a101e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a01a101e

Branch: refs/heads/transactional_messaging
Commit: a01a101e82d5b06e89857e79c4b8268589d81fca
Parents: 50f2b24
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Wed Jul 30 08:14:41 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Jul 30 08:14:41 2014 -0700

----------------------------------------------------------------------
 .../kafka/server/ZookeeperLeaderElector.scala   | 30 +++++++++++++++-----
 1 file changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a01a101e/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index e5b6ff1..a75818a 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -50,9 +50,27 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
     }
   }
 
+  private def getControllerID(): Int = {
+    readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+       case Some(controller) => KafkaController.parseControllerId(controller)
+       case None => -1
+    }
+  }
+    
   def elect: Boolean = {
     val timestamp = SystemTime.milliseconds.toString
     val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
+   
+   leaderId = getControllerID 
+    /* 
+     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
+     * it's possible that the controller has already been elected when we get here. This check will prevent the following
+     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
+     */
+    if(leaderId != -1) {
+       debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
+       return amILeader
+    }
 
     try {
       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
@@ -64,15 +82,13 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
     } catch {
       case e: ZkNodeExistsException =>
         // If someone else has written the path, then
-        leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
-          case Some(controller) => KafkaController.parseControllerId(controller)
-          case None => {
-            warn("A leader has been elected but just resigned, this will result in another round of election")
-            -1
-          }
-        }
+        leaderId = getControllerID 
+
         if (leaderId != -1)
           debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+        else
+          warn("A leader has been elected but just resigned, this will result in another round of election")
+
       case e2: Throwable =>
         error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
         resign()


Mime
View raw message