kafka-commits mailing list archives

From: jjko...@apache.org
Subject: svn commit: r1402395 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/common/ main/scala/kafka/controller/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/admin/ test/sca...
Date: Fri, 26 Oct 2012 05:23:55 GMT
Author: jjkoshy
Date: Fri Oct 26 05:23:55 2012
New Revision: 1402395

URL: http://svn.apache.org/viewvc?rev=1402395&view=rev
Log:
Implement clean shutdown in replication; patched by Joel Koshy; reviewed by Jun Rao and Neha Narkhede; KAFKA-340
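
The patch adds a command-line admin tool, kafka.admin.ShutdownBroker, which looks up the controller in ZooKeeper and invokes its shutdownBroker JMX operation until the target broker no longer leads any partition. A minimal sketch of driving the tool from Scala (the connection string, broker id, and retry settings below are illustrative assumptions, not part of this commit; operationally the class would normally be launched through the usual run-class script):

    // Ask the controller to move leadership off broker 2, retrying up to 3 times.
    kafka.admin.ShutdownBroker.main(Array(
      "--zookeeper", "localhost:2181",
      "--broker", "2",
      "--num.retries", "3",
      "--retry.interval.ms", "5000"))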

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ShutdownBroker.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ShutdownBroker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ShutdownBroker.scala?rev=1402395&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ShutdownBroker.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ShutdownBroker.scala Fri Oct 26 05:23:55 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import javax.management.remote.{JMXServiceURL, JMXConnectorFactory}
+import javax.management.ObjectName
+import kafka.controller.KafkaController
+import scala.Some
+
+
+object ShutdownBroker extends Logging {
+
+  private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer, jmxUrl: String)
+
+  private def invokeShutdown(params: ShutdownParams): Boolean = {
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer)
+      val controllerBrokerId = ZkUtils.getController(zkClient)
+      val controllerOpt = ZkUtils.getBrokerInfo(zkClient, controllerBrokerId)
+      controllerOpt match {
+        case Some(controller) =>
+          val jmxUrl = new JMXServiceURL(params.jmxUrl)
+          val jmxc = JMXConnectorFactory.connect(jmxUrl, null)
+          val mbsc = jmxc.getMBeanServerConnection
+          val leaderPartitionsRemaining = mbsc.invoke(new ObjectName(KafkaController.MBeanName),
+            "shutdownBroker",
+            Array(params.brokerId),
+            Array(classOf[Int].getName)).asInstanceOf[Int]
+          val shutdownComplete = (leaderPartitionsRemaining == 0)
+          info("Shutdown status: " + (if (shutdownComplete)
+                  "complete" else
+                  "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
+          shutdownComplete
+        case None =>
+          error("Operation failed due to controller failure on %d.".format(controllerBrokerId))
+          false
+      }
+    }
+    catch {
+      case t: Throwable =>
+        error("Operation failed due to %s.".format(t.getMessage), t)
+        false
+    }
+    finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val brokerOpt = parser.accepts("broker", "REQUIRED: The broker to shutdown.")
+            .withRequiredArg
+            .describedAs("Broker Id")
+            .ofType(classOf[java.lang.Integer])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+            "Multiple URLS can be given to allow fail-over.")
+            .withRequiredArg
+            .describedAs("urls")
+            .ofType(classOf[String])
+    val numRetriesOpt = parser.accepts("num.retries", "Number of attempts to retry if shutdown does not complete.")
+            .withRequiredArg
+            .describedAs("number of retries")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(0)
+    val retryIntervalOpt = parser.accepts("retry.interval.ms", "Retry interval if retries requested.")
+            .withRequiredArg
+            .describedAs("retry interval in ms (> 1000)")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(1000)
+    val jmxUrlOpt = parser.accepts("jmx.url", "Controller's JMX URL.")
+            .withRequiredArg
+            .describedAs("JMX url.")
+            .ofType(classOf[String])
+            .defaultsTo("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi")
+
+    val options = parser.parse(args : _*)
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt)
+
+    val retryIntervalMs = options.valueOf(retryIntervalOpt).intValue.max(1000)
+    val numRetries = options.valueOf(numRetriesOpt).intValue
+
+    val shutdownParams =
+      ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt), options.valueOf(jmxUrlOpt))
+
+    if (!invokeShutdown(shutdownParams)) {
+      (1 to numRetries).takeWhile(attempt => {
+        info("Retry " + attempt)
+        try {
+          Thread.sleep(retryIntervalMs)
+        }
+        catch {
+          case ie: InterruptedException => // ignore
+        }
+        !invokeShutdown(shutdownParams)
+      })
+    }
+  }
+
+}
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Fri Oct 26 05:23:55 2012
@@ -20,8 +20,11 @@ package kafka.api
 
 import java.nio._
 import kafka.api.ApiUtils._
+import kafka.utils.Logging
+import kafka.network.InvalidRequestException
 
-object StopReplicaRequest {
+
+object StopReplicaRequest extends Logging {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
   val DefaultAckTimeout = 100
@@ -30,28 +33,38 @@ object StopReplicaRequest {
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val deletePartitions = buffer.get match {
+      case 1 => true
+      case 0 => false
+      case x =>
+        throw new InvalidRequestException("Invalid byte %d in delete partitions field. (Assuming false.)".format(x))
+    }
     val topicPartitionPairCount = buffer.getInt
     val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]()
-    for (i <- 0 until topicPartitionPairCount)
+    (1 to topicPartitionPairCount) foreach { _ =>
       topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
-    new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet)
+    }
+    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet)
   }
 }
 
 case class StopReplicaRequest(versionId: Short,
                               clientId: String,
                               ackTimeoutMs: Int,
+                              deletePartitions: Boolean,
                               partitions: Set[(String, Int)])
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
-  def this(partitions: Set[(String, Int)]) = {
+
+  def this(deletePartitions: Boolean, partitions: Set[(String, Int)]) = {
     this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
-        partitions)
+         deletePartitions, partitions)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
     buffer.putInt(partitions.size)
     for ((topic, partitionId) <- partitions){
       writeShortString(buffer, topic)
@@ -60,9 +73,15 @@ case class StopReplicaRequest(versionId:
   }
 
   def sizeInBytes(): Int = {
-    var size = 2 + (2 + clientId.length()) + 4 + 4
+    var size =
+      2 + /* versionId */
+      ApiUtils.shortStringLength(clientId) +
+      4 + /* ackTimeoutMs */
+      1 + /* deletePartitions */
+      4 /* partition count */
     for ((topic, partitionId) <- partitions){
-      size += (2 + topic.length()) + 4
+      size += (ApiUtils.shortStringLength(topic)) +
+              4 /* partition id */
     }
     size
   }
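
The new wire format writes a one-byte deletePartitions flag between ackTimeoutMs and the partition count, and sizeInBytes now accounts for it. A hedged round-trip sketch, assuming the companion object's readFrom deserializer whose body is modified above (illustrative only; the serialization test updated at the end of this commit exercises the same path):

    import java.nio.ByteBuffer
    import kafka.api.StopReplicaRequest

    // Build a request carrying the new flag, serialize it, and read it back.
    val request = new StopReplicaRequest(deletePartitions = true, Set(("test", 0)))
    val buffer = ByteBuffer.allocate(request.sizeInBytes())
    request.writeTo(buffer)
    buffer.rewind()
    val roundTripped = StopReplicaRequest.readFrom(buffer)
    assert(roundTripped.deletePartitions)
    assert(roundTripped.partitions == Set(("test", 0)))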

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala Fri Oct 26 05:23:55 2012
@@ -25,5 +25,7 @@ case class TopicAndPartition(topic: Stri
   def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
 
   def asTuple = (topic, partition)
+
+  override def toString = "[%s,%d]".format(topic, partition)
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala Fri Oct 26 05:23:55 2012
@@ -49,7 +49,13 @@ class ControllerChannelManager private (
 
   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
-      brokerStateInfo(brokerId).messageQueue.put((request, callback))
+      val stateInfoOpt = brokerStateInfo.get(brokerId)
+      stateInfoOpt match {
+        case Some(stateInfo) =>
+          stateInfo.messageQueue.put((request, callback))
+        case None =>
+          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
+      }
     }
   }
 
@@ -123,7 +129,7 @@ class RequestSendThread(val controllerId
           case RequestKeys.StopReplicaKey =>
             response = StopReplicaResponse.readFrom(receive.buffer)
         }
-        trace("got a response %s".format(controllerId, response, toBrokerId))
+        trace("Controller %d request to broker %d got a response %s".format(controllerId, toBrokerId, response))
 
         if(callback != null){
           callback(response)
@@ -141,6 +147,7 @@ class ControllerBrokerRequestBatch(sendR
   extends  Logging {
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+  val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
 
   def newBatch() {
     // raise error if the previous batch is not empty
@@ -149,6 +156,7 @@ class ControllerBrokerRequestBatch(sendR
         "a new one. Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
+    stopAndDeleteReplicaRequestMap.clear()
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) {
@@ -160,10 +168,18 @@ class ControllerBrokerRequestBatch(sendR
     }
   }
 
-  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int) {
+  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean) {
     brokerIds.foreach { brokerId =>
       stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)])
-      stopReplicaRequestMap(brokerId) :+ (topic, partition)
+      stopAndDeleteReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)])
+      if (deletePartition) {
+        val v = stopAndDeleteReplicaRequestMap(brokerId)
+        stopAndDeleteReplicaRequestMap(brokerId) = v :+ (topic, partition)
+      }
+      else {
+        val v = stopReplicaRequestMap(brokerId)
+        stopReplicaRequestMap(brokerId) = v :+ (topic, partition)
+      }
     }
   }
 
@@ -176,12 +192,19 @@ class ControllerBrokerRequestBatch(sendR
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
-    stopReplicaRequestMap.foreach { r =>
-      val broker = r._1
-      debug("The stop replica request sent to broker %d is %s".format(broker, r._2.mkString(",")))
-      sendRequest(broker, new StopReplicaRequest(Set.empty[(String, Int)] ++ r._2), null)
+    Seq((stopReplicaRequestMap, false), (stopAndDeleteReplicaRequestMap, true)) foreach {
+      case(m, deletePartitions) => {
+        m foreach {
+          case(broker, replicas) =>
+            if (replicas.size > 0) {
+              debug("The stop replica request (delete = %s) sent to broker %d is %s"
+                .format(deletePartitions, broker, replicas.mkString(",")))
+              sendRequest(broker, new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas), null)
+            }
+        }
+        m.clear()
+      }
     }
-    stopReplicaRequestMap.clear()
   }
 }
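
One subtle fix above: the old addStopReplicaRequestForBrokers evaluated `stopReplicaRequestMap(brokerId) :+ (topic, partition)` without reassigning the result, so the appended pair was silently dropped; the new code writes the appended sequence back into the map. A tiny illustration of why the reassignment matters (illustrative only):

    import scala.collection.mutable

    val requests = mutable.HashMap(1 -> Seq.empty[Int])
    requests(1) :+ 42               // builds a new Seq and discards it; requests(1) is still empty
    requests(1) = requests(1) :+ 42 // reassigning the entry is what actually retains the element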
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala Fri Oct 26 05:23:55 2012
@@ -31,21 +31,46 @@ import kafka.utils.{Utils, ZkUtils, Logg
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import java.lang.{IllegalStateException, Object}
 import kafka.admin.PreferredReplicaLeaderElectionCommand
-import kafka.common.{TopicAndPartition, KafkaException}
+import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
-                        var liveBrokers: Set[Broker] = null,
-                        var liveBrokerIds: Set[Int] = null,
-                        var allTopics: Set[String] = null,
-                        var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = null,
-                        var allLeaders: mutable.Map[TopicAndPartition, Int] = null,
+                        var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
+                        val brokerShutdownLock: Object = new Object,
+                        var allTopics: Set[String] = Set.empty,
+                        var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
+                        var allLeaders: mutable.Map[TopicAndPartition, Int] = mutable.Map.empty,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
-                        new mutable.HashMap,
-                        var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet)
+                          new mutable.HashMap,
+                        var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
+                          new mutable.HashSet) {
 
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup {
+  private var liveBrokersUnderlying: Set[Broker] = Set.empty
+  private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
+
+  // setter
+  def liveBrokers_=(brokers: Set[Broker]) {
+    liveBrokersUnderlying = brokers
+    liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id)
+  }
+
+  // getter
+  def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
+  def liveBrokerIds = liveBrokerIdsUnderlying.filter(brokerId => !shuttingDownBrokerIds.contains(brokerId))
+
+  def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying ++ shuttingDownBrokerIds
+}
+
+trait KafkaControllerMBean {
+  def shutdownBroker(id: Int): Int
+}
+
+object KafkaController {
+  val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
+}
+
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   val controllerContext = new ControllerContext(zkClient)
@@ -55,6 +80,8 @@ class KafkaController(val config : Kafka
     config.brokerId)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
+  private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
 
   newGauge(
     "ActiveControllerCount",
@@ -64,6 +91,101 @@ class KafkaController(val config : Kafka
   )
 
   /**
+   * JMX operation to initiate clean shutdown of a broker. On clean shutdown,
+   * the controller first determines the partitions that the shutting down
+   * broker leads, and moves leadership of those partitions to another broker
+   * that is in that partition's ISR. When all partitions have been moved, the
+   * broker process can be stopped normally (i.e., by sending it a SIGTERM or
+   * SIGINT) and no data loss should be observed.
+   *
+   * @param id Id of the broker to shutdown.
+   * @return The number of partitions that the broker still leads.
+   */
+  def shutdownBroker(id: Int) = {
+
+    controllerContext.brokerShutdownLock synchronized {
+      info("Shutting down broker " + id)
+
+      controllerContext.controllerLock synchronized {
+        if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
+          throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+
+        controllerContext.shuttingDownBrokerIds.add(id)
+
+        debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
+        debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
+      }
+
+      val allPartitionsAndReplicationFactorOnBroker = controllerContext.controllerLock synchronized {
+        getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, id).map {
+          case(topic, partition) =>
+            val topicAndPartition = TopicAndPartition(topic, partition)
+            (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size)
+        }
+      }
+
+      def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
+        trace("All leaders = " + controllerContext.allLeaders.mkString(","))
+        controllerContext.allLeaders.filter {
+          case (topicAndPartition, leader) =>
+            leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+        }.map(_._1)
+      }
+
+      val partitionsToMove = replicatedPartitionsBrokerLeads().toSet
+      debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(",")))
+
+      partitionsToMove.foreach(topicAndPartition => {
+        val (topic, partition) = topicAndPartition.asTuple
+        // move leadership serially to relinquish lock.
+        controllerContext.controllerLock synchronized {
+          controllerContext.allLeaders.get(topicAndPartition).foreach(currLeader => {
+            if (currLeader == id) {
+              partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
+                controlledShutdownPartitionLeaderSelector)
+              val newLeader = controllerContext.allLeaders(topicAndPartition)
+
+              // mark replica offline only if leadership was moved successfully
+              if (newLeader != currLeader)
+                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
+            }
+            else
+              debug("Partition %s moved from leader %d to new leader %d during shutdown."
+                .format(topicAndPartition, id, currLeader))
+          })
+        }
+      })
+
+      /*
+      * Force the shutting down broker out of the ISR of partitions that it
+      * follows, and shutdown the corresponding replica fetcher threads.
+      * This is really an optimization, so no need to register any callback
+      * to wait until completion.
+      */
+      brokerRequestBatch.newBatch()
+      allPartitionsAndReplicationFactorOnBroker foreach {
+        case(topicAndPartition, replicationFactor) =>
+          val (topic, partition) = topicAndPartition.asTuple
+          if (controllerContext.allLeaders(topicAndPartition) != id) {
+            brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
+            removeReplicaFromIsr(topic, partition, id) match {
+              case Some(updatedLeaderAndIsr) =>
+                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
+                  Seq(updatedLeaderAndIsr.leader), topic, partition, updatedLeaderAndIsr, replicationFactor)
+              case None =>
+              // ignore
+            }
+          }
+      }
+      brokerRequestBatch.sendRequestsToBrokers()
+
+      val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
+      debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
+      partitionsRemaining.size
+    }
+  }
+
+  /**
    * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
    * It does the following things on the become-controller state change -
    * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
@@ -83,8 +205,10 @@ class KafkaController(val config : Kafka
       initializeControllerContext()
       partitionStateMachine.startup()
       replicaStateMachine.startup()
+      Utils.registerMBean(this, KafkaController.MBeanName)
       info("Broker %d is ready to serve as the new controller".format(config.brokerId))
-    }else
+    }
+    else
       info("Controller has been shut down, aborting startup/failover")
   }
 
@@ -105,6 +229,7 @@ class KafkaController(val config : Kafka
    */
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
+
     // update leader and isr cache for broker
     updateLeaderAndIsrCache()
     // update partition state machine
@@ -128,6 +253,11 @@ class KafkaController(val config : Kafka
    */
   def onBrokerFailure(deadBrokers: Seq[Int]) {
     info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
+
+    val deadBrokersThatWereShuttingDown =
+      deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
+    info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
+
     // update leader and isr cache for broker
     updateLeaderAndIsrCache()
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
@@ -260,7 +390,6 @@ class KafkaController(val config : Kafka
 
   private def initializeControllerContext() {
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
-    controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id)
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,
       controllerContext.allTopics.toSeq)
@@ -270,6 +399,7 @@ class KafkaController(val config : Kafka
     // start the channel manager
     startChannelManager()
     info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
+    info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
     info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
     initializeAndMaybeTriggerPartitionReassignment()
     initializeAndMaybeTriggerPreferredReplicaElection()
@@ -341,7 +471,8 @@ class KafkaController(val config : Kafka
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
       // move the leader to one of the alive and caught up new replicas
       partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
-    }else {
+    }
+    else {
       // check if the leader is alive or not
       controllerContext.liveBrokerIds.contains(currentLeader) match {
         case true =>
@@ -440,7 +571,8 @@ class KafkaController(val config : Kafka
       val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
       if(currentLeader == preferredReplica) {
         info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
-      }else {
+      }
+      else {
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
@@ -455,6 +587,55 @@ class KafkaController(val config : Kafka
     }.flatten
   }
 
+  /**
+   * Removes a given partition replica from the ISR; if it is not the current
+   * leader and there are sufficient remaining replicas in ISR.
+   * @param topic topic
+   * @param partition partition
+   * @param replicaId replica Id
+   * @return the new leaderAndIsr (with the replica removed if it was present),
+   *         or None if leaderAndIsr is empty.
+   */
+  def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderAndIsr] = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    debug("Removing replica %d from ISR of %s.".format(replicaId, topicAndPartition))
+    var finalLeaderAndIsr: Option[LeaderAndIsr] = None
+    var zkWriteCompleteOrUnnecessary = false
+    while (!zkWriteCompleteOrUnnecessary) {
+      // refresh leader and isr from zookeeper again
+      val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+      zkWriteCompleteOrUnnecessary = leaderAndIsrOpt match {
+        case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
+          if (leaderAndIsr.isr.contains(replicaId)) {
+            val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+                                               leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
+            // update the new leadership decision in zookeeper or retry
+            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
+              zkClient,
+              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
+              leaderAndIsr.zkVersion)
+            newLeaderAndIsr.zkVersion = newVersion
+
+            finalLeaderAndIsr = Some(newLeaderAndIsr)
+            if (updateSucceeded)
+              info("New leader and ISR for partition [%s, %d] is %s"
+                   .format(topic, partition, newLeaderAndIsr.toString()))
+            updateSucceeded
+          }
+          else {
+            warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
+                 .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
+            finalLeaderAndIsr = Some(leaderAndIsr)
+            true
+          }
+        case None =>
+          warn("Cannot remove replica %d from ISR of %s - leaderAndIsr is empty.".format(replicaId, topicAndPartition))
+          true
+      }
+    }
+    finalLeaderAndIsr
+  }
+
   class SessionExpirationListener() extends IZkStateListener with Logging {
     this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
     @throws(classOf[Exception])
@@ -472,6 +653,7 @@ class KafkaController(val config : Kafka
     @throws(classOf[Exception])
     def handleNewSession() {
       controllerContext.controllerLock synchronized {
+        Utils.unregisterMBean(KafkaController.MBeanName)
         partitionStateMachine.shutdown()
         replicaStateMachine.shutdown()
         if(controllerContext.controllerChannelManager != null) {
@@ -523,14 +705,16 @@ class PartitionsReassignedListener(contr
                 throw new KafkaException("Partition %s to be reassigned is already assigned to replicas"
                   .format(topicAndPartition) +
                   " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
-              }else {
+              }
+              else {
                 if(aliveNewReplicas == newReplicas) {
                   info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition,
                     newReplicas.mkString(",")))
                   val context = createReassignmentContextForPartition(topic, partition, newReplicas)
                   controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
                   controller.onPartitionReassignment(topicAndPartition, context)
-                }else {
+                }
+                else {
                   // some replica in RAR is not alive. Fail partition reassignment
                   throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
                     " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
@@ -609,7 +793,8 @@ class ReassignedPartitionsIsrChangeListe
                     .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
                     "Resuming partition reassignment")
                   controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-                }else {
+                }
+                else {
                   info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
                     .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
                     "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala Fri Oct 26 05:23:55 2012
@@ -25,10 +25,9 @@ trait PartitionLeaderSelector {
   /**
    * @param topic                      The topic of the partition whose leader needs to be elected
    * @param partition                  The partition whose leader needs to be elected
-   * @param assignedReplicas           The list of replicas assigned to the input partition
    * @param currentLeaderAndIsr        The current leader and isr of input partition read from zookeeper
    * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
-   * @returns The leader and isr request, with the newly selected leader info, to send to the brokers
+   * @return The leader and isr request, with the newly selected leader info, to send to the brokers
    * Also, returns the list of replicas the returned leader and isr request should be sent to
    * This API selects a new leader for the input partition
    */
@@ -146,4 +145,41 @@ with Logging {
       }
     }
   }
-}
\ No newline at end of file
+}
+
+/**
+ * Picks one of the alive replicas (other than the current leader) in ISR as
+ * new leader, fails if there are no other replicas in ISR.
+ */
+class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
+        extends PartitionLeaderSelector
+        with Logging {
+
+  this.logIdent = "[ControlledShutdownLeaderSelector]: "
+
+  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+
+    val currentLeader = currentLeaderAndIsr.leader
+
+    val assignedReplicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
+    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
+    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
+
+    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => brokerId != currentLeader &&
+                                                            !controllerContext.shuttingDownBrokerIds.contains(brokerId))
+    val newLeaderOpt = newIsr.headOption
+    newLeaderOpt match {
+      case Some(newLeader) =>
+        debug("Partition [%s,%d] : current leader = %d, new leader = %d"
+              .format(topic, partition, currentLeader, newLeader))
+        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
+         liveAssignedReplicas)
+      case None =>
+        throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
+    }
+  }
+
+}
+
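
A worked example of the selection rule above, with illustrative values (not part of this commit): for ISR (2, 1, 0) where broker 2 is the current leader and is shutting down, the selector elects broker 1 and shrinks the ISR to (1, 0):

    // Drop the current leader and any shutting-down brokers from the ISR,
    // then take the first remaining entry as the new leader.
    val shuttingDownBrokerIds = Set(2)
    val currentLeader = 2
    val isr = List(2, 1, 0)
    val newIsr = isr.filter(b => b != currentLeader && !shuttingDownBrokerIds.contains(b)) // List(1, 0)
    val newLeaderOpt = newIsr.headOption                                                   // Some(1)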

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala Fri Oct 26 05:23:55 2012
@@ -121,8 +121,8 @@ class PartitionStateMachine(controller: 
   private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                                 leaderSelector: PartitionLeaderSelector) {
     val topicAndPartition = TopicAndPartition(topic, partition)
+    val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
     try {
-      partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
       targetState match {
         case NewPartition =>
           // pre: partition did not exist before this
@@ -163,8 +163,8 @@ class PartitionStateMachine(controller: 
           // post: partition state is deleted from all brokers and zookeeper
       }
     }catch {
-      case e => error("State change for partition [%s, %d] ".format(topic, partition) +
-        "from %s to %s failed".format(partitionState(topicAndPartition), targetState), e)
+      case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) +
+        "from %s to %s failed".format(currState, targetState), t)
     }
   }
 
@@ -203,8 +203,8 @@ class PartitionStateMachine(controller: 
   /**
    * Invoked on the NonExistentPartition->NewPartition state transition to update the controller's cache with the
    * partition's replica assignment.
-   * @topic     The topic of the partition whose replica assignment is to be cached
-   * @partition The partition whose replica assignment is to be cached
+   * @param topic     The topic of the partition whose replica assignment is to be cached
+   * @param partition The partition whose replica assignment is to be cached
    */
   private def assignReplicasToPartitions(topic: String, partition: Int) {
     val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
@@ -216,10 +216,7 @@ class PartitionStateMachine(controller: 
    * a leader and isr path in zookeeper. Once the partition moves to the OnlinePartition state, it's leader and isr
    * path gets initialized and it never goes back to the NewPartition state. From here, it can only go to the
    * OfflinePartition state.
-   * @topic               The topic of the partition whose leader and isr path is to be initialized
-   * @partition           The partition whose leader and isr path is to be initialized
-   * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
-   *                      this state change
+   * @param topicAndPartition   The topic/partition whose leader and isr path is to be initialized
    */
   private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
     debug("Initializing leader and isr for partition %s".format(topicAndPartition))
@@ -258,10 +255,9 @@ class PartitionStateMachine(controller: 
   /**
    * Invoked on the OfflinePartition->OnlinePartition state change. It invokes the leader election API to elect a leader
    * for the input offline partition
-   * @topic               The topic of the offline partition
-   * @partition           The offline partition
-   * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
-   *                      this state change
+   * @param topic               The topic of the offline partition
+   * @param partition           The offline partition
+   * @param leaderSelector      Specific leader selector (e.g., offline/reassigned/etc.)
    */
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
     /** handle leader election for the partitions whose leader is no longer alive **/
@@ -291,8 +287,8 @@ class PartitionStateMachine(controller: 
     }catch {
       case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
         .format(topic, partition) + " Marking this partition offline", poe)
-      case sce => throw new StateChangeFailedException(("Error while electing leader for partition" +
-        " [%s, %d]").format(topic, partition), sce)
+      case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +
+        " [%s, %d] due to: %s.").format(topic, partition, sce.getMessage), sce)
     }
     debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala Fri Oct 26 05:23:55 2012
@@ -19,7 +19,6 @@ package kafka.controller
 import collection._
 import kafka.utils.{ZkUtils, Logging}
 import collection.JavaConversions._
-import kafka.api.LeaderAndIsr
 import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.IZkChildListener
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
@@ -75,7 +74,7 @@ class ReplicaStateMachine(controller: Ka
 
   /**
    * This API is invoked by the broker change controller callbacks and the startup API of the state machine
-   * @param brokerIds    The list of brokers that need to be transitioned to the target state
+   * @param replicas     The list of replicas (brokers) that need to be transitioned to the target state
    * @param targetState  The state that the replicas should be moved to
    * The controller's allLeaders cache should have been updated before this
    */
@@ -122,7 +121,7 @@ class ReplicaStateMachine(controller: Ka
         case NonExistentReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
           // send stop replica command
-          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition)
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
           controllerContext.partitionReplicaAssignment.put(topicAndPartition,
@@ -159,38 +158,38 @@ class ReplicaStateMachine(controller: Ka
         case OfflineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
           // As an optimization, the controller removes dead replicas from the ISR
-          var zookeeperPathUpdateSucceeded: Boolean = false
-          var newLeaderAndIsr: LeaderAndIsr = null
-          while(!zookeeperPathUpdateSucceeded) {
-            // refresh leader and isr from zookeeper again
-            val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-            leaderAndIsrOpt match {
-              case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
-                newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
-                  leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
-                info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
-                // update the new leadership decision in zookeeper or retry
-                val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
-                  leaderAndIsr.zkVersion)
-                newLeaderAndIsr.zkVersion = newVersion
-                zookeeperPathUpdateSucceeded = updateSucceeded
-              case None => throw new StateChangeFailedException("Failed to change state of replica %d".format(replicaId) +
-                " for partition [%s, %d] since the leader and isr path in zookeeper is empty".format(topic, partition))
-            }
+          val currLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+          val leaderAndIsrIsEmpty: Boolean = currLeaderAndIsrOpt match {
+            case Some(currLeaderAndIsr) =>
+              if (currLeaderAndIsr.isr.contains(replicaId))
+                controller.removeReplicaFromIsr(topic, partition, replicaId) match {
+                  case Some(updatedLeaderAndIsr) =>
+                    // send the shrunk ISR state change request only to the leader
+                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderAndIsr.leader),
+                                                                        topic, partition, updatedLeaderAndIsr,
+                                                                        replicaAssignment.size)
+                    replicaState.put((topic, partition, replicaId), OfflineReplica)
+                    info("Replica %d for partition [%s, %d] state changed to OfflineReplica"
+                                 .format(replicaId, topic, partition))
+                    info("Removed offline replica %d from ISR for partition [%s, %d]"
+                                 .format(replicaId, topic, partition))
+                    false
+                  case None =>
+                    true
+                }
+              else false
+            case None =>
+              true
           }
-          // send the shrunk ISR state change request only to the leader
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader),
-                                                              topic, partition, newLeaderAndIsr, replicaAssignment.size)
-          // update the local leader and isr cache
-          controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr.leader)
-          replicaState.put((topic, partition, replicaId), OfflineReplica)
-          info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
-          info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
+          if (leaderAndIsrIsEmpty)
+            throw new StateChangeFailedException(
+              "Failed to change state of replica %d for partition [%s, %d] since the leader and isr path in zookeeper is empty"
+              .format(replicaId, topic, partition))
       }
-    }catch {
-      case e => error("Error while changing state of replica %d for partition ".format(replicaId) +
-        "[%s, %d] to %s".format(topic, partition, targetState), e)
+    }
+    catch {
+      case t: Throwable => error("Error while changing state of replica %d for partition ".format(replicaId) +
+        "[%s, %d] to %s".format(topic, partition, targetState), t)
     }
   }
 
@@ -239,12 +238,11 @@ class ReplicaStateMachine(controller: Ka
           controllerContext.controllerLock synchronized {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
-              val newBrokerIds = curBrokerIds -- controllerContext.liveBrokerIds
+              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
               val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
-              val deadBrokerIds = controllerContext.liveBrokerIds -- curBrokerIds
+              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
               controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
-              controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id)
-              info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
+              info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                 .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
               newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
               deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Fri Oct 26 05:23:55 2012
@@ -25,7 +25,7 @@ abstract class AbstractFetcherManager(pr
     // map of (source brokerid, fetcher Id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread]
   private val mapLock = new Object
-  this.logIdent = "[" + name + "], "
+  this.logIdent = "[" + name + "] "
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
     (topic.hashCode() + 31 * partitionId) % numFetchers

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Oct 26 05:23:55 2012
@@ -88,11 +88,11 @@ class KafkaApis(val requestChannel: Requ
     trace("Handling stop replica request " + stopReplicaRequest)
 
     val responseMap = new HashMap[(String, Int), Short]
-
     for((topic, partitionId) <- stopReplicaRequest.partitions) {
-      val errorCode = replicaManager.stopReplica(topic, partitionId)
+      val errorCode = replicaManager.stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
       responseMap.put((topic, partitionId), errorCode)
     }
+
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
   }
@@ -455,11 +455,20 @@ class KafkaApis(val requestChannel: Requ
      * When a request expires just answer it with whatever data is present
      */
     def expire(delayed: DelayedFetch) {
-      val topicData = readMessageSets(delayed.fetch)
-      val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
-      val fromFollower = delayed.fetch.isFromFollower
-      delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
-      requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
+      debug("Expiring fetch request %s.".format(delayed.fetch))
+      try {
+        val topicData = readMessageSets(delayed.fetch)
+        val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+        val fromFollower = delayed.fetch.isFromFollower
+        delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
+        requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
+      }
+      catch {
+        case e1: LeaderNotAvailableException =>
+          debug("Leader changed before fetch request %s expired.".format(delayed.fetch))
+        case e2: UnknownTopicOrPartitionException =>
+          debug("Replica went offline before fetch request %s expired.".format(delayed.fetch))
+      }
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Fri Oct 26 05:23:55 2012
@@ -20,7 +20,7 @@ package kafka.server
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Fri Oct 26 05:23:55 2012
@@ -91,14 +91,15 @@ class ReplicaManager(val config: KafkaCo
     kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
   }
 
-  def stopReplica(topic: String, partitionId: Int): Short  = {
+  def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
     trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId))
     val errorCode = ErrorMapping.NoError
     getReplica(topic, partitionId) match {
       case Some(replica) =>
         replicaFetcherManager.removeFetcher(topic, partitionId)
         /* TODO: handle deleteLog in a better way */
-        //logManager.deleteLog(topic, partition)
+        //if (deletePartition)
+        //  logManager.deleteLog(topic, partition)
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
         }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Fri Oct 26 05:23:55 2012
@@ -650,7 +650,7 @@ object ZkUtils extends Logging {
    * or throws an exception if the broker dies before the query to zookeeper finishes
    * @param brokerId The broker id
    * @param zkClient The zookeeper client connection
-   * @returns An optional Broker object encapsulating the broker metadata
+   * @return An optional Broker object encapsulating the broker metadata
    */
   def getBrokerInfo(zkClient: ZkClient, brokerId: Int): Option[Broker] = {
     ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Fri Oct 26 05:23:55 2012
@@ -22,7 +22,8 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ZkUtils, TestUtils}
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -357,6 +358,53 @@ class AdminTest extends JUnit3Suite with
     servers.foreach(_.shutdown())
   }
 
+  @Test
+  def testShutdownBroker() {
+    val expectedReplicaAssignment = Map(1  -> List("0", "1", "2"))
+    val topic = "test"
+    val partition = 1
+    // create brokers
+    val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
+    // create the topic
+    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
+
+    // broker 2 should be the leader since it was started first
+    var leaderBeforeShutdown = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
+    var controllerId = ZkUtils.getController(zkClient)
+    var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+    var partitionsRemaining = controller.shutdownBroker(2)
+    assertEquals(0, partitionsRemaining)
+    var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+    var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+    assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
+    assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
+
+    leaderBeforeShutdown = leaderAfterShutdown
+    controllerId = ZkUtils.getController(zkClient)
+    controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+    partitionsRemaining = controller.shutdownBroker(1)
+    assertEquals(0, partitionsRemaining)
+    topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+    leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+    assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
+    assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+
+    leaderBeforeShutdown = leaderAfterShutdown
+    controllerId = ZkUtils.getController(zkClient)
+    controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+    partitionsRemaining = controller.shutdownBroker(0)
+    assertEquals(1, partitionsRemaining)
+    topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+    leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+    assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
+    assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+
+    servers.foreach(_.shutdown())
+
+
+  }
+
   private def checkIfReassignPartitionPathExists(): Boolean = {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1402395&r1=1402394&r2=1402395&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala Fri Oct 26 05:23:55 2012
@@ -97,7 +97,7 @@ object SerializationTestUtils{
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(collection.immutable.Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {


