kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1387270 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/controller/ main/scala/kafka/server/ main/scala/kafka/tools/ main/scala...
Date Tue, 18 Sep 2012 17:21:16 GMT
Author: nehanarkhede
Date: Tue Sep 18 17:21:15 2012
New Revision: 1387270

URL: http://svn.apache.org/viewvc?rev=1387270&view=rev
Log:
KAFKA-499 Refactor controller; patched by Neha Narkhede; reviewed by Jun Rao

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/LeaderElector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Tue Sep 18 17:21:15 2012
@@ -89,8 +89,8 @@ object AdminUtils extends Logging {
   def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[TopicMetadata] = {
     val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
     topics.map { topic =>
-      if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
-        val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic).iterator).get(topic).get
+      if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
+        val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
         val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
 
         val partitionMetadata = sortedPartitions.map { partitionMap =>
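The functional change here is that ZkUtils.getPartitionAssignmentForTopics now takes the topic list as a Seq rather than an Iterator. A minimal sketch of a caller after this change (the topic name and zkClient value are hypothetical placeholders):

    // Hypothetical caller: fetch the partition -> replica assignment for a single topic.
    // The result is keyed by topic name, mirroring the lookup in getTopicMetaDataFromZK above.
    val assignmentByTopic = ZkUtils.getPartitionAssignmentForTopics(zkClient, List("my-topic"))
    val partitionAssignment = assignmentByTopic.get("my-topic").get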

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Tue Sep 18 17:21:15 2012
@@ -99,7 +99,6 @@ object CreateTopicCommand extends Loggin
       AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
     else
       getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
-
     debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Tue Sep 18 17:21:15 2012
@@ -72,7 +72,6 @@ object LeaderAndIsrRequest {
   def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
-    val isInit = if(buffer.get() == 1.toByte) true else false
     val ackTimeoutMs = buffer.getInt
     val leaderAndISRRequestCount = buffer.getInt
     val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr]
@@ -84,25 +83,24 @@ object LeaderAndIsrRequest {
 
       leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
     }
-    new LeaderAndIsrRequest(versionId, clientId, isInit, ackTimeoutMs, leaderAndISRInfos)
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, leaderAndISRInfos)
   }
 }
 
 
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
-                                isInit: Boolean,
                                 ackTimeoutMs: Int,
                                 leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
-  def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, isInit, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
+
+  def this(leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
-    buffer.put(if(isInit) 1.toByte else 0.toByte)
     buffer.putInt(ackTimeoutMs)
     buffer.putInt(leaderAndISRInfos.size)
     for((key, value) <- leaderAndISRInfos){
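With the isInit flag gone from both the wire format and the case class, constructing a LeaderAndIsrRequest only needs the (topic, partition) -> LeaderAndIsr map. A minimal sketch, assuming a mutable HashMap as in readFrom above (topic name, leader and ISR values are placeholders):

    // Hypothetical construction using the simplified auxiliary constructor.
    val leaderAndIsrInfos = new HashMap[(String, Int), LeaderAndIsr]
    leaderAndIsrInfos.put(("my-topic", 0), new LeaderAndIsr(1, List(1, 2)))  // leader = broker 1, ISR = {1, 2}
    val request = new LeaderAndIsrRequest(leaderAndIsrInfos)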

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala?rev=1387270&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala Tue Sep 18 17:21:15 2012
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class StateChangeFailedException(message: String) extends RuntimeException(message) {
+  def this(message: String, cause: Throwable) = this(message + " Root cause -> " + cause.toString)
+  def this() = this(null)
+}
\ No newline at end of file
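A brief, hypothetical illustration of how the new exception is used by the state machines added below; the message text is a placeholder, and the auxiliary constructor folds the root cause into the message:

    try {
      // some zookeeper update that may fail
      throw new RuntimeException("conditional update failed")
    } catch {
      case cause: Throwable =>
        throw new StateChangeFailedException("Error while electing leader for partition [my-topic, 0]", cause)
    }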

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Tue Sep 18 17:21:15 2012
@@ -410,7 +410,7 @@ private[kafka] class ZookeeperConsumerCo
     private def rebalance(cluster: Cluster): Boolean = {
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
-      val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
+      val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
 
       /**
        * fetchers must be stopped to avoid data duplication, since if the current

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala?rev=1387270&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala Tue Sep 18 17:21:15 2012
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import kafka.network.{Receive, BlockingChannel}
+import kafka.utils.{Logging, ShutdownableThread}
+import collection.mutable.HashMap
+import kafka.cluster.Broker
+import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
+import kafka.server.KafkaConfig
+import collection.mutable
+import kafka.api._
+
+class ControllerChannelManager private (config: KafkaConfig) extends Logging {
+  private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
+  private val brokerLock = new Object
+  this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
+
+  def this(allBrokers: Set[Broker], config : KafkaConfig) {
+    this(config)
+    allBrokers.foreach(addNewBroker(_))
+  }
+
+  def startup() = {
+    brokerLock synchronized {
+      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
+    }
+  }
+
+  def shutdown() = {
+    brokerLock synchronized {
+      brokerStateInfo.foreach(brokerState => removeExistingBroker(brokerState._1))
+    }
+  }
+
+  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
+    brokerLock synchronized {
+      brokerStateInfo(brokerId).messageQueue.put((request, callback))
+    }
+  }
+
+  def addBroker(broker: Broker) {
+    // be careful here. Maybe the startup() API has already started the request send thread
+    brokerLock synchronized {
+      if(!brokerStateInfo.contains(broker.id)) {
+        addNewBroker(broker)
+        startRequestSendThread(broker.id)
+      }
+    }
+  }
+
+  def removeBroker(brokerId: Int) {
+    brokerLock synchronized {
+      removeExistingBroker(brokerId)
+    }
+  }
+
+  private def addNewBroker(broker: Broker) {
+    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
+    val channel = new BlockingChannel(broker.host, broker.port,
+      BlockingChannel.UseDefaultBufferSize,
+      BlockingChannel.UseDefaultBufferSize,
+      config.controllerSocketTimeoutMs)
+    channel.connect()
+    val requestThread = new RequestSendThread(config.brokerId, broker.id, messageQueue, channel)
+    requestThread.setDaemon(false)
+    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
+  }
+
+  private def removeExistingBroker(brokerId: Int) {
+    try {
+      brokerStateInfo(brokerId).channel.disconnect()
+      brokerStateInfo(brokerId).requestSendThread.shutdown()
+      brokerStateInfo.remove(brokerId)
+    }catch {
+      case e => error("Error while removing broker by the controller", e)
+    }
+  }
+
+  private def startRequestSendThread(brokerId: Int) {
+    val requestThread = brokerStateInfo(brokerId).requestSendThread
+    if(requestThread.getState == Thread.State.NEW)
+      requestThread.start()
+  }
+}
+
+class RequestSendThread(val controllerId: Int,
+                        val toBrokerId: Int,
+                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                        val channel: BlockingChannel)
+  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
+  private val lock = new Object()
+
+  override def doWork(): Unit = {
+    val queueItem = queue.take()
+    val request = queueItem._1
+    val callback = queueItem._2
+
+    var receive: Receive = null
+
+    try{
+      lock synchronized {
+        channel.send(request)
+        receive = channel.receive()
+        var response: RequestOrResponse = null
+        request.requestId.get match {
+          case RequestKeys.LeaderAndIsrKey =>
+            response = LeaderAndISRResponse.readFrom(receive.buffer)
+          case RequestKeys.StopReplicaKey =>
+            response = StopReplicaResponse.readFrom(receive.buffer)
+        }
+        trace("Controller %d received response %s for a request sent to broker %d".format(controllerId, response, toBrokerId))
+
+        if(callback != null){
+          callback(response)
+        }
+      }
+    } catch {
+      case e =>
+        // log it and let it go. Let controller shut it down.
+        debug("Exception occurred while sending a request to broker %d".format(toBrokerId), e)
+    }
+  }
+}
+
+// TODO: When we add more types of requests, we can generalize this class a bit. Right now, it just handles LeaderAndIsr
+// request
+class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
+  extends  Logging {
+  val brokerRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+
+  def newBatch() {
+    // raise error if the previous batch is not empty
+    if(brokerRequestMap.size > 0)
+      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
+        "a new one. Some state changes %s might be lost ".format(brokerRequestMap.toString()))
+    brokerRequestMap.clear()
+  }
+
+  def addRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) {
+    brokerIds.foreach { brokerId =>
+      brokerRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])
+      brokerRequestMap(brokerId).put((topic, partition), leaderAndIsr)
+    }
+  }
+
+  def sendRequestsToBrokers() {
+    brokerRequestMap.foreach { m =>
+      val broker = m._1
+      val leaderAndIsr = m._2
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
+      debug(("The leaderAndIsr request sent to broker %d is %s").format(broker, leaderAndIsrRequest))
+      sendRequest(broker, leaderAndIsrRequest, null)
+    }
+    brokerRequestMap.clear()
+  }
+}
+
+case class ControllerBrokerStateInfo(channel: BlockingChannel,
+                                     broker: Broker,
+                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                                     requestSendThread: RequestSendThread)
+
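For context, the state machines below drive ControllerBrokerRequestBatch in three steps: open a batch, accumulate the per-broker leader/isr decisions, then flush. A hedged sketch (broker ids, topic and LeaderAndIsr values are placeholders; the send function is the controller's own sendRequest, defined in KafkaController below):

    // Hypothetical sketch mirroring how PartitionStateMachine uses the batch.
    def sendOneLeaderDecision(controller: KafkaController) {
      val batch = new ControllerBrokerRequestBatch(controller.sendRequest)
      batch.newBatch()                 // raises IllegalStateException if a previous batch was left unsent
      batch.addRequestForBrokers(Seq(1, 2), "my-topic", 0, new LeaderAndIsr(1, List(1, 2)))
      batch.sendRequestsToBrokers()    // one LeaderAndIsrRequest per broker, then the map is cleared
    }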

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala?rev=1387270&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala Tue Sep 18 17:21:15 2012
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import collection._
+import collection.immutable.Set
+import kafka.cluster.Broker
+import kafka.api._
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+import kafka.utils.{ZkUtils, Logging}
+import java.lang.Object
+import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import com.yammer.metrics.core.Gauge
+
+class ControllerContext(val zkClient: ZkClient,
+                        var controllerChannelManager: ControllerChannelManager = null,
+                        val controllerLock: Object = new Object,
+                        var liveBrokers: Set[Broker] = null,
+                        var liveBrokerIds: Set[Int] = null,
+                        var allTopics: Set[String] = null,
+                        var partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null,
+                        var allLeaders: mutable.Map[(String, Int), Int] = null)
+
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup {
+  this.logIdent = "[Controller " + config.brokerId + "], "
+  private var isRunning = true
+  val controllerContext = new ControllerContext(zkClient)
+  private val partitionStateMachine = new PartitionStateMachine(this)
+  private val replicaStateMachine = new ReplicaStateMachine(this)
+  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
+    config.brokerId)
+
+  newGauge(
+    "ActiveControllerCount",
+    new Gauge[Int] {
+      def value() = if (isActive) 1 else 0
+    }
+  )
+
+  /**
+   * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
+   * It does the following things on the become-controller state change -
+   * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
+   *    leaders for all existing partitions.
+   * 2. Starts the controller's channel manager
+   * 3. Starts the replica state machine
+   * 4. Starts the partition state machine
+   */
+  def onControllerFailover() {
+    if(isRunning) {
+      info("Broker %d starting become controller state transition".format(config.brokerId))
+      // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
+      partitionStateMachine.registerListeners()
+      replicaStateMachine.registerListeners()
+      initializeControllerContext()
+      partitionStateMachine.startup()
+      replicaStateMachine.startup()
+      info("Broker %d is ready to serve as the new controller".format(config.brokerId))
+    }else
+      info("Controller has been shut down, aborting startup/failover")
+  }
+
+  /**
+   * Returns true if this broker is the current controller.
+   */
+  def isActive(): Boolean = {
+    controllerContext.controllerChannelManager != null
+  }
+
+  /**
+   * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
+   * brokers as input. It does the following -
+   * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update
+   *    leader and ISR for every partition as they take place
+   * 2. Triggers the OnlinePartition state change for all new/offline partitions
+   * 3. Invokes the OnlineReplica state change on the input list of newly started brokers
+   */
+  def onBrokerStartup(newBrokers: Seq[Int]) {
+    info("New broker startup callback for %s".format(newBrokers.mkString(",")))
+    // update leader and isr cache for broker
+    updateLeaderAndIsrCache()
+    // update partition state machine
+    partitionStateMachine.triggerOnlinePartitionStateChange()
+    replicaStateMachine.handleStateChanges(newBrokers, OnlineReplica)
+  }
+
+  /**
+   * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers
+   * as input. It does the following -
+   * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update
+   *    leader and ISR for every partition as they take place
+   * 2. Mark partitions with dead leaders offline
+   * 3. Triggers the OnlinePartition state change for all new/offline partitions
+   * 4. Invokes the OfflineReplica state change on the input list of failed brokers
+   */
+  def onBrokerFailure(deadBrokers: Seq[Int]) {
+    info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
+    // update leader and isr cache for broker
+    updateLeaderAndIsrCache()
+    // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
+    val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
+      deadBrokers.contains(partitionAndLeader._2)).map(_._1).toSeq
+    partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
+    // trigger OnlinePartition state changes for offline or new partitions
+    partitionStateMachine.triggerOnlinePartitionStateChange()
+    // handle dead replicas
+    replicaStateMachine.handleStateChanges(deadBrokers, OfflineReplica)
+  }
+
+  /**
+   * This callback is invoked by the partition state machine's topic change listener with the list of new topics
+   * (and their new partitions) as input. It does the following -
+   * 1. Registers partition change listener. This is not required until KAFKA-347
+   * 2. Invokes the new partition callback
+   */
+  def onNewTopicCreation(topics: Set[String], newPartitions: Seq[(String, Int)]) {
+    info("New topic creation callback for %s".format(newPartitions.mkString(",")))
+    // subscribe to partition changes
+    topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
+    onNewPartitionCreation(newPartitions)
+  }
+
+  /**
+   * This callback is invoked by the topic change callback with the list of newly created partitions as input.
+   * It does the following -
+   * 1. Move the newly created partitions to the NewPartition state
+   * 2. Move the newly created partitions from NewPartition->OnlinePartition state
+   */
+  def onNewPartitionCreation(newPartitions: Seq[(String, Int)]) {
+    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
+    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
+    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition)
+  }
+
+  /* TODO: kafka-330  This API is unused until we introduce the delete topic functionality.
+  remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
+  def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
+    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
+    for((topicPartition, brokers) <- replicaAssignment){
+      for (broker <- brokers){
+        if (!brokerToPartitionToStopReplicaMap.contains(broker))
+          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
+        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
+      }
+      controllerContext.allLeaders.remove(topicPartition)
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
+    }
+    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
+      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
+      info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
+      sendRequest(broker, stopReplicaRequest)
+    }
+  }
+
+  /**
+   * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
+   * is the controller. It merely registers the session expiration listener and starts the controller leader
+   * elector
+   */
+  def startup() = {
+    controllerContext.controllerLock synchronized {
+      info("Controller starting up")
+      registerSessionExpirationListener()
+      isRunning = true
+      controllerElector.startup
+      info("Controller startup complete")
+    }
+  }
+
+  /**
+   * Invoked when the controller module of a Kafka server is shutting down. If the broker was the current controller,
+   * it shuts down the partition and replica state machines. If not, those are a no-op. In addition to that, it also
+   * shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
+   */
+  def shutdown() = {
+    controllerContext.controllerLock synchronized {
+      isRunning = false
+      partitionStateMachine.shutdown()
+      replicaStateMachine.shutdown()
+      if(controllerContext.controllerChannelManager != null) {
+        controllerContext.controllerChannelManager.shutdown()
+        controllerContext.controllerChannelManager = null
+        info("Controller shutdown complete")
+      }
+    }
+  }
+
+  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
+    controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
+  }
+
+  private def registerSessionExpirationListener() = {
+    zkClient.subscribeStateChanges(new SessionExpireListener())
+  }
+
+  private def initializeControllerContext() {
+    controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+    controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id)
+    controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
+    controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,
+      controllerContext.allTopics.toSeq)
+    controllerContext.allLeaders = new mutable.HashMap[(String, Int), Int]
+    // update the leader and isr cache for all existing partitions from Zookeeper
+    updateLeaderAndIsrCache()
+    // start the channel manager
+    startChannelManager()
+    info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
+    info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
+  }
+
+  private def startChannelManager() {
+    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext.liveBrokers, config)
+    controllerContext.controllerChannelManager.startup()
+  }
+
+  private def updateLeaderAndIsrCache() {
+    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
+    for((topicPartition, leaderAndIsr) <- leaderAndIsrInfo) {
+      // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
+      controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
+        case true =>
+          controllerContext.allLeaders.put(topicPartition, leaderAndIsr.leader)
+        case false =>
+          debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderAndIsr.leader) +
+            "partition [%s, %d] is dead, just ignore it".format(topicPartition._1, topicPartition._2))
+      }
+    }
+  }
+
+  class SessionExpireListener() extends IZkStateListener with Logging {
+    this.logIdent = "[Controller " + config.brokerId + "], "
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) {
+      // do nothing, since zkclient will do reconnect for us.
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+     * any ephemeral nodes here.
+     *
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      controllerContext.controllerLock synchronized {
+        partitionStateMachine.shutdown()
+        replicaStateMachine.shutdown()
+        if(controllerContext.controllerChannelManager != null) {
+          info("Session expired; cleaning up controller state")
+          controllerContext.controllerChannelManager.shutdown()
+          controllerContext.controllerChannelManager = null
+        }
+        controllerElector.elect
+      }
+    }
+  }
+}
+
+object ControllerStat extends KafkaMetricsGroup {
+  val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
+  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
+  val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+}
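A hedged sketch of the controller lifecycle as seen from a broker (config and zkClient come from KafkaServer and are placeholders here): startup only registers the session expiration listener and kicks off leader election, while onControllerFailover runs the expensive initialization only if this broker wins the election.

    // Hypothetical wiring; the real call sites are in KafkaServer.
    def runController(config: KafkaConfig, zkClient: ZkClient) {
      val controller = new KafkaController(config, zkClient)
      controller.startup()    // register session expiration listener + start the ZK leader elector
      // ... broker serves requests; onControllerFailover() fires only if this broker is elected ...
      controller.shutdown()   // stops the state machines and the channel manager if this broker was the controller
    }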

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala?rev=1387270&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala Tue Sep 18 17:21:15 2012
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import collection._
+import kafka.api.LeaderAndIsr
+import kafka.utils.{Logging, ZkUtils}
+import org.I0Itec.zkclient.IZkChildListener
+import collection.JavaConversions._
+import kafka.common.{StateChangeFailedException, PartitionOfflineException, KafkaException}
+import java.util.concurrent.atomic.AtomicBoolean
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+
+/**
+ * This class represents the state machine for partitions. It defines the states that a partition can be in, and
+ * transitions to move the partition to another legal state. The different states that a partition can be in are -
+ * 1. NonExistentPartition: This state indicates that the partition was either never created or was created and then
+ *                          deleted. Valid previous state, if one exists, is OfflinePartition
+ * 2. NewPartition        : After creation, the partition is in the NewPartition state. In this state, the partition should have
+ *                          replicas assigned to it, but no leader/isr yet. Valid previous states are NonExistentPartition
+ * 3. OnlinePartition     : Once a leader is elected for a partition, it is in the OnlinePartition state.
+ *                          Valid previous states are NewPartition/OfflinePartition
+ * 4. OfflinePartition    : If, after successful leader election, the leader for partition dies, then the partition
+ *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
+ */
+class PartitionStateMachine(controller: KafkaController) extends Logging {
+  this.logIdent = "[Partition state machine on Controller " + controller.config.brokerId + "]: "
+  private val controllerContext = controller.controllerContext
+  private val zkClient = controllerContext.zkClient
+  var partitionState: mutable.Map[(String, Int), PartitionState] = mutable.Map.empty
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  private var isShuttingDown = new AtomicBoolean(false)
+
+  /**
+   * Invoked on successful controller election. First registers a topic change listener since that triggers all
+   * state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
+   * the OnlinePartition state change for all new or offline partitions.
+   */
+  def startup() {
+    isShuttingDown.set(false)
+    // initialize partition state
+    initializePartitionState()
+    // try to move partitions to online state
+    triggerOnlinePartitionStateChange()
+    info("Started partition state machine with initial state -> " + partitionState.toString())
+  }
+
+  // register topic and partition change listeners
+  def registerListeners() {
+    registerTopicChangeListener()
+  }
+
+  /**
+   * Invoked on controller shutdown.
+   */
+  def shutdown() {
+    isShuttingDown.compareAndSet(false, true)
+    partitionState.clear()
+  }
+
+  /**
+   * This API invokes the OnlinePartition state change on all partitions in either the NewPartition or OfflinePartition
+   * state. This is called on a successful controller election and on broker changes
+   */
+  def triggerOnlinePartitionStateChange() {
+    try {
+      brokerRequestBatch.newBatch()
+      // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
+      partitionState.filter(partitionAndState =>
+        partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach {
+        partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition)
+      }
+      brokerRequestBatch.sendRequestsToBrokers()
+    }catch {
+      case e => error("Error while moving some partitions to the online state", e)
+    }
+  }
+
+  /**
+   * This API is invoked by the partition change zookeeper listener
+   * @param partitions   The list of partitions that need to be transitioned to the target state
+   * @param targetState  The state that the partitions should be moved to
+   */
+  def handleStateChanges(partitions: Seq[(String, Int)], targetState: PartitionState) {
+    info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
+    try {
+      brokerRequestBatch.newBatch()
+      partitions.foreach { topicAndPartition =>
+        handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState)
+      }
+      brokerRequestBatch.sendRequestsToBrokers()
+    }catch {
+      case e => error("Error while moving some partitions to %s state".format(targetState), e)
+    }
+  }
+
+  /**
+   * This API exercises the partition's state machine. It ensures that every state transition happens from a legal
+   * previous state to the target state.
+   * @param topic       The topic of the partition for which the state transition is invoked
+   * @param partition   The partition for which the state transition is invoked
+   * @param targetState The end state that the partition should be moved to
+   */
+  private def handleStateChange(topic: String, partition: Int, targetState: PartitionState) {
+    try {
+      partitionState.getOrElseUpdate((topic, partition), NonExistentPartition)
+      targetState match {
+        case NewPartition =>
+          // pre: partition did not exist before this
+          // post: partition has been assigned replicas
+          assertValidPreviousStates(topic, partition, List(NonExistentPartition), NewPartition)
+          assignReplicasToPartitions(topic, partition)
+          partitionState.put((topic, partition), NewPartition)
+          info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
+            "%s".format(controllerContext.partitionReplicaAssignment(topic, partition).mkString(",")))
+        case OnlinePartition =>
+          // pre: partition should be in New state
+          assertValidPreviousStates(topic, partition, List(NewPartition, OfflinePartition), OnlinePartition)
+          partitionState(topic, partition) match {
+            case NewPartition =>
+              // initialize leader and isr path for new partition
+              initializeLeaderAndIsrForPartition(topic, partition, brokerRequestBatch)
+            case OfflinePartition =>
+              electLeaderForOfflinePartition(topic, partition, brokerRequestBatch)
+            case _ => // should never come here since illegal previous states are checked above
+          }
+          info("Partition [%s, %d] state changed from %s to Online with leader %d".format(topic, partition,
+            partitionState(topic, partition), controllerContext.allLeaders(topic, partition)))
+          partitionState.put((topic, partition), OnlinePartition)
+           // post: partition has a leader
+        case OfflinePartition =>
+          // pre: partition should be in Online state
+          assertValidPreviousStates(topic, partition, List(NewPartition, OnlinePartition), OfflinePartition)
+          // should be called when the leader for a partition is no longer alive
+          info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
+          partitionState.put((topic, partition), OfflinePartition)
+          // post: partition has no alive leader
+        case NonExistentPartition =>
+          // pre: partition could be in either of the above states
+          assertValidPreviousStates(topic, partition, List(OfflinePartition), NonExistentPartition)
+          info("Partition [%s, %d] state changed from Offline to NotExists".format(topic, partition))
+          partitionState.put((topic, partition), NonExistentPartition)
+          // post: partition state is deleted from all brokers and zookeeper
+      }
+    }catch {
+      case e => error("State change for partition [%s, %d] ".format(topic, partition) +
+        "from %s to %s failed".format(partitionState(topic, partition), targetState), e)
+    }
+  }
+
+  /**
+   * Invoked on startup of the partition's state machine to set the initial state for all existing partitions in
+   * zookeeper
+   */
+  private def initializePartitionState() {
+    for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
+      val topic = topicPartition._1
+      val partition = topicPartition._2
+      // check if leader and isr path exists for partition. If not, then it is in NEW state
+      ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+        case Some(currentLeaderAndIsr) =>
+          // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
+          controllerContext.liveBrokerIds.contains(currentLeaderAndIsr.leader) match {
+            case true => // leader is alive
+              partitionState.put(topicPartition, OnlinePartition)
+            case false =>
+              partitionState.put(topicPartition, OfflinePartition)
+          }
+        case None =>
+          partitionState.put(topicPartition, NewPartition)
+      }
+    }
+  }
+
+  private def assertValidPreviousStates(topic: String, partition: Int, fromStates: Seq[PartitionState],
+                                        targetState: PartitionState) {
+    if(!fromStates.contains(partitionState((topic, partition))))
+      throw new IllegalStateException("Partition [%s, %d] should be in the %s states before moving to %s state"
+        .format(topic, partition, fromStates.mkString(","), targetState) + ". Instead it is in %s state"
+        .format(partitionState((topic, partition))))
+  }
+
+  /**
+   * Invoked on the NonExistentPartition->NewPartition state transition to update the controller's cache with the
+   * partition's replica assignment.
+   * @topic     The topic of the partition whose replica assignment is to be cached
+   * @partition The partition whose replica assignment is to be cached
+   */
+  private def assignReplicasToPartitions(topic: String, partition: Int) {
+    val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
+    controllerContext.partitionReplicaAssignment += (topic, partition) -> assignedReplicas
+  }
+
+  /**
+   * Invoked on the NewPartition->OnlinePartition state change. When a partition is in the New state, it does not have
+   * a leader and isr path in zookeeper. Once the partition moves to the OnlinePartition state, it's leader and isr
+   * path gets initialized and it never goes back to the NewPartition state. From here, it can only go to the
+   * OfflinePartition state.
+   * @topic               The topic of the partition whose leader and isr path is to be initialized
+   * @partition           The partition whose leader and isr path is to be initialized
+   * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
+   *                      this state change
+   */
+  private def initializeLeaderAndIsrForPartition(topic: String, partition: Int,
+                                                 brokerRequestBatch: ControllerBrokerRequestBatch) {
+    debug("Initializing leader and isr for partition [%s, %d]".format(topic, partition))
+    val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition))
+    val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
+    liveAssignedReplicas.size match {
+      case 0 =>
+        ControllerStat.offlinePartitionRate.mark()
+        throw new StateChangeFailedException(("During state change of partition (%s, %d) from NEW to ONLINE, assigned replicas are " +
+          "[%s], live brokers are [%s]. No assigned replica is alive").format(topic, partition,
+          replicaAssignment.mkString(","), controllerContext.liveBrokerIds))
+      case _ =>
+        debug("Live assigned replicas for partition [%s, %d] are: [%s]".format(topic, partition, liveAssignedReplicas))
+        // make the first replica in the list of assigned replicas, the leader
+        val leader = liveAssignedReplicas.head
+        var leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
+        try {
+          ZkUtils.createPersistentPath(controllerContext.zkClient,
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString)
+          // TODO: the above write can fail only if the current controller lost its zk session and the new controller
+          // took over and initialized this partition. This can happen if the current controller went into a long
+          // GC pause
+          brokerRequestBatch.addRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
+          controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader)
+          partitionState.put((topic, partition), OnlinePartition)
+        }catch {
+          case e: ZkNodeExistsException =>
+            ControllerStat.offlinePartitionRate.mark()
+            throw new StateChangeFailedException("Error while changing partition [%s, %d]'s state from New to Online"
+              .format(topic, partition) + " since Leader and ISR path already exists")
+        }
+    }
+  }
+
+  /**
+   * Invoked on the OfflinePartition->OnlinePartition state change. It invokes the leader election API to elect a leader
+   * for the input offline partition
+   * @topic               The topic of the offline partition
+   * @partition           The offline partition
+   * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
+   *                      this state change
+   */
+  private def electLeaderForOfflinePartition(topic: String, partition: Int,
+                                             brokerRequestBatch: ControllerBrokerRequestBatch) {
+    /** handle leader election for the partitions whose leader is no longer alive **/
+    info("Electing leader for Offline partition [%s, %d]".format(topic, partition))
+    try {
+      controllerContext.partitionReplicaAssignment.get((topic, partition)) match {
+        case Some(assignedReplicas) =>
+          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+          try {
+            // elect new leader or throw exception
+            val newLeaderAndIsr = electLeaderForPartition(topic, partition, assignedReplicas)
+            info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
+            // store new leader and isr info in cache
+            brokerRequestBatch.addRequestForBrokers(liveAssignedReplicasToThisPartition, topic, partition,
+              newLeaderAndIsr)
+          }catch {
+            case e => throw new StateChangeFailedException(("Error while electing leader for partition" +
+              " [%s, %d]").format(topic, partition), e)
+          }
+        case None => throw new KafkaException(("While handling broker changes, the " +
+          "partition [%s, %d] doesn't have assigned replicas. The replica assignment for other partitions is %s")
+          .format(topic, partition, controllerContext.partitionReplicaAssignment))
+      }
+    }catch {
+      case e => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
+        .format(topic, partition) + " Marking this partition offline")
+    }
+    debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
+  }
+
+  /**
+   * @param topic                      The topic of the partition whose leader needs to be elected
+   * @param partition                  The partition whose leader needs to be elected
+   * @param assignedReplicas           The list of replicas assigned to the input partition
+   * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
+   * This API selects a new leader for the input partition -
+   * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
+   * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+   * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
+   * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
+   */
+  private def electLeaderForPartition(topic: String, partition: Int, assignedReplicas: Seq[Int]):LeaderAndIsr = {
+    var zookeeperPathUpdateSucceeded: Boolean = false
+    var newLeaderAndIsr: LeaderAndIsr = null
+    while(!zookeeperPathUpdateSucceeded) {
+      newLeaderAndIsr = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+        case Some(currentLeaderAndIsr) =>
+          var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr
+          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+          val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
+          val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+          val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+          debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
+            .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
+            currentLeaderIsrZkPathVersion))
+          newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
+            case true =>
+              debug("No broker in ISR is alive, picking the leader from the alive assigned replicas: %s"
+                .format(liveAssignedReplicasToThisPartition.mkString(",")))
+              liveAssignedReplicasToThisPartition.isEmpty match {
+                case true =>
+                  ControllerStat.offlinePartitionRate.mark()
+                  throw new PartitionOfflineException(("No replica for partition " +
+                  "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
+                  " Assigned replicas are: [%s]".format(assignedReplicas))
+                case false =>
+                  ControllerStat.uncleanLeaderElectionRate.mark()
+                  val newLeader = liveAssignedReplicasToThisPartition.head
+                  warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
+                    "There's potential data loss")
+                  new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
+              }
+            case false =>
+              val newLeader = liveBrokersInIsr.head
+              debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
+              new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
+          }
+          info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+          // update the new leadership decision in zookeeper or retry
+          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+            newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+          newLeaderAndIsr.zkVersion = newVersion
+          zookeeperPathUpdateSucceeded = updateSucceeded
+          newLeaderAndIsr
+        case None =>
+          throw new StateChangeFailedException("On broker changes, " +
+            "there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topic, partition))
+      }
+    }
+    // update the leader cache
+    controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+    newLeaderAndIsr
+  }
+
+  private def registerTopicChangeListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener())
+  }
+
+  def registerPartitionChangeListener(topic: String) = {
+    zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
+  }
+
+  /**
+   * This is the zookeeper listener that triggers all the state transitions for a partition
+   */
+  class TopicChangeListener extends IZkChildListener with Logging {
+    this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+      if(!isShuttingDown.get()) {
+        controllerContext.controllerLock synchronized {
+          try {
+            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
+            val currentChildren = JavaConversions.asBuffer(children).toSet
+            val newTopics = currentChildren -- controllerContext.allTopics
+            val deletedTopics = controllerContext.allTopics -- currentChildren
+            //        val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
+            controllerContext.allTopics = currentChildren
+
+            val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
+            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1._1))
+            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
+            info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
+              deletedTopics, addedPartitionReplicaAssignment))
+            if(newTopics.size > 0)
+              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSeq)
+          } catch {
+            case e => error("Error while handling new topic", e )
+          }
+          // TODO: kafka-330  Handle deleted topics
+          // handleDeletedTopics(deletedTopics, deletedPartitionReplicaAssignment)
+        }
+      }
+    }
+  }
+
+  class PartitionChangeListener(topic: String) extends IZkChildListener with Logging {
+    this.logIdent = "[Controller " + controller.config.brokerId + "], "
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+      controllerContext.controllerLock synchronized {
+        // TODO: To be completed as part of KAFKA-41
+      }
+    }
+  }
+}
+
+sealed trait PartitionState { def state: Byte }
+case object NewPartition extends PartitionState { val state: Byte = 0 }
+case object OnlinePartition extends PartitionState { val state: Byte = 1 }
+case object OfflinePartition extends PartitionState { val state: Byte = 2 }
+case object NonExistentPartition extends PartitionState { val state: Byte = 3 }
+
+
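To make the legal transitions above concrete, a small hypothetical sketch of the create-then-online sequence, the same one used by onNewPartitionCreation in KafkaController (the partition list is a placeholder):

    def bringNewPartitionsOnline(stateMachine: PartitionStateMachine) {
      val newPartitions = Seq(("my-topic", 0), ("my-topic", 1))
      stateMachine.handleStateChanges(newPartitions, NewPartition)     // NonExistentPartition -> NewPartition
      stateMachine.handleStateChanges(newPartitions, OnlinePartition)  // NewPartition -> OnlinePartition (leader and isr initialized)
    }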

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1387270&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala Tue Sep 18 17:21:15 2012
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import collection._
+import kafka.utils.{ZkUtils, Logging}
+import collection.JavaConversions._
+import kafka.api.LeaderAndIsr
+import kafka.common.StateChangeFailedException
+import java.util.concurrent.atomic.AtomicBoolean
+import org.I0Itec.zkclient.{IZkChildListener}
+
+/**
+ * This class represents the state machine for replicas. It defines the states that a replica can be in, and
+ * transitions to move the replica to another legal state. The different states that a replica can be in are -
+ * 1. OnlineReplica     : Once a replica is started, it is in this state. Valid previous states are OnlineReplica
+ *                        and OfflineReplica
+ * 2. OfflineReplica    : If a replica dies, it moves to this state. Valid previous state is OnlineReplica
+ */
+class ReplicaStateMachine(controller: KafkaController) extends Logging {
+  this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
+  private val controllerContext = controller.controllerContext
+  private val zkClient = controllerContext.zkClient
+  var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  private val isShuttingDown = new AtomicBoolean(false)
+
+  /**
+   * Invoked on successful controller election. Initializes the state of replicas for all partitions by reading from
+   * zookeeper, then triggers the OnlineReplica state change for all replicas on live brokers. The broker change
+   * listener that drives subsequent transitions is registered separately via registerListeners().
+   */
+  def startup() {
+    isShuttingDown.set(false)
+    // initialize replica state
+    initializeReplicaState()
+    // move replicas on all live brokers to the OnlineReplica state
+    handleStateChanges(controllerContext.liveBrokerIds.toSeq, OnlineReplica)
+    info("Started replica state machine with initial state -> " + replicaState.toString())
+  }
+
+  // register broker change listener
+  def registerListeners() {
+    registerBrokerChangeListener()
+  }
+
+  /**
+   * Invoked on controller shutdown.
+   */
+  def shutdown() {
+    isShuttingDown.compareAndSet(false, true)
+    replicaState.clear()
+  }
+
+  /**
+   * This API is invoked by the broker change controller callbacks and the startup API of the state machine
+   * @param brokerIds    The list of brokers that need to be transitioned to the target state
+   * @param targetState  The state that the replicas should be moved to
+   * The controller's allLeaders cache should have been updated before this
+   */
+  def handleStateChanges(brokerIds: Seq[Int], targetState: ReplicaState) {
+    info("Invoking state change to %s for brokers %s".format(targetState, brokerIds.mkString(",")))
+    try {
+      brokerRequestBatch.newBatch()
+      brokerIds.foreach { brokerId =>
+        // read all the partitions that have a replica assigned to this broker
+        val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(controllerContext.allTopics.toSeq, brokerId)
+        partitionsAssignedToThisBroker.foreach { topicAndPartition =>
+          handleStateChange(topicAndPartition._1, topicAndPartition._2, brokerId, targetState)
+        }
+        if(partitionsAssignedToThisBroker.size == 0)
+          info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
+      }
+      brokerRequestBatch.sendRequestsToBrokers()
+    } catch {
+      case e => error("Error while moving some replicas to %s state".format(targetState), e)
+    }
+  }
+
+  /**
+   * This API exercises the replica's state machine. It ensures that every state transition happens from a legal
+   * previous state to the target state.
+   * @param topic       The topic of the replica for which the state transition is invoked
+   * @param partition   The partition of the replica for which the state transition is invoked
+   * @param replicaId   The replica for which the state transition is invoked
+   * @param targetState The end state that the replica should be moved to
+   */
+  private def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
+    try {
+      targetState match {
+        case OnlineReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica, OfflineReplica), targetState)
+          // check if the leader for this partition is alive or even exists
+          // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
+          // for the ISR anyway
+          val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+          leaderAndIsrOpt match {
+            case Some(leaderAndIsr) =>
+              controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
+                case true => // leader is alive
+                  brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
+                  replicaState.put((topic, partition, replicaId), OnlineReplica)
+                  info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+                case false => // ignore partitions whose leader is not alive
+              }
+            case None => // ignore partitions who don't have a leader yet
+          }
+        case OfflineReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica), targetState)
+          // As an optimization, the controller removes dead replicas from the ISR
+          var zookeeperPathUpdateSucceeded: Boolean = false
+          var newLeaderAndIsr: LeaderAndIsr = null
+          while(!zookeeperPathUpdateSucceeded) {
+            // refresh leader and isr from zookeeper again
+            val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+            leaderAndIsrOpt match {
+              case Some(leaderAndIsr) => // increment the leader epoch even though only the ISR changes
+                newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+                  leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
+                info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+                // update the new leadership decision in zookeeper or retry
+                val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString,
+                  leaderAndIsr.zkVersion)
+                newLeaderAndIsr.zkVersion = newVersion
+                zookeeperPathUpdateSucceeded = updateSucceeded
+              case None => throw new StateChangeFailedException("Failed to change state of replica %d".format(replicaId) +
+                " for partition [%s, %d] since the leader and isr path in zookeeper is empty".format(topic, partition))
+            }
+          }
+          // send the shrunk ISR state change request only to the leader
+          brokerRequestBatch.addRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr)
+          // update the local leader and isr cache
+          controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+          replicaState.put((topic, partition, replicaId), OfflineReplica)
+          info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
+          info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
+      }
+    } catch {
+      case e => error("Error while changing state of replica %d for partition ".format(replicaId) +
+        "[%s, %d] to %s".format(topic, partition, targetState), e)
+    }
+  }
+
+  private def assertValidPreviousStates(topic: String, partition: Int, replicaId: Int, fromStates: Seq[ReplicaState],
+                                        targetState: ReplicaState) {
+    assert(fromStates.contains(replicaState((topic, partition, replicaId))),
+      "Replica %s for partition [%s, %d] should be in the %s states before moving to %s state"
+        .format(replicaId, topic, partition, fromStates.mkString(","), targetState) +
+        ". Instead it is in %s state".format(replicaState((topic, partition, replicaId))))
+  }
+
+  private def registerBrokerChangeListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener())
+  }
+
+  /**
+   * Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions
+   * in zookeeper
+   */
+  private def initializeReplicaState() {
+    for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
+      val topic = topicPartition._1
+      val partition = topicPartition._2
+      assignedReplicas.foreach { replicaId =>
+        controllerContext.liveBrokerIds.contains(replicaId) match {
+          case true => replicaState.put((topic, partition, replicaId), OnlineReplica)
+          case false => replicaState.put((topic, partition, replicaId), OfflineReplica)
+        }
+      }
+    }
+  }
+
+  def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[(String, Int)] = {
+    controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
+  }
+
+  /**
+   * This is the zookeeper listener that triggers all the state transitions for a replica
+   */
+  class BrokerChangeListener() extends IZkChildListener with Logging {
+    this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
+    def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
+      ControllerStat.leaderElectionTimer.time {
+        info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
+        if(!isShuttingDown.get()) {
+          controllerContext.controllerLock synchronized {
+            try {
+              val curBrokerIds = currentBrokerList.map(_.toInt).toSet
+              val newBrokerIds = curBrokerIds -- controllerContext.liveBrokerIds
+              val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+              val deadBrokerIds = controllerContext.liveBrokerIds -- curBrokerIds
+              controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+              controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id)
+              info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
+                .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
+              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
+              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
+              if(newBrokerIds.size > 0)
+                controller.onBrokerStartup(newBrokerIds.toSeq)
+              if(deadBrokerIds.size > 0)
+                controller.onBrokerFailure(deadBrokerIds.toSeq)
+            } catch {
+              case e => error("Error while handling broker changes", e)
+            }
+          }
+        }
+      }
+    }
+  }
+}
+
+sealed trait ReplicaState { def state: Byte }
+case object OnlineReplica extends ReplicaState { val state: Byte = 1 }
+case object OfflineReplica extends ReplicaState { val state: Byte = 2 }
+
+
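Not part of this patch, but the OfflineReplica transition above is essentially an optimistic compare-and-set retry against the versioned leader-and-ISR record: read, shrink the ISR, bump the leader epoch, and retry if the conditional write loses a race. The same shape, sketched standalone with hypothetical names and an in-memory store instead of zookeeper:

    // Illustrative only: the conditional-update retry loop used when removing a dead replica from the ISR.
    case class VersionedIsr(leader: Int, leaderEpoch: Int, isr: List[Int], version: Int)

    class IsrStore(initial: VersionedIsr) {
      private var current = initial
      def read: VersionedIsr = synchronized { current }
      // succeeds only if the caller saw the latest version, mimicking a conditional zookeeper write
      def conditionalUpdate(expectedVersion: Int, updated: VersionedIsr): Boolean = synchronized {
        if (current.version == expectedVersion) { current = updated.copy(version = expectedVersion + 1); true }
        else false
      }
    }

    object ShrinkIsrSketch {
      def removeReplica(store: IsrStore, replicaId: Int): VersionedIsr = {
        var done = false
        while (!done) {
          val cur = store.read
          val shrunk = VersionedIsr(cur.leader, cur.leaderEpoch + 1, cur.isr.filter(_ != replicaId), cur.version)
          done = store.conditionalUpdate(cur.version, shrunk)  // lost a race: re-read and try again
        }
        store.read
      }

      def main(args: Array[String]) {
        val store = new IsrStore(VersionedIsr(0, 5, List(0, 1, 2), 7))
        println(removeReplica(store, 2))  // ISR shrinks to List(0, 1), leader epoch bumps to 6
      }
    }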

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Sep 18 17:21:15 2012
@@ -24,6 +24,7 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.AtomicBoolean
 import org.I0Itec.zkclient.ZkClient
+import kafka.controller.KafkaController
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/LeaderElector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/LeaderElector.scala?rev=1387270&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/LeaderElector.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/LeaderElector.scala Tue Sep 18 17:21:15 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.Logging
+
+/**
+ * This trait defines a leader elector. If the existing leader is dead, the elector handles automatic
+ * re-election and, if it succeeds, invokes the leader state change callback.
+ */
+trait LeaderElector extends Logging {
+  def startup
+
+  def amILeader : Boolean
+
+//  def electAndBecomeLeader: Unit
+//
+  def elect: Boolean
+
+  def close
+}
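Not part of this patch; a trivial, hypothetical single-node stand-in for the contract above shows the intended call sequence (startup, then elect as needed, query amILeader, close on shutdown):

    // Illustrative only: not a real elector, just the LeaderElector call pattern in miniature.
    class SingleNodeElector(brokerId: Int) {
      private var leaderId = -1
      def startup { elect }
      def amILeader: Boolean = leaderId == brokerId
      def elect: Boolean = { leaderId = brokerId; amILeader }  // nothing to contend with, so it always wins
      def close { leaderId = -1 }
    }

    object SingleNodeElectorSketch {
      def main(args: Array[String]) {
        val elector = new SingleNodeElector(3)
        elector.startup
        println("am I leader? " + elector.amILeader)  // true
        elector.close
        println("after close: " + elector.amILeader)  // false
      }
    }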

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Tue Sep 18 17:21:15 2012
@@ -163,13 +163,14 @@ class ReplicaManager(val config: KafkaCo
     /**
      *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
      *  as deleted.
+     *  TODO: Handle this properly as part of KAFKA-330
      */
-    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
-      startHighWaterMarksCheckPointThread
-      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
-      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
-    }
+//    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
+//      startHighWaterMarksCheckPointThread
+//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
+//      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
+//      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
+//    }
 
     responseMap
   }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala?rev=1387270&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala Tue Sep 18 17:21:15 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.ZkUtils._
+import kafka.utils.Logging
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.I0Itec.zkclient.{IZkDataListener}
+import kafka.controller.ControllerContext
+
+/**
+ * This class handles zookeeper-based leader election using an ephemeral path. The election module does not handle
+ * session expiration; instead it assumes the caller will handle it, typically by triggering re-election. If the
+ * existing leader is dead, this class handles automatic re-election and, if it succeeds, invokes the leader state
+ * change callback
+ */
+class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit,
+                             brokerId: Int)
+  extends LeaderElector with Logging {
+  var leaderId = -1
+  // create the election path in ZK, if one does not exist
+  val index = electionPath.lastIndexOf("/")
+  if (index > 0)
+    makeSurePersistentPathExists(controllerContext.zkClient, electionPath.substring(0, index))
+  val leaderChangeListener = new LeaderChangeListener
+
+  def startup {
+    controllerContext.controllerLock synchronized {
+      elect
+    }
+  }
+
+  def amILeader : Boolean = leaderId == brokerId
+
+  def elect: Boolean = {
+    controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
+    try {
+      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
+      info(brokerId + " successfully elected as leader")
+      leaderId = brokerId
+      onBecomingLeader()
+    } catch {
+      case e: ZkNodeExistsException =>
+        // if someone else has already created the path, read it to find out who the current leader is
+        debug("A broker other than " + brokerId + " was elected as leader")
+        val data: String = controllerContext.zkClient.readData(electionPath, true)
+        if (data != null) leaderId = data.toInt
+      case e2 => throw e2
+    }
+    amILeader
+  }
+
+  def close = {
+    leaderId = -1
+  }
+
+  /**
+   * We do not listen for session expiration in this elector; we assume the caller that uses this module has its
+   * own session expiration listener and handler
+   */
+  class LeaderChangeListener extends IZkDataListener with Logging {
+    /**
+     * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
+     * @throws Exception On any error.
+     */
+    @throws(classOf[Exception])
+    def handleDataChange(dataPath: String, data: Object) {
+    }
+
+    /**
+     * Called when the leader information stored in zookeeper has been deleted. Try to become the leader again.
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath: String) {
+      controllerContext.controllerLock synchronized {
+        debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
+          .format(brokerId, dataPath))
+        elect
+      }
+    }
+  }
+}
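Not part of this patch, but the election protocol implemented above (create an ephemeral node; on ZkNodeExistsException read the node to learn who won) can be sketched standalone with an in-memory map playing the role of zookeeper. All names below are hypothetical:

    // Illustrative only: the "create the election path or read the existing leader" pattern.
    import scala.collection.mutable

    class NodeExistsException extends RuntimeException

    class FakeZk {
      private val paths = mutable.Map[String, String]()
      def createEphemeral(path: String, data: String): Unit = synchronized {
        if (paths.contains(path)) throw new NodeExistsException
        paths.put(path, data)
      }
      def readData(path: String): String = synchronized { paths.getOrElse(path, null) }
    }

    class SketchElector(zk: FakeZk, electionPath: String, brokerId: Int, onBecomingLeader: () => Unit) {
      var leaderId = -1
      def amILeader = leaderId == brokerId
      def elect: Boolean = {
        try {
          zk.createEphemeral(electionPath, brokerId.toString)
          leaderId = brokerId
          onBecomingLeader()
        } catch {
          case e: NodeExistsException =>
            val data = zk.readData(electionPath)  // someone else won; remember who
            if (data != null) leaderId = data.toInt
        }
        amILeader
      }
    }

    object ElectionSketch {
      def main(args: Array[String]) {
        val zk = new FakeZk
        val e1 = new SketchElector(zk, "/election", 1, () => println("broker 1 became leader"))
        val e2 = new SketchElector(zk, "/election", 2, () => println("broker 2 became leader"))
        println("broker 1 elected? " + e1.elect)  // true, wins the race
        println("broker 2 elected? " + e2.elect)  // false, learns broker 1 is leader
      }
    }

The real elector additionally subscribes a data listener on the election path, so deletion of the ephemeral node (leader death) triggers a fresh call to elect.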

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Tue Sep 18 17:21:15 2012
@@ -79,7 +79,7 @@ object VerifyConsumerRebalance extends L
      * under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id]
      */
     val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group)
-    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keys.iterator)
+    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq)
 
     partitionsPerTopicMap.foreach { partitionsForTopic =>
       val topic = partitionsForTopic._1

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala Tue Sep 18 17:21:15 2012
@@ -33,7 +33,7 @@ abstract class ShutdownableThread(val na
     isRunning.set(false)
     interrupt()
     shutdownLatch.await()
-    info("Shutted down completed")
+    info("Shutdown completed")
   }
 
     /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Tue Sep 18 17:21:15 2012
@@ -44,7 +44,7 @@ object UpdateOffsetsInZK {
 
   private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
     val cluster = ZkUtils.getCluster(zkClient)
-    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
+    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic))
     var partitions: Seq[Int] = Nil
 
     partitionsPerTopicMap.get(topic) match {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Tue Sep 18 17:21:15 2012
@@ -419,7 +419,7 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Iterator[String]):
+  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]):
   mutable.Map[(String, Int), LeaderAndIsr] = {
     val ret = new mutable.HashMap[(String, Int), LeaderAndIsr]
     val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
@@ -434,7 +434,28 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]):
+  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[(String, Int), Seq[Int]] = {
+    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
+    topics.foreach { topic =>
+      val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+      jsonPartitionMapOpt match {
+        case Some(jsonPartitionMap) =>
+          SyncJSON.parseFull(jsonPartitionMap) match {
+            case Some(m) =>
+              val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
+              for((partition, replicas) <- replicaMap){
+                ret.put((topic, partition.toInt), replicas.map(_.toInt))
+                debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
+              }
+            case None =>
+          }
+        case None =>
+      }
+    }
+    ret
+  }
+
+  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]):
   mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
     val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
     topics.foreach{ topic =>
@@ -466,7 +487,7 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[Int]] = {
+  def getPartitionsForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
     getPartitionAssignmentForTopics(zkClient, topics).map
     { topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
@@ -477,20 +498,17 @@ object ZkUtils extends Logging {
   }
 
   def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int):
-  Map[(String, Int), Seq[Int]] = {
-    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
-    val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
-    topicsAndPartitions.map
-    {
-      topicAndPartitionMap =>
-        val topic = topicAndPartitionMap._1
-        val partitionMap = topicAndPartitionMap._2
-        val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
-        for((relevantPartition, replicaAssignment) <- relevantPartitionsMap){
-          ret.put((topic, relevantPartition), replicaAssignment)
-        }
-    }
-    ret
+    Seq[(String, Int)] = {
+    val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics)
+    topicsAndPartitions.map { topicAndPartitionMap =>
+      val topic = topicAndPartitionMap._1
+      val partitionMap = topicAndPartitionMap._2
+      val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
+      val relevantPartitions = relevantPartitionsMap.map(_._1)
+      for(relevantPartition <- relevantPartitions) yield {
+        (topic, relevantPartition)
+      }
+    }.flatten[(String, Int)].toSeq
   }
 
   def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {

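Not part of this patch, but the reworked getPartitionsAssignedToBroker above flattens the per-topic assignment map into a sequence of (topic, partition) pairs hosted on the given broker. A standalone sketch of the same shape, over a hypothetical in-memory assignment instead of zookeeper:

    // Illustrative only: filter-and-flatten over topic -> (partition -> replica list).
    object AssignmentSketch {
      def partitionsOnBroker(assignment: Map[String, Map[Int, Seq[Int]]], brokerId: Int): Seq[(String, Int)] =
        assignment.toSeq.flatMap { case (topic, partitions) =>
          partitions.filter { case (_, replicas) => replicas.contains(brokerId) }
                    .keys.map(p => (topic, p))
        }

      def main(args: Array[String]) {
        val assignment = Map(
          "t1" -> Map(0 -> Seq(1, 2), 1 -> Seq(2, 3)),
          "t2" -> Map(0 -> Seq(1, 3)))
        println(partitionsOnBroker(assignment, 1))  // the t1 and t2 partitions hosted on broker 1
      }
    }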
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala Tue Sep 18 17:21:15 2012
@@ -72,7 +72,7 @@ object RpcDataSerializationTestUtils{
     val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
     val map = Map(((topic1, 0), leaderAndISR1),
                   ((topic2, 0), leaderAndISR2))
-    new LeaderAndIsrRequest( LeaderAndIsrRequest.NotInit, map)
+    new LeaderAndIsrRequest(map)
   }
 
   def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1387270&r1=1387269&r2=1387270&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Sep 18 17:21:15 2012
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
 package kafka.server
 
 import org.scalatest.junit.JUnit3Suite
@@ -111,7 +127,8 @@ class LogRecoveryTest extends JUnit3Suit
     server1.startup()
 
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
-    assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
+    assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0",
+      leader.isDefined && (leader.get == 0 || leader.get == 1))
 
     assertEquals(30L, hwFile1.read(topic, 0))
     // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
@@ -120,7 +137,8 @@ class LogRecoveryTest extends JUnit3Suit
 
     server2.startup()
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
-    assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1))
+    assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1",
+      leader.isDefined && (leader.get == 0 || leader.get == 1))
 
     sendMessages()
     // give some time for follower 1 to record leader HW of 60
@@ -249,4 +267,4 @@ class LogRecoveryTest extends JUnit3Suit
       producer.send(new ProducerData[Int, Message](topic, 0, sent1))
     }
   }
-}
\ No newline at end of file
+}


