kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-2437; Fix ZookeeperLeaderElector to handle node deletion correctly.
Date Thu, 03 Sep 2015 01:13:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 689d170ac -> 6f398d63a


KAFKA-2437; Fix ZookeeperLeaderElector to handle node deletion correctly.

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>

Closes #189 from becketqin/KAFKA-2437


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f398d63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f398d63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f398d63

Branch: refs/heads/trunk
Commit: 6f398d63a4b9a23e8a3dfa436b5522e505dd5b47
Parents: 689d170
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Wed Sep 2 18:11:58 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Sep 2 18:12:42 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f398d63/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index a5c5fb3..9bfec40 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -119,8 +119,12 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       inLock(controllerContext.controllerLock) {
+        val amILeaderBeforeDataChange = amILeader
         leaderId = KafkaController.parseControllerId(data.toString)
         info("New leader is %d".format(leaderId))
+        // The old leader need to resign leadership if it is no longer the leader
+        if (amILeaderBeforeDataChange && !amILeader)
+          onResigningAsLeader
       }
     }
 


Mime
View raw message