kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4360:Controller may deadLock when autoLeaderRebalance encounter zk expired
Date Wed, 09 Nov 2016 18:38:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 11766ad31 -> 6dbe202b6


KAFKA-4360:Controller may deadLock when autoLeaderRebalance encounter zk expired

Author: tuyang <tuyang@meituan.com>
Author: xiguantiaozhan <kafkausr@126.com>

Reviewers: Ismael Juma, Jiangjie Qin, Guozhang Wang

Closes #2094 from xiguantiaozhan/rebalance_deadlock


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6dbe202b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6dbe202b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6dbe202b

Branch: refs/heads/trunk
Commit: 6dbe202b6dc3ba638d3a55ff89b594bd334bb8b0
Parents: 11766ad
Author: Tu Yang <tuyang@meituan.com>
Authored: Wed Nov 9 10:37:59 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 9 10:37:59 2016 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      |  3 ++-
 .../kafka/server/ZookeeperLeaderElector.scala   | 23 ++++++++++++++------
 2 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6dbe202b/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b7b4d71..4a94aad 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -352,6 +352,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val
brokerStat
   /**
    * This callback is invoked by the zookeeper leader elector when the current broker resigns
as the controller. This is
    * required to clean up internal controller data structures
+   * Note:We need to resign as a controller out of the controller lock to avoid potential
deadlock issue
    */
   def onControllerResignation() {
     debug("Controller resigning, broker id %d".format(config.brokerId))
@@ -1158,8 +1159,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val
brokerStat
     @throws(classOf[Exception])
     def handleNewSession() {
       info("ZK expired; shut down all controller components and try to re-elect")
+      onControllerResignation()
       inLock(controllerContext.controllerLock) {
-        onControllerResignation()
         controllerElector.elect
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6dbe202b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index bb6caa0..73e7210 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -121,13 +121,16 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
      */
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
-      inLock(controllerContext.controllerLock) {
+      val shouldResign = inLock(controllerContext.controllerLock) {
         val amILeaderBeforeDataChange = amILeader
         leaderId = KafkaController.parseControllerId(data.toString)
         info("New leader is %d".format(leaderId))
         // The old leader needs to resign leadership if it is no longer the leader
-        if (amILeaderBeforeDataChange && !amILeader)
-          onResigningAsLeader()
+        amILeaderBeforeDataChange && !amILeader
+      }
+
+      if (shouldResign) {
+        onResigningAsLeader()
       }
     }
 
@@ -137,12 +140,18 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
      *             On any error.
      */
     @throws(classOf[Exception])
-    def handleDataDeleted(dataPath: String) {
-      inLock(controllerContext.controllerLock) {
+    def handleDataDeleted(dataPath: String) { 
+      val shouldResign = inLock(controllerContext.controllerLock) {
         debug("%s leader change listener fired for path %s to handle data deleted: trying
to elect as a leader"
           .format(brokerId, dataPath))
-        if(amILeader)
-          onResigningAsLeader()
+        amILeader
+      }
+
+      if(shouldResign) {
+        onResigningAsLeader()
+      }
+
+      inLock(controllerContext.controllerLock) {
         elect
       }
     }


Mime
View raw message