kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject [1/2] git commit: KAFKA-1185 Improve leader elector module to have a resign API; reviewed by Guozhang Wang and Jun Rao
Date Fri, 03 Jan 2014 19:10:46 GMT
Updated Branches:
  refs/heads/trunk b23cf1968 -> a119f532c


KAFKA-1185 Improve leader elector module to have a resign API; reviewed by Guozhang Wang and
Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10fa2000
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10fa2000
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10fa2000

Branch: refs/heads/trunk
Commit: 10fa20001dd22a2cfc7da065d75d7cf6c0009b42
Parents: b5d1687
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Fri Dec 20 14:39:03 2013 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Fri Dec 20 14:39:03 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 32 ++++++++++-------
 .../kafka/server/ZookeeperLeaderElector.scala   | 36 +++++++++++---------
 2 files changed, 40 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10fa2000/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 965d0e5..2fcc36d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -110,8 +110,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
-  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
-    config.brokerId)
+  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,
+                                                             onControllerFailover, onControllerResignation, config.brokerId)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -256,6 +256,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   /**
+   * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
+   * required to clean up internal controller data structures
+   */
+  def onControllerResignation() {
+    controllerContext.controllerLock synchronized {
+      Utils.unregisterMBean(KafkaController.MBeanName)
+      partitionStateMachine.shutdown()
+      replicaStateMachine.shutdown()
+      if(controllerContext.controllerChannelManager != null) {
+        controllerContext.controllerChannelManager.shutdown()
+        controllerContext.controllerChannelManager = null
+      }
+    }
+  }
+
+  /**
    * Returns true if this broker is the current controller.
    */
   def isActive(): Boolean = {
@@ -894,16 +910,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     @throws(classOf[Exception])
     def handleNewSession() {
       info("ZK expired; shut down all controller components and try to re-elect")
-      controllerContext.controllerLock synchronized {
-        Utils.unregisterMBean(KafkaController.MBeanName)
-        partitionStateMachine.shutdown()
-        replicaStateMachine.shutdown()
-        if(controllerContext.controllerChannelManager != null) {
-          controllerContext.controllerChannelManager.shutdown()
-          controllerContext.controllerChannelManager = null
-        }
-        controllerElector.elect
-      }
+      onControllerResignation()
+      controllerElector.elect
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/10fa2000/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index cc6f1eb..b189619 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -30,7 +30,10 @@ import kafka.common.KafkaException
  * leader is dead, this class will handle automatic re-election and if it succeeds, it invokes the leader state change
  * callback
  */
-class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit,
+class ZookeeperLeaderElector(controllerContext: ControllerContext,
+                             electionPath: String,
+                             onBecomingLeader: () => Unit,
+                             onResigningAsLeader: () => Unit,
                              brokerId: Int)
   extends LeaderElector with Logging {
   var leaderId = -1
@@ -58,23 +61,22 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
       info(brokerId + " successfully elected as leader")
       leaderId = brokerId
       onBecomingLeader()
-      } catch {
-        case e: ZkNodeExistsException =>
-          // If someone else has written the path, then
-          leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
-            case Some(controller) => KafkaController.parseControllerId(controller)
-            case None => {
-              warn("A leader has been elected but just resigned, this will result in another round of election")
-              -1
-            }
+    } catch {
+      case e: ZkNodeExistsException =>
+        // If someone else has written the path, then
+        leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+          case Some(controller) => KafkaController.parseControllerId(controller)
+          case None => {
+            warn("A leader has been elected but just resigned, this will result in another round of election")
+            -1
           }
-          if (leaderId != -1)
-            debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-        case e2: Throwable =>
-          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
-          leaderId = -1
+        }
+        if (leaderId != -1)
+          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+      case e2: Throwable =>
+        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+        resign()
     }
-
     amILeader
   }
 
@@ -116,6 +118,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
       controllerContext.controllerLock synchronized {
         debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
           .format(brokerId, dataPath))
+        if(amILeader)
+          onResigningAsLeader()
         elect
       }
     }


Mime
View raw message