kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1096 An old controller coming out of long GC could update its epoch to the latest controller's epoch; reviewed by Neha Narkhede
Date Thu, 26 Jun 2014 23:16:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6b0ae4bba -> 62f208704


KAFKA-1096 An old controller coming out of long GC could update its epoch to the latest controller's epoch; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62f20870
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62f20870
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62f20870

Branch: refs/heads/trunk
Commit: 62f208704fe7fb08f085d569a26e780e0050dba0
Parents: 6b0ae4b
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Thu Jun 26 16:16:03 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu Jun 26 16:16:15 2014 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 53 +++++---------------
 1 file changed, 12 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/62f20870/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 94bbd33..a7a21df 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -169,8 +169,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
-  registerControllerChangedListener()
-
   newGauge(
     "ActiveControllerCount",
     new Gauge[Int] {
@@ -298,6 +296,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   def onControllerFailover() {
     if(isRunning) {
       info("Broker %d starting become controller state transition".format(config.brokerId))
+      //read controller epoch from zk
+      readControllerEpochFromZookeeper()
       // increment the controller epoch
       incrementControllerEpoch(zkClient)
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
@@ -346,6 +346,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
       }
+      controllerContext.epoch=0
+      controllerContext.epochZkVersion=0
       brokerState.newState(RunningAsBroker)
     }
   }
@@ -875,8 +877,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
     zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this))
   }
 
-  private def registerControllerChangedListener() {
-    zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this))
+  private def readControllerEpochFromZookeeper() {
+    // initialize the controller epoch and zk version by reading from zookeeper
+    if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) {
+      val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath)
+      controllerContext.epoch = epochData._1.toInt
+      controllerContext.epochZkVersion = epochData._2.getVersion
+      info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
+    }
   }
 
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
@@ -1275,43 +1283,6 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
   }
 }
 
-class ControllerEpochListener(controller: KafkaController) extends IZkDataListener with Logging {
-  this.logIdent = "[ControllerEpochListener on " + controller.config.brokerId + "]: "
-  val controllerContext = controller.controllerContext
-  readControllerEpochFromZookeeper()
-
-  /**
-   * Invoked when a controller updates the epoch value
-   * @throws Exception On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataChange(dataPath: String, data: Object) {
-    debug("Controller epoch listener fired with new epoch " + data.toString)
-    inLock(controllerContext.controllerLock) {
-      // read the epoch path to get the zk version
-      readControllerEpochFromZookeeper()
-    }
-  }
-
-  /**
-   * @throws Exception
-   *             On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String) {
-  }
-
-  private def readControllerEpochFromZookeeper() {
-    // initialize the controller epoch and zk version by reading from zookeeper
-    if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) {
-      val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath)
-      controllerContext.epoch = epochData._1.toInt
-      controllerContext.epochZkVersion = epochData._2.getVersion
-      info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
-    }
-  }
-}
-
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                        var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
 


Mime
View raw message