kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1411010 [1/2] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/controller/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/admin/ ...
Date Sun, 18 Nov 2012 22:48:25 GMT
Author: nehanarkhede
Date: Sun Nov 18 22:48:20 2012
New Revision: 1411010

URL: http://svn.apache.org/viewvc?rev=1411010&view=rev
Log:
KAFKA-532 Multiple controllers can co-exist during soft failures; patched by Neha Narkhede; reviewed by Jun Rao

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ControllerMovedException.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala

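For orientation before the per-file diffs: the patch threads a controller epoch through the controller's data model by pairing every leadership decision with the epoch of the controller that made it. A minimal, illustrative construction of that pairing (the broker ids, ISR and epoch values below are made up), using the LeaderIsrAndControllerEpoch case class added near the end of KafkaController.scala:

    import kafka.api.LeaderAndIsr
    import kafka.controller.LeaderIsrAndControllerEpoch

    object LeaderDecisionSketch {
      // leader is broker 1, ISR is {1, 2, 3}, and the decision was made by the controller with epoch 2
      val decision = LeaderIsrAndControllerEpoch(new LeaderAndIsr(1, List(1, 2, 3)), 2)
    }
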
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Sun Nov 18 22:48:20 2012
@@ -21,9 +21,8 @@ package kafka.api
 import java.nio._
 import kafka.utils._
 import kafka.api.ApiUtils._
-import collection.mutable.Map
-import collection.mutable.HashMap
 import kafka.cluster.Broker
+import kafka.controller.LeaderIsrAndControllerEpoch
 
 
 object LeaderAndIsr {
@@ -35,7 +34,7 @@ case class LeaderAndIsr(var leader: Int,
   def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
 
   override def toString(): String = {
-    val jsonDataMap = new HashMap[String, String]
+    val jsonDataMap = new collection.mutable.HashMap[String, String]
     jsonDataMap.put("leader", leader.toString)
     jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
     jsonDataMap.put("ISR", isr.mkString(","))
@@ -43,35 +42,42 @@ case class LeaderAndIsr(var leader: Int,
   }
 }
 
-
 object PartitionStateInfo {
   def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
+    val controllerEpoch = buffer.getInt
     val leader = buffer.getInt
-    val leaderGenId = buffer.getInt
+    val leaderEpoch = buffer.getInt
     val isrString = readShortString(buffer)
     val isr = isrString.split(",").map(_.toInt).toList
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
-    PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, isr, zkVersion), replicationFactor)
+    PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), controllerEpoch),
+      replicationFactor)
   }
 }
 
-case class PartitionStateInfo(val leaderAndIsr: LeaderAndIsr, val replicationFactor: Int) {
+case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, val replicationFactor: Int) {
   def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(leaderAndIsr.leader)
-    buffer.putInt(leaderAndIsr.leaderEpoch)
-    writeShortString(buffer, leaderAndIsr.isr.mkString(","))
-    buffer.putInt(leaderAndIsr.zkVersion)
+    buffer.putInt(leaderIsrAndControllerEpoch.controllerEpoch)
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leader)
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch)
+    writeShortString(buffer, leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(","))
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion)
     buffer.putInt(replicationFactor)
   }
 
   def sizeInBytes(): Int = {
-    val size = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4
+    val size =
+      4 /* epoch of the controller that elected the leader */ +
+      4 /* leader broker id */ +
+      4 /* leader epoch */ +
+      (2 + leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(",").length) +
+      4 /* zk version */ +
+      4 /* replication factor */
     size
   }
 }
 
-
 object LeaderAndIsrRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
@@ -83,8 +89,9 @@ object LeaderAndIsrRequest {
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
-    val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo]
+    val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
 
     for(i <- 0 until partitionStateInfosCount){
       val topic = readShortString(buffer)
@@ -99,26 +106,28 @@ object LeaderAndIsrRequest {
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)
 
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos, leaders)
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
   }
 }
 
-
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
                                 ackTimeoutMs: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker])
+                                leaders: Set[Broker],
+                                controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos, liveBrokers)
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
+      partitionStateInfos, liveBrokers, controllerEpoch)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.putInt(partitionStateInfos.size)
     for((key, value) <- partitionStateInfos){
       writeShortString(buffer, key._1)
@@ -130,12 +139,17 @@ case class LeaderAndIsrRequest (versionI
   }
 
   def sizeInBytes(): Int = {
-    var size = 1 + 2 + (2 + clientId.length) + 4 + 4
+    var size =
+      2 /* version id */ +
+      (2 + clientId.length) /* client id */ +
+      4 /* ack timeout */ +
+      4 /* controller epoch */ +
+      4 /* number of partitions */
     for((key, value) <- partitionStateInfos)
-      size += (2 + key._1.length) + 4 + value.sizeInBytes
-    size += 4
+      size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
+    size += 4 /* number of leader brokers */
     for(broker <- leaders)
-      size += broker.sizeInBytes
+      size += broker.sizeInBytes /* broker info */
     size
   }
 }
\ No newline at end of file

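The request header above now carries the controller epoch immediately after the ack timeout. A hedged sketch of reading just those header fields in wire order, mirroring readFrom above; the object and method names are made up, and the short-string decoding is approximated inline rather than taken from ApiUtils:

    import java.nio.ByteBuffer

    object LeaderAndIsrHeaderSketch {
      def read(buffer: ByteBuffer): (Short, String, Int, Int, Int) = {
        val versionId = buffer.getShort                 // 2 bytes
        val clientIdLength = buffer.getShort            // short string: 2-byte length prefix ...
        val clientIdBytes = new Array[Byte](clientIdLength)
        buffer.get(clientIdBytes)                       // ... followed by that many bytes
        val clientId = new String(clientIdBytes, "UTF-8")
        val ackTimeoutMs = buffer.getInt                // 4 bytes
        val controllerEpoch = buffer.getInt             // 4 bytes, new in this revision
        val partitionCount = buffer.getInt              // 4 bytes
        (versionId, clientId, ackTimeoutMs, controllerEpoch, partitionCount)
      }
    }
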
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala Sun Nov 18 22:48:20 2012
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import kafka.common.ErrorMapping
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import collection.mutable.HashMap
@@ -26,6 +27,7 @@ import collection.Map
 object LeaderAndIsrResponse {
   def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
     val versionId = buffer.getShort
+    val errorCode = buffer.getShort
     val numEntries = buffer.getInt
     val responseMap = new HashMap[(String, Int), Short]()
     for (i<- 0 until numEntries){
@@ -34,24 +36,32 @@ object LeaderAndIsrResponse {
       val partitionErrorCode = buffer.getShort
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new LeaderAndIsrResponse(versionId, responseMap)
+    new LeaderAndIsrResponse(versionId, responseMap, errorCode)
   }
 }
 
 
 case class LeaderAndIsrResponse(versionId: Short,
-                                responseMap: Map[(String, Int), Short])
+                                responseMap: Map[(String, Int), Short],
+                                errorCode: Short = ErrorMapping.NoError)
         extends RequestOrResponse {
   def sizeInBytes(): Int ={
-    var size =  2 + 4
-    for ((key, value) <- responseMap){
-      size += 2 + key._1.length + 4 + 2
+    var size =
+      2 /* version id */ +
+      2 /* error code */ +
+      4 /* number of responses */
+    for ((key, value) <- responseMap) {
+      size +=
+        2 + key._1.length /* topic */ +
+        4 /* partition */ +
+        2 /* error code for this partition */
     }
     size
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){
       writeShortString(buffer, key._1)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Sun Nov 18 22:48:20 2012
@@ -33,6 +33,7 @@ object StopReplicaRequest extends Loggin
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val deletePartitions = buffer.get match {
       case 1 => true
       case 0 => false
@@ -44,7 +45,7 @@ object StopReplicaRequest extends Loggin
     (1 to topicPartitionPairCount) foreach { _ =>
       topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
     }
-    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet)
+    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
   }
 }
 
@@ -52,18 +53,20 @@ case class StopReplicaRequest(versionId:
                               clientId: String,
                               ackTimeoutMs: Int,
                               deletePartitions: Boolean,
-                              partitions: Set[(String, Int)])
+                              partitions: Set[(String, Int)],
+                              controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
 
-  def this(deletePartitions: Boolean, partitions: Set[(String, Int)]) = {
+  def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
     this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
-         deletePartitions, partitions)
+         deletePartitions, partitions, controllerEpoch)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
     buffer.putInt(partitions.size)
     for ((topic, partitionId) <- partitions){
@@ -77,6 +80,7 @@ case class StopReplicaRequest(versionId:
       2 + /* versionId */
       ApiUtils.shortStringLength(clientId) +
       4 + /* ackTimeoutMs */
+      4 + /* controller epoch */
       1 + /* deletePartitions */
       4 /* partition count */
     for ((topic, partitionId) <- partitions){

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala Sun Nov 18 22:48:20 2012
@@ -19,13 +19,15 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import collection.mutable.HashMap
-import collection.Map
+import collection.immutable.Map
+import kafka.common.ErrorMapping
 import kafka.api.ApiUtils._
 
 
 object StopReplicaResponse {
   def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
     val versionId = buffer.getShort
+    val errorCode = buffer.getShort
     val numEntries = buffer.getInt
 
     val responseMap = new HashMap[(String, Int), Short]()
@@ -35,23 +37,31 @@ object StopReplicaResponse {
       val partitionErrorCode = buffer.getShort()
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new StopReplicaResponse(versionId, responseMap)
+    new StopReplicaResponse(versionId, responseMap.toMap, errorCode)
   }
 }
 
 
 case class StopReplicaResponse(val versionId: Short,
-                               val responseMap: Map[(String, Int), Short]) extends RequestOrResponse{
+                               val responseMap: Map[(String, Int), Short],
+                               val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
   def sizeInBytes(): Int ={
-    var size = 2 + 4
-    for ((key, value) <- responseMap){
-      size += (2 + key._1.length) + 4 + 2
+    var size =
+      2 /* version id */ +
+      2 /* error code */ +
+      4 /* number of responses */
+    for ((key, value) <- responseMap) {
+      size +=
+        2 + key._1.length /* topic */ +
+        4 /* partition */ +
+        2 /* error code for this partition */
     }
     size
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){
       writeShortString(buffer, key._1)

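Both response types now carry a request-level errorCode (used, for example, when the whole request came from a controller with a stale epoch) in addition to the per-partition codes. A hedged illustration of checking the two levels; the constants come from ErrorMapping, while the object name and handling are made up:

    import kafka.api.StopReplicaResponse
    import kafka.common.ErrorMapping

    object StopReplicaResponseSketch {
      def log(response: StopReplicaResponse) {
        if (response.errorCode == ErrorMapping.StaleControllerEpochCode)
          println("whole request rejected: sent by a controller with a stale epoch")
        else
          response.responseMap.foreach { case ((topic, partition), code) =>
            if (code != ErrorMapping.NoError)
              println("replica for [%s,%d] failed to stop with error code %d".format(topic, partition, code))
          }
      }
    }
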
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Sun Nov 18 22:48:20 2012
@@ -24,6 +24,7 @@ import kafka.server.ReplicaManager
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.ErrorMapping
+import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 
 
 /**
@@ -44,6 +45,12 @@ class Partition(val topic: String,
   private val leaderIsrUpdateLock = new Object
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+  /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
+   * One way of doing that is through the controller's start replica state change command. When a new broker starts up
+   * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
+   * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
+   * each partition. */
+  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -117,14 +124,18 @@ class Partition(val topic: String,
    *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    *  4. set the new leader and ISR
    */
-  def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr): Boolean = {
+  def makeLeader(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Boolean = {
     leaderIsrUpdateLock synchronized {
+      val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
         info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
       trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
+      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper path
+      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       // stop replica fetcher thread, if any
       replicaFetcherManager.removeFetcher(topic, partitionId)
 
@@ -148,14 +159,19 @@ class Partition(val topic: String,
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, liveBrokers: Set[Broker]): Boolean = {
+  def makeFollower(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                   liveBrokers: Set[Broker]): Boolean = {
     leaderIsrUpdateLock synchronized {
+      val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follwer request"
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
       trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
+      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper path
+      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       val newLeaderBrokerId: Int = leaderAndIsr.leader
       info("Starting the follower state transition to follow leader %d for topic %s partition %d"
         .format(newLeaderBrokerId, topic, partitionId))
@@ -290,8 +306,10 @@ class Partition(val topic: String,
   private def updateIsr(newIsr: Set[Replica]) {
     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndIsr.toString(), zkVersion)
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
     if (updateSucceeded){
       inSyncReplicas = newIsr
       zkVersion = newVersion

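Partition records the epoch of the controller that made each leadership decision so that later ISR updates write that epoch back to zookeeper. On the request path, brokers are expected to reject LeaderAndIsr requests stamped with an older controller epoch; that handling lives in KafkaApis/ReplicaManager, which this commit also modifies but which appear in the second part of this diff. A hedged sketch of what such a check could look like, with made-up names:

    import kafka.api.{LeaderAndIsrRequest, LeaderAndIsrResponse}
    import kafka.common.ErrorMapping
    import kafka.controller.KafkaController

    // Illustrative only; the committed logic lives in KafkaApis/ReplicaManager (part 2 of this diff).
    class StaleControllerCheckSketch {
      @volatile private var lastSeenControllerEpoch = KafkaController.InitialControllerEpoch - 1

      def handle(request: LeaderAndIsrRequest): LeaderAndIsrResponse = {
        if (request.controllerEpoch < lastSeenControllerEpoch) {
          // a controller with an older epoch sent this request; refuse it at the request level
          new LeaderAndIsrResponse(request.versionId, Map.empty[(String, Int), Short],
            ErrorMapping.StaleControllerEpochCode)
        } else {
          lastSeenControllerEpoch = request.controllerEpoch
          // here the real code would invoke makeLeader/makeFollower on each Partition in the request
          val perPartitionCodes = request.partitionStateInfos.map { case (topicAndPartition, _) =>
            (topicAndPartition, ErrorMapping.NoError)
          }
          new LeaderAndIsrResponse(request.versionId, perPartitionCodes, ErrorMapping.NoError)
        }
      }
    }
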
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ControllerMovedException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ControllerMovedException.scala?rev=1411010&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ControllerMovedException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ControllerMovedException.scala Sun Nov 18 22:48:20 2012
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.common
+
+class ControllerMovedException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Sun Nov 18 22:48:20 2012
@@ -40,6 +40,7 @@ object ErrorMapping {
   val BrokerNotAvailableCode: Short = 8
   val ReplicaNotAvailableCode: Short = 9
   val MessageSizeTooLargeCode: Short = 10
+  val StaleControllerEpochCode: Short = 11
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -52,7 +53,8 @@ object ErrorMapping {
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
-      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode
+      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
+      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

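With the new entry in exceptionToCode, a caught ControllerMovedException maps to StaleControllerEpochCode on the wire. A small hedged check of that mapping, assuming ErrorMapping.codeFor(exception: Class[Throwable]): Short is available in this file (it is not shown in the hunk above); the object name is made up:

    import kafka.common.{ControllerMovedException, ErrorMapping}

    object StaleEpochCodeSketch {
      def main(args: Array[String]) {
        // the mapping added above turns a ControllerMovedException into the new error code
        val code = ErrorMapping.codeFor(classOf[ControllerMovedException].asInstanceOf[Class[Throwable]])
        assert(code == ErrorMapping.StaleControllerEpochCode)
      }
    }
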
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala Sun Nov 18 22:48:20 2012
@@ -159,12 +159,13 @@ class ControllerBrokerRequestBatch(sendR
     stopAndDeleteReplicaRequestMap.clear()
   }
 
-  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) {
+  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
+                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) {
     brokerIds.foreach { brokerId =>
       leaderAndIsrRequestMap.getOrElseUpdate(brokerId,
                                              new mutable.HashMap[(String, Int), PartitionStateInfo])
       leaderAndIsrRequestMap(brokerId).put((topic, partition),
-                                           PartitionStateInfo(leaderAndIsr, replicationFactor))
+                                           PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
     }
   }
 
@@ -183,13 +184,13 @@ class ControllerBrokerRequestBatch(sendR
     }
   }
 
-  def sendRequestsToBrokers(liveBrokers: Set[Broker]) {
+  def sendRequestsToBrokers(controllerEpoch: Int, liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
-      val partitionStateInfos = m._2
-      val leaderIds = partitionStateInfos.map(_._2.leaderAndIsr.leader).toSet
+      val partitionStateInfos = m._2.toMap
+      val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders)
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch)
       debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
@@ -201,7 +202,8 @@ class ControllerBrokerRequestBatch(sendR
             if (replicas.size > 0) {
               debug("The stop replica request (delete = %s) sent to broker %d is %s"
                 .format(deletePartitions, broker, replicas.mkString(",")))
-              sendRequest(broker, new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas), null)
+              sendRequest(broker, new StopReplicaRequest(deletePartitions,
+                Set.empty[(String, Int)] ++ replicas, controllerEpoch), null)
             }
         }
         m.clear()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala Sun Nov 18 22:48:20 2012
@@ -24,23 +24,27 @@ import java.util.concurrent.TimeUnit
 import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
+import kafka.common._
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
 import kafka.utils.{Utils, ZkUtils, Logging}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
-import org.I0Itec.zkclient.exception.ZkNoNodeException
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+import scala.Some
+import kafka.common.TopicAndPartition
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
                         val brokerShutdownLock: Object = new Object,
+                        var epoch: Int = KafkaController.InitialControllerEpoch - 1,
+                        var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1,
                         var allTopics: Set[String] = Set.empty,
                         var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
-                        var allLeaders: mutable.Map[TopicAndPartition, LeaderAndIsr] = mutable.Map.empty,
+                        var allLeaders: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
                           new mutable.HashMap,
                         var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
@@ -68,6 +72,8 @@ trait KafkaControllerMBean {
 
 object KafkaController {
   val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
+  val InitialControllerEpoch = 1
+  val InitialControllerEpochZkVersion = 1
 }
 
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
@@ -82,6 +88,7 @@ class KafkaController(val config : Kafka
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
+  registerControllerChangedListener()
 
   newGauge(
     "ActiveControllerCount",
@@ -90,6 +97,8 @@ class KafkaController(val config : Kafka
     }
   )
 
+  def epoch = controllerContext.epoch
+
   /**
    * JMX operation to initiate clean shutdown of a broker. On clean shutdown,
    * the controller first determines the partitions that the shutting down
@@ -127,8 +136,8 @@ class KafkaController(val config : Kafka
       def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
         trace("All leaders = " + controllerContext.allLeaders.mkString(","))
         controllerContext.allLeaders.filter {
-          case (topicAndPartition, leader) =>
-            leader.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
+            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
         }.map(_._1)
       }
 
@@ -139,18 +148,18 @@ class KafkaController(val config : Kafka
         val (topic, partition) = topicAndPartition.asTuple
         // move leadership serially to relinquish lock.
         controllerContext.controllerLock synchronized {
-          controllerContext.allLeaders.get(topicAndPartition).foreach{ currLeaderAndIsr =>
-            if (currLeaderAndIsr.leader == id) {
+          controllerContext.allLeaders.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
+            if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
               partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                 controlledShutdownPartitionLeaderSelector)
-              val newLeaderAndIsr = controllerContext.allLeaders(topicAndPartition)
+              val newLeaderIsrAndControllerEpoch = controllerContext.allLeaders(topicAndPartition)
 
               // mark replica offline only if leadership was moved successfully
-              if (newLeaderAndIsr.leader != currLeaderAndIsr.leader)
+              if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
                 replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
             } else
               debug("Partition %s moved from leader %d to new leader %d during shutdown."
-                .format(topicAndPartition, id, currLeaderAndIsr.leader))
+                .format(topicAndPartition, id, currLeaderIsrAndControllerEpoch.leaderAndIsr.leader))
           }
         }
       }
@@ -165,18 +174,19 @@ class KafkaController(val config : Kafka
       allPartitionsAndReplicationFactorOnBroker foreach {
         case(topicAndPartition, replicationFactor) =>
           val (topic, partition) = topicAndPartition.asTuple
-          if (controllerContext.allLeaders(topicAndPartition).leader != id) {
+          if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) {
             brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
             removeReplicaFromIsr(topic, partition, id) match {
-              case Some(updatedLeaderAndIsr) =>
+              case Some(updatedLeaderIsrAndControllerEpoch) =>
                 brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
-                  Seq(updatedLeaderAndIsr.leader), topic, partition, updatedLeaderAndIsr, replicationFactor)
+                  Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
+                  updatedLeaderIsrAndControllerEpoch, replicationFactor)
               case None =>
               // ignore
             }
           }
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.liveBrokers)
 
       val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
       debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
@@ -187,15 +197,21 @@ class KafkaController(val config : Kafka
   /**
    * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
    * It does the following things on the become-controller state change -
-   * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
+   * 1. Register controller epoch changed listener
+   * 2. Increments the controller epoch
+   * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
    *    leaders for all existing partitions.
-   * 2. Starts the controller's channel manager
-   * 3. Starts the replica state machine
-   * 4. Starts the partition state machine
+   * 4. Starts the controller's channel manager
+   * 5. Starts the replica state machine
+   * 6. Starts the partition state machine
+   * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
+   * This ensures another controller election will be triggered and there will always be an actively serving controller
    */
   def onControllerFailover() {
     if(isRunning) {
       info("Broker %d starting become controller state transition".format(config.brokerId))
+      // increment the controller epoch
+      incrementControllerEpoch(zkClient)
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
       registerReassignedPartitionsListener()
       registerPreferredReplicaElectionListener()
@@ -205,7 +221,7 @@ class KafkaController(val config : Kafka
       partitionStateMachine.startup()
       replicaStateMachine.startup()
       Utils.registerMBean(this, KafkaController.MBeanName)
-      info("Broker %d is ready to serve as the new controller".format(config.brokerId))
+      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -268,7 +284,7 @@ class KafkaController(val config : Kafka
     val deadBrokersSet = deadBrokers.toSet
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
-      deadBrokersSet.contains(partitionAndLeader._2.leader)).keySet
+      deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -389,6 +405,37 @@ class KafkaController(val config : Kafka
     controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
   }
 
+  def incrementControllerEpoch(zkClient: ZkClient) = {
+    try {
+      var newControllerEpoch = controllerContext.epoch + 1
+      val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient,
+        ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
+      if(!updateSucceeded)
+        throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
+      else {
+        controllerContext.epochZkVersion = newVersion
+        controllerContext.epoch = newControllerEpoch
+      }
+    } catch {
+      case nne: ZkNoNodeException =>
+        // if path doesn't exist, this is the first controller whose epoch should be 1
+        // the following call can still fail if another controller gets elected between checking if the path exists and
+        // trying to create the controller epoch path
+        try {
+          zkClient.createPersistent(ZkUtils.ControllerEpochPath, KafkaController.InitialControllerEpoch.toString)
+          controllerContext.epoch = KafkaController.InitialControllerEpoch
+          controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
+        } catch {
+          case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
+            "Aborting controller startup procedure")
+          case oe => error("Error while incrementing controller epoch", oe)
+        }
+      case oe => error("Error while incrementing controller epoch", oe)
+
+    }
+    info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
+  }
+
   private def registerSessionExpirationListener() = {
     zkClient.subscribeStateChanges(new SessionExpirationListener())
   }
@@ -397,7 +444,7 @@ class KafkaController(val config : Kafka
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
-    controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderAndIsr]
+    controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager
@@ -429,7 +476,7 @@ class KafkaController(val config : Kafka
     val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
     // check if they are already completed
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
-      controllerContext.allLeaders(partition).leader == controllerContext.partitionReplicaAssignment(partition).head)
+      controllerContext.allLeaders(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
     controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
     info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
@@ -445,13 +492,13 @@ class KafkaController(val config : Kafka
 
   private def updateLeaderAndIsrCache() {
     val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
-    for((topicPartition, leaderAndIsr) <- leaderAndIsrInfo) {
+    for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) {
       // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
-      controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
+      controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
         case true =>
-          controllerContext.allLeaders.put(topicPartition, leaderAndIsr)
+          controllerContext.allLeaders.put(topicPartition, leaderIsrAndControllerEpoch)
         case false =>
-          debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderAndIsr.leader) +
+          debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderIsrAndControllerEpoch.leaderAndIsr.leader) +
             "partition %s is dead, just ignore it".format(topicPartition))
       }
     }
@@ -469,7 +516,7 @@ class KafkaController(val config : Kafka
   private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
                                                       reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val currentLeader = controllerContext.allLeaders(topicAndPartition).leader
+    val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
@@ -542,6 +589,10 @@ class KafkaController(val config : Kafka
     zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this))
   }
 
+  private def registerControllerChangedListener() {
+    zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this))
+  }
+
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
     // read the current list of reassigned partitions from zookeeper
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
@@ -570,7 +621,7 @@ class KafkaController(val config : Kafka
   def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
-      val currentLeader = controllerContext.allLeaders(partition).leader
+      val currentLeader = controllerContext.allLeaders(partition).leaderAndIsr.leader
       val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
       if(currentLeader == preferredReplica) {
         info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
@@ -598,35 +649,42 @@ class KafkaController(val config : Kafka
    * @return the new leaderAndIsr (with the replica removed if it was present),
    *         or None if leaderAndIsr is empty.
    */
-  def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderAndIsr] = {
+  def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = {
     val topicAndPartition = TopicAndPartition(topic, partition)
     debug("Removing replica %d from ISR of %s.".format(replicaId, topicAndPartition))
-    var finalLeaderAndIsr: Option[LeaderAndIsr] = None
+    var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-      zkWriteCompleteOrUnnecessary = leaderAndIsrOpt match {
-        case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
+      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
+        case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
+          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
+          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
+          if(controllerEpoch > epoch)
+            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
+              "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
+              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
           if (leaderAndIsr.isr.contains(replicaId)) {
             val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
               leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
             // update the new leadership decision in zookeeper or retry
-            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), leaderAndIsr.zkVersion)
+            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
+              zkClient,
+              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+              ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
+              leaderAndIsr.zkVersion)
             newLeaderAndIsr.zkVersion = newVersion
 
-            finalLeaderAndIsr = Some(newLeaderAndIsr)
-            if (updateSucceeded) {
-              // we've successfully written to ZK, let's refresh our cache
-              info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
-              controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr)
-            }
+            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
+            if (updateSucceeded)
+              info("New leader and ISR for partition [%s, %d] is %s"
+                   .format(topic, partition, newLeaderAndIsr.toString()))
             updateSucceeded
           } else {
             warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
-              .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
-            finalLeaderAndIsr = Some(leaderAndIsr)
+                 .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
+            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
             true
           }
         case None =>
@@ -634,7 +692,7 @@ class KafkaController(val config : Kafka
           true
       }
     }
-    finalLeaderAndIsr
+    finalLeaderIsrAndControllerEpoch
   }
 
   class SessionExpirationListener() extends IZkStateListener with Logging {
@@ -859,11 +917,50 @@ class PreferredReplicaElectionListener(c
   }
 }
 
+class ControllerEpochListener(controller: KafkaController) extends IZkDataListener with Logging {
+  this.logIdent = "[ControllerEpochListener on " + controller.config.brokerId + "]: "
+  val controllerContext = controller.controllerContext
+  readControllerEpochFromZookeeper()
+
+  /**
+   * Invoked when a controller updates the epoch value
+   * @throws Exception On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataChange(dataPath: String, data: Object) {
+    debug("Controller epoch listener fired with new epoch " + data.toString)
+    controllerContext.controllerLock synchronized {
+      // read the epoch path to get the zk version
+      readControllerEpochFromZookeeper()
+    }
+  }
+
+  /**
+   * @throws Exception
+   *             On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataDeleted(dataPath: String) {
+  }
+
+  private def readControllerEpochFromZookeeper() {
+    // initialize the controller epoch and zk version by reading from zookeeper
+    if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) {
+      val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath)
+      controllerContext.epoch = epochData._1.toInt
+      controllerContext.epochZkVersion = epochData._2.getVersion
+      info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
+    }
+  }
+}
+
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                        var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
 
 case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
 
+case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
+
 object ControllerStat extends KafkaMetricsGroup {
   val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)

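The heart of the fix is incrementControllerEpoch: a newly elected controller bumps the persistent controller epoch with a conditional (versioned) write, and a broker that loses that race gives up with ControllerMovedException instead of acting as a second controller. A toy model of the compare-and-set semantics that the conditional zookeeper update provides; this is a simulation for illustration, not ZkUtils, and it assumes only that the update succeeds iff the caller's expected version matches:

    // Toy stand-in for the versioned node behind ZkUtils.conditionalUpdatePersistentPathIfExists.
    class EpochNodeSketch(private var value: Int, private var version: Int) {
      def conditionalUpdate(newValue: Int, expectedVersion: Int): (Boolean, Int) = synchronized {
        if (expectedVersion == version) {
          value = newValue
          version += 1
          (true, version)          // caller records the new epoch and zk version
        } else
          (false, version)         // someone else updated the node first: the controller has moved
      }

      def read: (Int, Int) = synchronized { (value, version) }
    }

If two would-be controllers race, only the first conditional write succeeds; the loser sees updateSucceeded == false and, as in incrementControllerEpoch above, aborts its startup with ControllerMovedException.
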
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala Sun Nov 18 22:48:20 2012
@@ -128,7 +128,7 @@ with Logging {
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
-    val currentLeader = controllerContext.allLeaders(topicAndPartition).leader
+    val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
     if(currentLeader == preferredReplica) {
       throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]"
         .format(preferredReplica, topic, partition))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala Sun Nov 18 22:48:20 2012
@@ -85,7 +85,7 @@ class PartitionStateMachine(controller: 
         if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
           handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     } catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
@@ -104,8 +104,8 @@ class PartitionStateMachine(controller: 
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
-    } catch {
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
+    }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
     }
   }
@@ -144,7 +144,7 @@ class PartitionStateMachine(controller: 
             case _ => // should never come here since illegal previous states are checked above
           }
           info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
-            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leader))
+            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
           partitionState.put(topicAndPartition, OnlinePartition)
            // post: partition has a leader
         case OfflinePartition =>
@@ -231,22 +231,28 @@ class PartitionStateMachine(controller: 
         debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
         // make the first replica in the list of assigned replicas, the leader
         val leader = liveAssignedReplicas.head
-        val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
+        val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
+          controller.epoch)
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), leaderAndIsr.toString)
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+            ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
           // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
-            topicAndPartition.partition, leaderAndIsr, replicaAssignment.size)
-          controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr)
+            topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
+          controllerContext.allLeaders.put(topicAndPartition, leaderIsrAndControllerEpoch)
           partitionState.put(topicAndPartition, OnlinePartition)
         } catch {
           case e: ZkNodeExistsException =>
+            // read the controller epoch
+            val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
+              topicAndPartition.partition).get
             ControllerStat.offlinePartitionRate.mark()
             throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
-              .format(topicAndPartition) + " since Leader and ISR path already exists")
+              .format(topicAndPartition) + " since Leader and isr path already exists with value " +
+              "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))
         }
     }
   }
@@ -266,22 +272,30 @@ class PartitionStateMachine(controller: 
       var newLeaderAndIsr: LeaderAndIsr = null
       var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
       while(!zookeeperPathUpdateSucceeded) {
-        val currentLeaderAndIsr = getLeaderAndIsrOrThrowException(topic, partition)
+        val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
+        val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
+        val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
+        if(controllerEpoch > controller.epoch)
+          throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
+            "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
+            "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
         // elect new leader or throw exception
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
         newLeaderAndIsr = leaderAndIsr
         newLeaderAndIsr.zkVersion = newVersion
         zookeeperPathUpdateSucceeded = updateSucceeded
         replicasForThisPartition = replicas
       }
+      val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
-      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr)
+      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
       info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
-      // notify all replicas of the new leader
-      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr,
-        controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
+      // notify all replicas of the new leader and isr, tagged with this controller's epoch
+      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
+        newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
     } catch {
       case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
         .format(topic, partition) + " Marking this partition offline", poe)
@@ -299,9 +313,9 @@ class PartitionStateMachine(controller: 
     zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
   }
 
-  private def getLeaderAndIsrOrThrowException(topic: String, partition: Int): LeaderAndIsr = {
-    ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-      case Some(currentLeaderAndIsr) => currentLeaderAndIsr
+  private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
+    ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
+      case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
       case None =>
         throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
           "[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala Sun Nov 18 22:48:20 2012
@@ -83,7 +83,7 @@ class ReplicaStateMachine(controller: Ka
     try {
       brokerRequestBatch.newBatch()
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
     }
@@ -106,14 +106,14 @@ class ReplicaStateMachine(controller: Ka
         case NewReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
           // start replica as a follower to the current leader for its partition
-          val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-          leaderAndIsrOpt match {
-            case Some(leaderAndIsr) =>
-              if(leaderAndIsr.leader == replicaId)
+          val leaderIsrAndControllerEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+          leaderIsrAndControllerEpochOpt match {
+            case Some(leaderIsrAndControllerEpoch) =>
+              if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
                 throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
                   .format(replicaId, topic, partition) + "state as it is being requested to become leader")
               brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                topic, partition, leaderAndIsr, replicaAssignment.size)
+                                                                  topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
@@ -137,13 +137,14 @@ class ReplicaStateMachine(controller: Ka
               controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
               info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
             case _ =>
-              // if the leader for this replica exists and is alive, send the leader and ISR
-              controllerContext.allLeaders.get(topicAndPartition) match {
-                case Some(leaderAndIsr) =>
-                  controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
+              // check if the leader for this partition is alive or even exists
+                controllerContext.allLeaders.get(topicAndPartition) match {
+                case Some(leaderIsrAndControllerEpoch) =>
+                  controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
                     case true => // leader is alive
                       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                        topic, partition, leaderAndIsr, replicaAssignment.size)
+                                                                          topic, partition, leaderIsrAndControllerEpoch,
+                                                                          replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OnlineReplica)
                       info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
                     case false => // ignore partitions whose leader is not alive
@@ -155,14 +156,15 @@ class ReplicaStateMachine(controller: Ka
         case OfflineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
           // As an optimization, the controller removes dead replicas from the ISR
-          val leaderAndIsrIsEmpty = controllerContext.allLeaders.get(topicAndPartition) match {
-            case Some(currLeaderAndIsr) =>
-              if (currLeaderAndIsr.isr.contains(replicaId))
+          val leaderAndIsrIsEmpty: Boolean = controllerContext.allLeaders.get(topicAndPartition) match {
+            case Some(currLeaderIsrAndControllerEpoch) =>
+              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
                 controller.removeReplicaFromIsr(topic, partition, replicaId) match {
-                  case Some(updatedLeaderAndIsr) =>
+                  case Some(updatedLeaderIsrAndControllerEpoch) =>
                     // send the shrunk ISR state change request only to the leader
-                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderAndIsr.leader),
-                      topic, partition, updatedLeaderAndIsr, replicaAssignment.size)
+                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+                                                                        topic, partition, updatedLeaderIsrAndControllerEpoch,
+                                                                        replicaAssignment.size)
                     replicaState.put((topic, partition, replicaId), OfflineReplica)
                     info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
                     info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sun Nov 18 22:48:20 2012
@@ -24,7 +24,6 @@ import kafka.network._
 import kafka.utils.{Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
-import mutable.HashMap
 import kafka.network.RequestChannel.Response
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
@@ -127,8 +126,8 @@ class KafkaApis(val requestChannel: Requ
       requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest)
     trace("Handling leader and ISR request " + leaderAndIsrRequest)
     try {
-      val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, responseMap)
+      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, response, error)
       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
@@ -144,13 +143,8 @@ class KafkaApis(val requestChannel: Requ
       requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
     trace("Handling stop replica request " + stopReplicaRequest)
 
-    val responseMap = new HashMap[(String, Int), Short]
-    for((topic, partitionId) <- stopReplicaRequest.partitions) {
-      val errorCode = replicaManager.stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
-      responseMap.put((topic, partitionId), errorCode)
-    }
-
-    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
+    val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
+    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response.toMap, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
   }
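
Both handlers now return a request-level error code alongside the per-partition map, so a wholesale stale-epoch rejection can be told apart from individual partition failures. A hypothetical check on the receiving side, not part of this patch; the responseMap and errorCode field names are assumptions, and warn is assumed available from the Logging trait:

    // Hypothetical receiver-side check; field names are assumptions.
    def logStopReplicaOutcome(response: StopReplicaResponse) {
      if (response.errorCode == ErrorMapping.StaleControllerEpochCode)
        // the broker has already heard from a newer controller; the sender should stop acting as controller
        warn("Stop replica request was rejected because it carried a stale controller epoch")
      else
        for (((topic, partition), code) <- response.responseMap if code != ErrorMapping.NoError)
          warn("Stop replica failed for [%s, %d] with error code %d".format(topic, partition, code))
    }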
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Sun Nov 18 22:48:20 2012
@@ -18,6 +18,7 @@ package kafka.server
 
 import kafka.cluster.{Broker, Partition, Replica}
 import collection._
+import mutable.HashMap
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils._
@@ -26,7 +27,8 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.common._
-import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.controller.KafkaController
 
 
 object ReplicaManager {
@@ -38,6 +40,8 @@ class ReplicaManager(val config: KafkaCo
                      val zkClient: ZkClient, 
                      kafkaScheduler: KafkaScheduler,
                      val logManager: LogManager) extends Logging with KafkaMetricsGroup {
+  /* epoch of the controller that last changed the leader */
+  @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val allPartitions = new Pool[(String, Int), Partition]
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
@@ -110,6 +114,23 @@ class ReplicaManager(val config: KafkaCo
     errorCode
   }
 
+  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
+    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+    if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
+      error("Received stop replica request from an old controller epoch %d.".format(stopReplicaRequest.controllerEpoch) +
+        " Latest known controller epoch is %d " + controllerEpoch)
+      (responseMap, ErrorMapping.StaleControllerEpochCode)
+    } else {
+      controllerEpoch = stopReplicaRequest.controllerEpoch
+      val responseMap = new HashMap[(String, Int), Short]
+      for((topic, partitionId) <- stopReplicaRequest.partitions){
+        val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
+        responseMap.put((topic, partitionId), errorCode)
+      }
+      (responseMap, ErrorMapping.NoError)
+    }
+  }
+
   def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
     var partition = allPartitions.get((topic, partitionId))
     if (partition == null) {
@@ -158,49 +179,42 @@ class ReplicaManager(val config: KafkaCo
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
     info("Handling leader and isr request %s".format(leaderAndISRRequest))
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
-
-    for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos){
-      var errorCode = ErrorMapping.NoError
-      val topic = topicAndPartition._1
-      val partitionId = topicAndPartition._2
-
-      val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader
-      try {
-        if(requestedLeaderId == config.brokerId)
-          makeLeader(topic, partitionId, partitionStateInfo)
-        else
-          makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
-      } catch {
-        case e =>
-          error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
-          errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+    if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
+      error("Received leader and isr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) +
+        " Latest known controller epoch is %d " + controllerEpoch)
+      (responseMap, ErrorMapping.StaleControllerEpochCode)
+    }else {
+      controllerEpoch = leaderAndISRRequest.controllerEpoch
+      for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
+        var errorCode = ErrorMapping.NoError
+        val topic = topicAndPartition._1
+        val partitionId = topicAndPartition._2
+
+        val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+        try {
+          if(requestedLeaderId == config.brokerId)
+            makeLeader(topic, partitionId, partitionStateInfo)
+          else
+            makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
+        } catch {
+          case e =>
+            error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
+            errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        }
+        responseMap.put(topicAndPartition, errorCode)
       }
-      responseMap.put(topicAndPartition, errorCode)
+      (responseMap, ErrorMapping.NoError)
     }
-
-    /**
-     *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
-     *  as deleted.
-     *  TODO: Handle this properly as part of KAFKA-330
-     */
-//    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
-//      startHighWaterMarksCheckPointThread
-//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.partitionStateInfos.contains(p._1)).map(entry => entry._1)
-//      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-//      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
-//    }
-
-    responseMap
   }
 
   private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
-    val leaderAndIsr = partitionStateInfo.leaderAndIsr
+    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeader(topic, partitionId, leaderAndIsr)) {
+    if (partition.makeLeader(topic, partitionId, leaderIsrAndControllerEpoch)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -209,14 +223,15 @@ class ReplicaManager(val config: KafkaCo
     info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker]) {
-    val leaderAndIsr = partitionStateInfo.leaderAndIsr
-    val leaderBrokerId: Int = leaderAndIsr.leader
+  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
+                           liveBrokers: Set[Broker]) {
+    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+    val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition %d"
                  .format(leaderBrokerId, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(topic, partitionId, leaderAndIsr, liveBrokers)) {
+    if (partition.makeFollower(topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
@@ -233,7 +248,7 @@ class ReplicaManager(val config: KafkaCo
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
     val partitionOpt = getPartition(topic, partitionId)
-    if(partitionOpt.isDefined){
+    if(partitionOpt.isDefined) {
       partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
     } else {
       warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala Sun Nov 18 22:48:20 2012
@@ -44,8 +44,6 @@ class ZookeeperLeaderElector(controllerC
     }
   }
 
-  def amILeader : Boolean = leaderId == brokerId
-
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
     try {
@@ -56,10 +54,14 @@ class ZookeeperLeaderElector(controllerC
     } catch {
       case e: ZkNodeExistsException =>
         // If someone else has written the path, then
-        debug("Someone else was elected as leader other than " + brokerId)
         val data: String = controllerContext.zkClient.readData(electionPath, true)
-        if (data != null) leaderId = data.toInt
-      case e2 => throw e2
+        debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
+        if (data != null) {
+          leaderId = data.toInt
+        }
+      case e2 =>
+        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+        resign()
     }
     amILeader
   }
@@ -68,6 +70,12 @@ class ZookeeperLeaderElector(controllerC
     leaderId = -1
   }
 
+  def amILeader : Boolean = leaderId == brokerId
+
+  def resign() = {
+    deletePath(controllerContext.zkClient, electionPath)
+  }
+
   /**
    * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
    * have its own session expiration listener and handler
@@ -79,6 +87,10 @@ class ZookeeperLeaderElector(controllerC
      */
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
+      controllerContext.controllerLock synchronized {
+        leaderId = data.toString.toInt
+        info("New leader is %d".format(leaderId))
+      }
     }
 
     /**

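Taken together, the elector now distinguishes three outcomes: winning the election, losing the race to another broker, and an unexpected failure, in which case it resigns by deleting the election path rather than keeping a claim it cannot verify, while the data-change listener keeps leaderId current when leadership moves. A condensed sketch of the flow, with createEphemeralPath standing in for the actual ZkUtils call used by the class:

    // Condensed sketch; createEphemeralPath is a stand-in for the real ZkUtils helper.
    def elect: Boolean = {
      try {
        createEphemeralPath(controllerContext.zkClient, electionPath, brokerId.toString)  // claim leadership
        leaderId = brokerId
      } catch {
        case e: ZkNodeExistsException =>
          // lost the race: record the winner so amILeader answers correctly
          val data: String = controllerContext.zkClient.readData(electionPath, true)
          if (data != null) leaderId = data.toInt
        case e2 =>
          // any other failure leaves leadership unknown: resign so a clean re-election can happen
          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
          resign()
      }
      amILeader
    }

    def resign() = deletePath(controllerContext.zkClient, electionPath)
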
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Sun Nov 18 22:48:20 2012
@@ -24,17 +24,19 @@ import org.I0Itec.zkclient.exception.{Zk
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
 import kafka.api.LeaderAndIsr
+import mutable.HashMap
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
-import kafka.controller.{PartitionAndReplica, ReassignedPartitionsContext}
 import kafka.admin._
 import kafka.common.{TopicAndPartition, KafkaException, NoEpochForPartitionException}
+import kafka.controller.{LeaderIsrAndControllerEpoch, PartitionAndReplica, ReassignedPartitionsContext}
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
   val ControllerPath = "/controller"
+  val ControllerEpochPath = "/controllerEpoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
 
@@ -74,7 +76,7 @@ object ZkUtils extends Logging {
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
-  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
+  def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
     val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
     val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
     val leaderAndIsrOpt = leaderAndIsrInfo._1
@@ -85,17 +87,23 @@ object ZkUtils extends Logging {
     }
   }
 
-  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = {
+  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
+    getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
+  }
+
+  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
+  : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
         val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
         val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
         val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+        val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
         val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
           isr.toString(), zkPathVersion, topic, partition))
-        Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
+        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion), controllerEpoch))
       case None => None
     }
   }
@@ -189,6 +197,15 @@ object ZkUtils extends Logging {
     topicDirs.consumerOwnerDir + "/" + partition
   }
 
+  def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
+    val jsonDataMap = new HashMap[String, String]
+    jsonDataMap.put("leader", leaderAndIsr.leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
+    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
+    Utils.stringMapToJson(jsonDataMap)
+  }
+
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
@@ -314,6 +331,25 @@ object ZkUtils extends Logging {
   }
 
   /**
+   * Conditionally update the persistent path data: return (true, newVersion) if it succeeds, otherwise (e.g. when the
+   * current version is not the expected version) return (false, -1). If the path doesn't exist, throw a ZkNoNodeException.
+   */
+  def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
+    try {
+      val stat = client.writeData(path, data, expectVersion)
+      info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
+        .format(path, data, expectVersion, stat.getVersion))
+      (true, stat.getVersion)
+    } catch {
+      case nne: ZkNoNodeException => throw nne
+      case e: Exception =>
+        error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path, data,
+          expectVersion), e)
+        (false, -1)
+    }
+  }
+
+  /**
    * Update the value of a persistent node with the given path and data.
    * create parent directory if necessary. Never throw NodeExistException.
    */
@@ -439,13 +475,13 @@ object ZkUtils extends Logging {
   }
 
   def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]):
-  mutable.Map[TopicAndPartition, LeaderAndIsr] = {
-    val ret = new mutable.HashMap[TopicAndPartition, LeaderAndIsr]
+  mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+    val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
     for((topic, partitions) <- partitionsForTopics) {
       for(partition <- partitions) {
-        ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition.toInt) match {
-          case Some(leaderAndIsr) => ret.put(TopicAndPartition(topic, partition.toInt), leaderAndIsr)
+        ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition.toInt) match {
+          case Some(leaderIsrAndControllerEpoch) => ret.put(TopicAndPartition(topic, partition.toInt), leaderIsrAndControllerEpoch)
           case None =>
         }
       }
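
leaderAndIsrZkData builds the payload from a HashMap[String, String], so every field, including the new controllerEpoch, is serialized as a string and read back by parseLeaderAndIsr. Roughly, and assuming Utils.stringMapToJson quotes each value:

    // Illustrative only: the shape of the data written to a partition's leader-and-isr path after this change.
    val data = ZkUtils.leaderAndIsrZkData(LeaderAndIsr(1, 3, List(1, 2), 0), controllerEpoch = 2)
    // yields something like: { "leader":"1", "leaderEpoch":"3", "ISR":"1,2", "controllerEpoch":"2" }

The new conditionalUpdatePersistentPathIfExists follows the same conditional-write convention: it propagates ZkNoNodeException when the path is missing and returns (false, -1) when the expected version does not match, leaving the retry-or-abort decision to the caller.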

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Sun Nov 18 22:48:20 2012
@@ -159,7 +159,7 @@ class AdminTest extends JUnit3Suite with
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
@@ -189,7 +189,7 @@ class AdminTest extends JUnit3Suite with
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
 
     val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
     newTopicMetadata.errorCode match {


