kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: svn commit: r1396726 [1/2] - in /incubator/kafka/branches/0.8: bin/ config/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/sr...
Date: Wed, 10 Oct 2012 18:42:59 GMT
Author: nehanarkhede
Date: Wed Oct 10 18:42:57 2012
New Revision: 1396726

URL: http://svn.apache.org/viewvc?rev=1396726&view=rev
Log:
Reverting KAFKA-42 since it accidentally contained changes to the metrics package. Didn't catch that earlier due to a stale sbt cache. It is better to redo the patch and then commit it

Removed:
    incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh
    incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/AdminCommandFailedException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
Modified:
    incubator/kafka/branches/0.8/config/server.properties
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/project/build/KafkaProject.scala
    incubator/kafka/branches/0.8/system_test/testcase_to_run.json

Modified: incubator/kafka/branches/0.8/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/server.properties?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/server.properties (original)
+++ incubator/kafka/branches/0.8/config/server.properties Wed Oct 10 18:42:57 2012
@@ -113,3 +113,10 @@ zk.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000
+
+# metrics reporter properties
+# kafka.metrics.polling.interval.secs=5
+# kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
+# kafka.csv.metrics.dir=kafka_metrics
+# kafka.csv.metrics.reporter.enabled=true
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Wed Oct 10 18:42:57 2012
@@ -51,7 +51,7 @@ object AdminUtils extends Logging {
    */
   def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
                               fixedStartIndex: Int = -1)  // for testing only
-  : Map[Int, Seq[String]] = {
+  : Map[Int, List[String]] = {
     if (nPartitions <= 0)
       throw new AdministrationException("number of partitions must be larger than 0")
     if (replicationFactor <= 0)
@@ -75,7 +75,7 @@ object AdminUtils extends Logging {
     ret.toMap
   }
 
-  def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[String]], zkClient: ZkClient) {
+  def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, List[String]], zkClient: ZkClient) {
     try {
       val zkPath = ZkUtils.getTopicPath(topic)
       val jsonPartitionMap = Utils.mapToJson(replicaAssignment.map(e => (e._1.toString -> e._2)))
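
For reference, a minimal usage sketch (not part of this diff) of how the two AdminUtils methods touched above compose when a topic is created; the broker ids, topic name, and partition/replica counts are illustrative:

    import kafka.admin.AdminUtils
    import org.I0Itec.zkclient.ZkClient

    // Spread 4 partitions with replication factor 2 over three brokers, then persist
    // the resulting assignment (partition id -> replica broker ids) under the topic's
    // zookeeper path.
    def createTopicSketch(zkClient: ZkClient) {
      val brokerList = Seq("0", "1", "2")
      val assignment = AdminUtils.assignReplicasToBrokers(brokerList, 4, 2)
      AdminUtils.createTopicPartitionAssignmentPathInZK("example-topic", assignment, zkClient)
    }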

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Wed Oct 10 18:42:57 2012
@@ -23,6 +23,7 @@ import kafka.utils._
 import collection.mutable.Map
 import collection.mutable.HashMap
 
+
 object LeaderAndIsr {
   val initialLeaderEpoch: Int = 0
   val initialZKVersion: Int = 0

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Wed Oct 10 18:42:57 2012
@@ -20,6 +20,8 @@ package kafka.api
 
 import java.nio._
 import kafka.utils._
+import collection.mutable.HashSet
+import collection.mutable.Set
 
 object StopReplicaRequest {
   val CurrentVersion = 1.shortValue()
@@ -31,30 +33,29 @@ object StopReplicaRequest {
     val clientId = Utils.readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
     val topicPartitionPairCount = buffer.getInt
-    val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]()
-    for (i <- 0 until topicPartitionPairCount) {
-      topicPartitionPairSet.add(Utils.readShortString(buffer, "UTF-8"), buffer.getInt)
+    val topicPartitionPairSet = new HashSet[(String, Int)]()
+    for (i <- 0 until topicPartitionPairCount){
+      topicPartitionPairSet.add((Utils.readShortString(buffer, "UTF-8"), buffer.getInt))
     }
-    new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet)
+    new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet)
   }
 }
 
 case class StopReplicaRequest(versionId: Short,
                               clientId: String,
                               ackTimeoutMs: Int,
-                              partitions: Set[(String, Int)])
+                              stopReplicaSet: Set[(String, Int)])
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
-  def this(partitions: Set[(String, Int)]) = {
-    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
-        partitions)
+  def this(stopReplicaSet: Set[(String, Int)]) = {
+    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
-    buffer.putInt(partitions.size)
-    for ((topic, partitionId) <- partitions){
+    buffer.putInt(stopReplicaSet.size)
+    for ((topic, partitionId) <- stopReplicaSet){
       Utils.writeShortString(buffer, topic, "UTF-8")
       buffer.putInt(partitionId)
     }
@@ -62,7 +63,7 @@ case class StopReplicaRequest(versionId:
 
   def sizeInBytes(): Int = {
     var size = 2 + (2 + clientId.length()) + 4 + 4
-    for ((topic, partitionId) <- partitions){
+    for ((topic, partitionId) <- stopReplicaSet){
       size += (2 + topic.length()) + 4
     }
     size
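
For reference, a small sketch (not part of this diff) of how the serialized size shown above works out; the topic name and partition ids are illustrative, and the clientId term depends on the default defined in the companion object:

    import collection.mutable
    import kafka.api.StopReplicaRequest

    object StopReplicaSizeSketch {
      def main(args: Array[String]) {
        // Ask a broker to stop two replicas of "example-topic".
        val stopReplicaSet = mutable.Set(("example-topic", 0), ("example-topic", 1))
        val request = new StopReplicaRequest(stopReplicaSet)
        // sizeInBytes() = 2 (versionId) + 2 + clientId.length + 4 (ackTimeoutMs)
        //               + 4 (pair count) + per pair: 2 + topic.length + 4 (partition id)
        println(request.sizeInBytes())
      }
    }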

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Wed Oct 10 18:42:57 2012
@@ -50,7 +50,7 @@ class Partition(val topic: String,
   newGauge(
     topic + "-" + partitionId + "UnderReplicated",
     new Gauge[Int] {
-      def value() = {
+      def getValue = {
         if (isUnderReplicated) 1 else 0
       }
     }
@@ -69,7 +69,7 @@ class Partition(val topic: String,
         if (isReplicaLocal(replicaId)) {
           val log = logManager.getOrCreateLog(topic, partitionId)
           val localReplica = new Replica(replicaId, this, time,
-            highwaterMarkCheckpoint.read(topic, partitionId), Some(log))
+            highwaterMarkCheckpoint.read(topic, partitionId).min(log.logEndOffset), Some(log))
           addReplicaIfNotExists(localReplica)
         }
         else {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala Wed Oct 10 18:42:57 2012
@@ -17,12 +17,10 @@
 
 package kafka.common
 
-
 /**
  * This exception is thrown by the leader elector in the controller when leader election fails for a partition since
  * all the replicas for a partition are offline
  */
-class PartitionOfflineException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
-  def this(message: String) = this(message, null)
-  def this() = this(null, null)
+class PartitionOfflineException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/StateChangeFailedException.scala Wed Oct 10 18:42:57 2012
@@ -17,7 +17,7 @@
 
 package kafka.common
 
-class StateChangeFailedException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
-  def this(message: String) = this(message, null)
-  def this() = this(null, null)
+class StateChangeFailedException(message: String) extends RuntimeException(message) {
+  def this(message: String, cause: Throwable) = this(message + " Root cause -> " + cause.toString)
+  def this() = this(null)
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Oct 10 18:42:57 2012
@@ -685,7 +685,7 @@ private[kafka] class ZookeeperConsumerCo
       newGauge(
         config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {
-          def value() = q.size
+          def getValue = q.size
         }
       )
     })
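
For reference, a minimal sketch (not part of this diff) of the gauge registration pattern used above; the metric name and queue are illustrative, and getValue is the Gauge method expected by the metrics version this revert restores:

    import java.util.concurrent.LinkedBlockingQueue
    import com.yammer.metrics.core.Gauge
    import kafka.metrics.KafkaMetricsGroup

    // Expose the current depth of a queue as a gauge, mirroring the FetchQueueSize
    // gauge registered above.
    class QueueDepthSketch(q: LinkedBlockingQueue[AnyRef]) extends KafkaMetricsGroup {
      newGauge(
        "ExampleQueueSize",
        new Gauge[Int] {
          def getValue = q.size
        }
      )
    }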

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala Wed Oct 10 18:42:57 2012
@@ -137,49 +137,36 @@ class RequestSendThread(val controllerId
   }
 }
 
+// TODO: When we add more types of requests, we can generalize this class a bit. Right now, it just handles LeaderAndIsr
+// request
 class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
   extends  Logging {
-  val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
-  val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+  val brokerRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
 
   def newBatch() {
     // raise error if the previous batch is not empty
-    if(leaderAndIsrRequestMap.size > 0 || stopReplicaRequestMap.size > 0)
+    if(brokerRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
-        "a new one. Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
-    leaderAndIsrRequestMap.clear()
-    stopReplicaRequestMap.clear()
+        "a new one. Some state changes %s might be lost ".format(brokerRequestMap.toString()))
+    brokerRequestMap.clear()
   }
 
-  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) {
+  def addRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) {
     brokerIds.foreach { brokerId =>
-      leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])
-      leaderAndIsrRequestMap(brokerId).put((topic, partition), leaderAndIsr)
-    }
-  }
-
-  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int) {
-    brokerIds.foreach { brokerId =>
-      stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)])
-      stopReplicaRequestMap(brokerId) :+ (topic, partition)
+      brokerRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])
+      brokerRequestMap(brokerId).put((topic, partition), leaderAndIsr)
     }
   }
 
   def sendRequestsToBrokers() {
-    leaderAndIsrRequestMap.foreach { m =>
+    brokerRequestMap.foreach { m =>
       val broker = m._1
       val leaderAndIsr = m._2
       val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
-      debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
+      info("Sending to broker %d leaderAndIsr request of %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
-    leaderAndIsrRequestMap.clear()
-    stopReplicaRequestMap.foreach { r =>
-      val broker = r._1
-      debug("The stop replica request sent to broker %d is %s".format(broker, r._2.mkString(",")))
-      sendRequest(broker, new StopReplicaRequest(Set.empty[(String, Int)] ++ r._2), null)
-    }
-    stopReplicaRequestMap.clear()
+    brokerRequestMap.clear()
   }
 }
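
For reference, a minimal sketch (not part of this diff) of the batch lifecycle restored above; the broker ids, topic, and partition are illustrative, and the sendRequest callback and LeaderAndIsr value are assumed to come from the surrounding controller code:

    import kafka.api.{LeaderAndIsr, RequestOrResponse}
    import kafka.controller.ControllerBrokerRequestBatch

    // Queue a LeaderAndIsr request for brokers 0 and 1, then flush the batch.
    def sendLeaderAndIsrSketch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,
                               leaderAndIsr: LeaderAndIsr) {
      val batch = new ControllerBrokerRequestBatch(sendRequest)
      batch.newBatch()               // throws if a previous batch was left unflushed
      batch.addRequestForBrokers(Seq(0, 1), "example-topic", 0, leaderAndIsr)
      batch.sendRequestsToBrokers()  // sends one LeaderAndIsrRequest per broker, then clears the map
    }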
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala Wed Oct 10 18:42:57 2012
@@ -20,17 +20,14 @@ import collection._
 import collection.immutable.Set
 import kafka.cluster.Broker
 import kafka.api._
-import kafka.utils.ZkUtils._
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
+import kafka.utils.{ZkUtils, Logging}
+import java.lang.Object
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import com.yammer.metrics.core.Gauge
-import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
-import kafka.utils.{Utils, ZkUtils, Logging}
-import org.I0Itec.zkclient.exception.ZkNoNodeException
-import java.lang.{IllegalStateException, Object}
-import kafka.common.KafkaException
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
@@ -39,24 +36,21 @@ class ControllerContext(val zkClient: Zk
                         var liveBrokerIds: Set[Int] = null,
                         var allTopics: Set[String] = null,
                         var partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null,
-                        var allLeaders: mutable.Map[(String, Int), Int] = null,
-                        var partitionsBeingReassigned: mutable.Map[(String, Int), ReassignedPartitionsContext] =
-                        new mutable.HashMap)
+                        var allLeaders: mutable.Map[(String, Int), Int] = null)
 
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Controller " + config.brokerId + "]: "
+  this.logIdent = "[Controller " + config.brokerId + "], "
   private var isRunning = true
   val controllerContext = new ControllerContext(zkClient)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
     config.brokerId)
-  private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
 
   newGauge(
     "ActiveControllerCount",
     new Gauge[Int] {
-      def value() = if (isActive) 1 else 0
+      def getValue = if (isActive) 1 else 0
     }
   )
 
@@ -73,7 +67,6 @@ class KafkaController(val config : Kafka
     if(isRunning) {
       info("Broker %d starting become controller state transition".format(config.brokerId))
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
-      registerReassignedPartitionsListener()
       partitionStateMachine.registerListeners()
       replicaStateMachine.registerListeners()
       initializeControllerContext()
@@ -105,12 +98,7 @@ class KafkaController(val config : Kafka
     updateLeaderAndIsrCache()
     // update partition state machine
     partitionStateMachine.triggerOnlinePartitionStateChange()
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers),
-      OnlineReplica)
-    // check if reassignment of some partitions need to be restarted
-    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter(p =>
-      p._2.newReplicas.foldLeft(false)((a, replica) => newBrokers.contains(replica) || a))
-    partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1._1, p._1._2, p._2))
+    replicaStateMachine.handleStateChanges(newBrokers, OnlineReplica)
   }
 
   /**
@@ -133,8 +121,7 @@ class KafkaController(val config : Kafka
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // handle dead replicas
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers),
-      OfflineReplica)
+    replicaStateMachine.handleStateChanges(deadBrokers, OfflineReplica)
   }
 
   /**
@@ -159,73 +146,28 @@ class KafkaController(val config : Kafka
   def onNewPartitionCreation(newPartitions: Seq[(String, Int)]) {
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
     partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
-    replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica)
     partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition)
-    replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), OnlineReplica)
-  }
-
-  /**
-   * This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
-   * reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
-   * Reassigning replicas for a partition goes through a few stages -
-   * RAR = Reassigned replicas
-   * AR = Original list of replicas for partition
-   * 1. Register listener for ISR changes to detect when the RAR is a subset of the ISR
-   * 2. Start new replicas RAR - AR.
-   * 3. Wait until new replicas are in sync with the leader
-   * 4. If the leader is not in RAR, elect a new leader from RAR
-   * 5. Stop old replicas AR - RAR
-   * 6. Write new AR
-   * 7. Remove partition from the /admin/reassign_partitions path
-   */
-  def onPartitionReassignment(topic: String, partition: Int, reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
-    areReplicasInIsr(topic, partition, reassignedReplicas) match {
-      case true =>
-        // mark the new replicas as online
-        reassignedReplicas.foreach { replica =>
-          replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)),
-            OnlineReplica)
-        }
-        // check if current leader is in the new replicas list. If not, controller needs to trigger leader election
-        moveReassignedPartitionLeaderIfRequired(topic, partition, reassignedPartitionContext)
-        // stop older replicas
-        stopOldReplicasOfReassignedPartition(topic, partition, reassignedPartitionContext)
-        // write the new list of replicas for this partition in zookeeper
-        updateAssignedReplicasForPartition(topic, partition, reassignedPartitionContext)
-        // update the /admin/reassign_partitions path to remove this partition
-        removePartitionFromReassignedPartitions(topic, partition)
-        info("Removed partition [%s, %d] from the list of reassigned partitions in zookeeper".format(topic, partition))
-        controllerContext.partitionsBeingReassigned.remove((topic, partition))
-      case false =>
-        info("New replicas %s for partition [%s, %d] being ".format(reassignedReplicas.mkString(","), topic, partition) +
-          "reassigned not yet caught up with the leader")
-        // start new replicas
-        startNewReplicasForReassignedPartition(topic, partition, reassignedPartitionContext)
-        info("Waiting for new replicas %s for partition [%s, %d] being ".format(reassignedReplicas.mkString(","), topic, partition) +
-          "reassigned to catch up with the leader")
-    }
   }
 
   /* TODO: kafka-330  This API is unused until we introduce the delete topic functionality.
   remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
-  //  def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
-  //    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
-  //    for((topicPartition, brokers) <- replicaAssignment){
-  //      for (broker <- brokers){
-  //        if (!brokerToPartitionToStopReplicaMap.contains(broker))
-  //          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
-  //        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
-  //      }
-  //      controllerContext.allLeaders.remove(topicPartition)
-  //      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
-  //    }
-  //    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
-  //      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
-  //      info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
-  //      sendRequest(broker, stopReplicaRequest)
-  //    }
-  //  }
+  def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
+    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
+    for((topicPartition, brokers) <- replicaAssignment){
+      for (broker <- brokers){
+        if (!brokerToPartitionToStopReplicaMap.contains(broker))
+          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
+        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
+      }
+      controllerContext.allLeaders.remove(topicPartition)
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
+    }
+    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
+      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
+      info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
+      sendRequest(broker, stopReplicaRequest)
+    }
+  }
 
   /**
    * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
@@ -265,7 +207,7 @@ class KafkaController(val config : Kafka
   }
 
   private def registerSessionExpirationListener() = {
-    zkClient.subscribeStateChanges(new SessionExpirationListener())
+    zkClient.subscribeStateChanges(new SessionExpireListener())
   }
 
   private def initializeControllerContext() {
@@ -281,23 +223,6 @@ class KafkaController(val config : Kafka
     startChannelManager()
     info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
     info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
-    initializeReassignedPartitionsContext()
-  }
-
-  private def initializeReassignedPartitionsContext() {
-    // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
-    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
-    // check if they are already completed
-    val reassignedPartitions = partitionsBeingReassigned.filter(partition =>
-      controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).map(_._1)
-    reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p._1, p._2))
-    controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned
-    controllerContext.partitionsBeingReassigned --= reassignedPartitions
-    info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
-    info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
-    info("Resuming reassignment of partitions: %s".format(controllerContext.partitionsBeingReassigned.toString()))
-    controllerContext.partitionsBeingReassigned.foreach(partition =>
-      onPartitionReassignment(partition._1._1, partition._1._2, partition._2))
   }
 
   private def startChannelManager() {
@@ -319,122 +244,8 @@ class KafkaController(val config : Kafka
     }
   }
 
-  private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
-    getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-      case Some(leaderAndIsr) =>
-        val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r))
-        replicasNotInIsr.isEmpty
-      case None => false
-    }
-  }
-
-  private def moveReassignedPartitionLeaderIfRequired(topic: String, partition: Int,
-                                                      reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val currentLeader = controllerContext.allLeaders((topic, partition))
-    if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
-      info("Leader %s for partition [%s, %d] being reassigned, ".format(currentLeader, topic, partition) +
-        "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
-      // move the leader to one of the alive and caught up new replicas
-      partitionStateMachine.handleStateChanges(List((topic, partition)), OnlinePartition,
-        reassignedPartitionLeaderSelector)
-    }else {
-      // check if the leader is alive or not
-      controllerContext.liveBrokerIds.contains(currentLeader) match {
-        case true =>
-          info("Leader %s for partition [%s, %d] being reassigned, ".format(currentLeader, topic, partition) +
-            "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
-        case false =>
-          info("Leader %s for partition [%s, %d] being reassigned, ".format(currentLeader, topic, partition) +
-            "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
-          partitionStateMachine.handleStateChanges(List((topic, partition)), OnlinePartition,
-            reassignedPartitionLeaderSelector)
-      }
-    }
-  }
-
-  private def stopOldReplicasOfReassignedPartition(topic: String, partition: Int,
-                                                   reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
-    // send stop replica state change request to the old replicas
-    val oldReplicas = controllerContext.partitionReplicaAssignment((topic, partition)).toSet -- reassignedReplicas.toSet
-    // first move the replica to offline state (the controller removes it from the ISR)
-    oldReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), OfflineReplica)
-    }
-    // send stop replica command to the old replicas
-    oldReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), NonExistentReplica)
-    }
-  }
-
-  private def updateAssignedReplicasForPartition(topic: String, partition: Int,
-                                                 reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1._1.equals(topic))
-    partitionsAndReplicasForThisTopic.put((topic, partition), reassignedReplicas)
-    updateAssignedReplicasForPartition(topic, partition, partitionsAndReplicasForThisTopic)
-    info("Updated assigned replicas for partition [%s, %d] being reassigned ".format(topic, partition) +
-      "to %s".format(reassignedReplicas.mkString(",")))
-    // update the assigned replica list after a successful zookeeper write
-    controllerContext.partitionReplicaAssignment.put((topic, partition), reassignedReplicas)
-    // stop watching the ISR changes for this partition
-    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-      controllerContext.partitionsBeingReassigned((topic, partition)).isrChangeListener)
-    // update the assigned replica list
-    controllerContext.partitionReplicaAssignment.put((topic, partition), reassignedReplicas)
-  }
-
-  private def startNewReplicasForReassignedPartition(topic: String, partition: Int,
-                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
-    // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
-    // replicas list
-    val assignedReplicaSet = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment((topic, partition))
-    val reassignedReplicaSet = Set.empty[Int] ++ reassignedPartitionContext.newReplicas
-    val newReplicas: Seq[Int] = (reassignedReplicaSet -- assignedReplicaSet).toSeq
-    newReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), NewReplica)
-    }
-  }
-
-  private def registerReassignedPartitionsListener() = {
-    zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this))
-  }
-
-  def removePartitionFromReassignedPartitions(topic: String, partition: Int) {
-    // read the current list of reassigned partitions from zookeeper
-    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
-    // remove this partition from that list
-    val updatedPartitionsBeingReassigned = partitionsBeingReassigned - ((topic, partition))
-    // write the new list to zookeeper
-    ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
-    // update the cache
-    controllerContext.partitionsBeingReassigned.remove((topic, partition))
-  }
-
-  def updateAssignedReplicasForPartition(topic: String, partition: Int,
-                                         newReplicaAssignmentForTopic: Map[(String, Int), Seq[Int]]) {
-    try {
-      val zkPath = ZkUtils.getTopicPath(topic)
-      val jsonPartitionMap = Utils.mapToJson(newReplicaAssignmentForTopic.map(e =>
-        (e._1._2.toString -> e._2.map(_.toString))))
-      ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
-      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
-    } catch {
-      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topic))
-      case e2 => throw new KafkaException(e2.toString)
-    }
-  }
-
-  private def getAllReplicasForPartition(partitions: Seq[(String, Int)]): Seq[PartitionAndReplica] = {
-    partitions.map { p =>
-      val replicas = controllerContext.partitionReplicaAssignment(p)
-      replicas.map(r => new PartitionAndReplica(p._1, p._2, r))
-    }.flatten
-  }
-
-  class SessionExpirationListener() extends IZkStateListener with Logging {
-    this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
+  class SessionExpireListener() extends IZkStateListener with Logging {
+    this.logIdent = "[Controller " + config.brokerId + "], "
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
@@ -463,159 +274,6 @@ class KafkaController(val config : Kafka
   }
 }
 
-/**
- * Starts the partition reassignment process unless -
- * 1. Partition previously existed
- * 2. New replicas are the same as existing replicas
- * 3. Any replica in the new set of replicas are dead
- * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
- * partitions.
- */
-class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
-  this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
-  val zkClient = controller.controllerContext.zkClient
-  val controllerContext = controller.controllerContext
-
-  /**
-   * Invoked when some partitions are reassigned by the admin command
-   * @throws Exception On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataChange(dataPath: String, data: Object) {
-    debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
-      .format(dataPath, data))
-    val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
-    val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
-    newPartitions.foreach { partitionToBeReassigned =>
-      controllerContext.controllerLock synchronized {
-        val topic = partitionToBeReassigned._1._1
-        val partition = partitionToBeReassigned._1._2
-        val newReplicas = partitionToBeReassigned._2
-        val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
-        try {
-          val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get((topic, partition))
-          assignedReplicasOpt match {
-            case Some(assignedReplicas) =>
-              if(assignedReplicas == newReplicas) {
-                throw new KafkaException("Partition [%s, %d] to be reassigned is already assigned to replicas"
-                  .format(topic, partition) +
-                  " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
-              }else {
-                if(aliveNewReplicas == newReplicas) {
-                  info("Handling reassignment of partition [%s, %d] to new replicas %s".format(topic, partition,
-                    newReplicas.mkString(",")))
-                  val context = createReassignmentContextForPartition(topic, partition, newReplicas)
-                  controllerContext.partitionsBeingReassigned.put((topic, partition), context)
-                  controller.onPartitionReassignment(topic, partition, context)
-                }else {
-                  // some replica in RAR is not alive. Fail partition reassignment
-                  throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
-                    " %s for partition [%s, %d] to be reassigned are alive. ".format(newReplicas.mkString(","), topic, partition) +
-                    "Failing partition reassignment")
-                }
-              }
-            case None => throw new KafkaException("Attempt to reassign partition [%s, %d] that doesn't exist"
-              .format(topic, partition))
-          }
-        }catch {
-          case e => error("Error completing reassignment of partition [%s, %d]".format(topic, partition), e)
-          // remove the partition from the admin path to unblock the admin client
-          controller.removePartitionFromReassignedPartitions(topic, partition)
-        }
-      }
-    }
-  }
-
-  /**
-   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
-   * @throws Exception
-   *             On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String) {
-  }
-
-  private def createReassignmentContextForPartition(topic: String,
-                                                    partition: Int,
-                                                    newReplicas: Seq[Int]): ReassignedPartitionsContext = {
-    val context = new ReassignedPartitionsContext(newReplicas)
-    // first register ISR change listener
-    watchIsrChangesForReassignedPartition(topic, partition, context)
-    context
-  }
-
-  private def watchIsrChangesForReassignedPartition(topic: String, partition: Int,
-                                                    reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val isrChangeListener = new ReassignedPartitionsIsrChangeListener(controller, topic, partition,
-      reassignedReplicas.toSet)
-    reassignedPartitionContext.isrChangeListener = isrChangeListener
-    // register listener on the leader and isr path to wait until they catch up with the current leader
-    zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
-  }
-}
-
-class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
-                                            reassignedReplicas: Set[Int])
-  extends IZkDataListener with Logging {
-  this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
-  val zkClient = controller.controllerContext.zkClient
-  val controllerContext = controller.controllerContext
-
-  /**
-   * Invoked when some partitions are reassigned by the admin command
-   * @throws Exception On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataChange(dataPath: String, data: Object) {
-    try {
-      controllerContext.controllerLock synchronized {
-        debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
-        // check if this partition is still being reassigned or not
-        controllerContext.partitionsBeingReassigned.get((topic, partition)) match {
-          case Some(reassignedPartitionContext) =>
-            // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
-            val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-            newLeaderAndIsrOpt match {
-              case Some(leaderAndIsr) => // check if new replicas have joined ISR
-                val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
-                if(caughtUpReplicas == reassignedReplicas) {
-                  // resume the partition reassignment process
-                  info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
-                    .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
-                    "Resuming partition reassignment")
-                  controller.onPartitionReassignment(topic, partition, reassignedPartitionContext)
-                }else {
-                  info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
-                    .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
-                    "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
-                }
-              case None => error("Error handling reassignment of partition [%s, %d] to replicas %s as it was never created"
-                .format(topic, partition, reassignedReplicas.mkString(",")))
-            }
-          case None =>
-        }
-      }
-    }catch {
-      case e => error("Error while handling partition reassignment", e)
-    }
-  }
-
-  /**
-   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
-   * @throws Exception
-   *             On any error.
-   */
-  @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String) {
-  }
-}
-
-case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
-                                       var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
-
-case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
-
 object ControllerStat extends KafkaMetricsGroup {
   val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
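
For reference, a minimal sketch (not part of this diff) of how controller code might mark one of the meters defined above; the wrapper object is illustrative:

    import kafka.controller.ControllerStat

    object ControllerStatSketch {
      // Record one offline-partition event against the OfflinePartitionsPerSec meter.
      def recordOfflinePartition() {
        ControllerStat.offlinePartitionRate.mark()
      }
    }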

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala Wed Oct 10 18:42:57 2012
@@ -21,7 +21,7 @@ import kafka.api.LeaderAndIsr
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import collection.JavaConversions._
-import kafka.common.{StateChangeFailedException, PartitionOfflineException}
+import kafka.common.{StateChangeFailedException, PartitionOfflineException, KafkaException}
 import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 
@@ -43,7 +43,6 @@ class PartitionStateMachine(controller: 
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[(String, Int), PartitionState] = mutable.Map.empty
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
-  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private var isShuttingDown = new AtomicBoolean(false)
 
   /**
@@ -83,8 +82,7 @@ class PartitionStateMachine(controller: 
       // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
       partitionState.filter(partitionAndState =>
         partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach {
-        partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition,
-                                               offlinePartitionSelector)
+        partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition)
       }
       brokerRequestBatch.sendRequestsToBrokers()
     }catch {
@@ -97,13 +95,12 @@ class PartitionStateMachine(controller: 
    * @param partitions   The list of partitions that need to be transitioned to the target state
    * @param targetState  The state that the partitions should be moved to
    */
-  def handleStateChanges(partitions: Seq[(String, Int)], targetState: PartitionState,
-                         leaderSelector: PartitionLeaderSelector = offlinePartitionSelector) {
+  def handleStateChanges(partitions: Seq[(String, Int)], targetState: PartitionState) {
     info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
       partitions.foreach { topicAndPartition =>
-        handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState, leaderSelector)
+        handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState)
       }
       brokerRequestBatch.sendRequestsToBrokers()
     }catch {
@@ -118,8 +115,7 @@ class PartitionStateMachine(controller: 
    * @param partition   The partition for which the state transition is invoked
    * @param targetState The end state that the partition should be moved to
    */
-  private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
-                                leaderSelector: PartitionLeaderSelector) {
+  private def handleStateChange(topic: String, partition: Int, targetState: PartitionState) {
     try {
       partitionState.getOrElseUpdate((topic, partition), NonExistentPartition)
       targetState match {
@@ -132,18 +128,17 @@ class PartitionStateMachine(controller: 
           info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
             "%s".format(controllerContext.partitionReplicaAssignment(topic, partition).mkString(",")))
         case OnlinePartition =>
-          assertValidPreviousStates(topic, partition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
+          // pre: partition should be in New state
+          assertValidPreviousStates(topic, partition, List(NewPartition, OfflinePartition), OnlinePartition)
           partitionState(topic, partition) match {
             case NewPartition =>
               // initialize leader and isr path for new partition
-              initializeLeaderAndIsrForPartition(topic, partition)
+              initializeLeaderAndIsrForPartition(topic, partition, brokerRequestBatch)
             case OfflinePartition =>
-              electLeaderForPartition(topic, partition, leaderSelector)
-            case OnlinePartition => // invoked when the leader needs to be re-elected
-              electLeaderForPartition(topic, partition, leaderSelector)
+              electLeaderForOfflinePartition(topic, partition, brokerRequestBatch)
             case _ => // should never come here since illegal previous states are checked above
           }
-          info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
+          info("Partition [%s, %d] state changed from %s to Online with leader %d".format(topic, partition,
             partitionState(topic, partition), controllerContext.allLeaders(topic, partition)))
           partitionState.put((topic, partition), OnlinePartition)
            // post: partition has a leader
@@ -220,7 +215,8 @@ class PartitionStateMachine(controller: 
    * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
    *                      this state change
    */
-  private def initializeLeaderAndIsrForPartition(topic: String, partition: Int) {
+  private def initializeLeaderAndIsrForPartition(topic: String, partition: Int,
+                                                 brokerRequestBatch: ControllerBrokerRequestBatch) {
     debug("Initializing leader and isr for partition [%s, %d]".format(topic, partition))
     val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition))
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
@@ -238,10 +234,10 @@ class PartitionStateMachine(controller: 
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString)
-          // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
+          // TODO: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
+          brokerRequestBatch.addRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
           controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader)
           partitionState.put((topic, partition), OnlinePartition)
         }catch {
@@ -261,38 +257,101 @@ class PartitionStateMachine(controller: 
    * @brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
    *                      this state change
    */
-  def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
+  private def electLeaderForOfflinePartition(topic: String, partition: Int,
+                                             brokerRequestBatch: ControllerBrokerRequestBatch) {
     /** handle leader election for the partitions whose leader is no longer alive **/
-    info("Electing leader for partition [%s, %d]".format(topic, partition))
+    info("Electing leader for Offline partition [%s, %d]".format(topic, partition))
     try {
-      var zookeeperPathUpdateSucceeded: Boolean = false
-      var newLeaderAndIsr: LeaderAndIsr = null
-      var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
-      while(!zookeeperPathUpdateSucceeded) {
-        val currentLeaderAndIsr = getLeaderAndIsrOrThrowException(topic, partition)
-        // elect new leader or throw exception
-        val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
-        val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
-        newLeaderAndIsr = leaderAndIsr
-        newLeaderAndIsr.zkVersion = newVersion
-        zookeeperPathUpdateSucceeded = updateSucceeded
-        replicasForThisPartition = replicas
+      controllerContext.partitionReplicaAssignment.get((topic, partition)) match {
+        case Some(assignedReplicas) =>
+          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+          try {
+            // elect new leader or throw exception
+            val newLeaderAndIsr = electLeaderForPartition(topic, partition, assignedReplicas)
+            info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
+            // store new leader and isr info in cache
+            brokerRequestBatch.addRequestForBrokers(liveAssignedReplicasToThisPartition, topic, partition,
+              newLeaderAndIsr)
+          }catch {
+            case e => throw new StateChangeFailedException(("Error while electing leader for partition" +
+              " [%s, %d]").format(topic, partition), e)
+          }
+        case None => throw new KafkaException(("While handling broker changes, the " +
+          "partition [%s, %d] doesn't have assigned replicas. The replica assignment for other partitions is %s")
+          .format(topic, partition, controllerContext.partitionReplicaAssignment))
       }
-      // update the leader cache
-      controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
-      info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
-      // store new leader and isr info in cache
-      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr)
     }catch {
-      case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
-        .format(topic, partition) + " Marking this partition offline", poe)
-      case sce => throw new StateChangeFailedException(("Error while electing leader for partition" +
-        " [%s, %d]").format(topic, partition), sce)
+      case e => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
+        .format(topic, partition) + " Marking this partition offline")
     }
     debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
   }
 
+  /**
+   * @param topic                      The topic of the partition whose leader needs to be elected
+   * @param partition                  The partition whose leader needs to be elected
+   * @param assignedReplicas           The list of replicas assigned to the input partition
+   * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
+   * This API selects a new leader for the input partition -
+   * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
+   * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+   * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
+   * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
+   */
+  private def electLeaderForPartition(topic: String, partition: Int, assignedReplicas: Seq[Int]):LeaderAndIsr = {
+    var zookeeperPathUpdateSucceeded: Boolean = false
+    var newLeaderAndIsr: LeaderAndIsr = null
+    while(!zookeeperPathUpdateSucceeded) {
+      newLeaderAndIsr = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+        case Some(currentLeaderAndIsr) =>
+          var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr
+          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+          val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
+          val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+          val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+          debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
+            .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
+            currentLeaderIsrZkPathVersion))
+          newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
+            case true =>
+              debug("No broker in ISR is alive, picking the leader from the alive assigned replicas: %s"
+                .format(liveAssignedReplicasToThisPartition.mkString(",")))
+              liveAssignedReplicasToThisPartition.isEmpty match {
+                case true =>
+                  ControllerStat.offlinePartitionRate.mark()
+                  throw new PartitionOfflineException(("No replica for partition " +
+                  "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
+                  " Assigned replicas are: [%s]".format(assignedReplicas))
+                case false =>
+                  ControllerStat.uncleanLeaderElectionRate.mark()
+                  val newLeader = liveAssignedReplicasToThisPartition.head
+                  warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
+                    "There's potential data loss")
+                  new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
+              }
+            case false =>
+              val newLeader = liveBrokersInIsr.head
+              debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
+              new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
+          }
+          info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+          // update the new leadership decision in zookeeper or retry
+          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+            newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+          newLeaderAndIsr.zkVersion = newVersion
+          zookeeperPathUpdateSucceeded = updateSucceeded
+          newLeaderAndIsr
+        case None =>
+          throw new StateChangeFailedException("On broker changes, " +
+            "there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topic, partition))
+      }
+    }
+    // update the leader cache
+    controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+    newLeaderAndIsr
+  }
+
   private def registerTopicChangeListener() = {
     zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener())
   }
@@ -301,15 +360,6 @@ class PartitionStateMachine(controller: 
     zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
   }
 
-  private def getLeaderAndIsrOrThrowException(topic: String, partition: Int): LeaderAndIsr = {
-    ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-      case Some(currentLeaderAndIsr) => currentLeaderAndIsr
-      case None =>
-        throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
-          "[%s, %d] in %s state".format(topic, partition, partitionState((topic, partition))))
-    }
-  }
-
   /**
    * This is the zookeeper listener that triggers all the state transitions for a partition
    */
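
For orientation only (this is not code from the patch): the leader-selection rule described in the electLeaderForPartition scaladoc above can be reduced to a small standalone sketch. The object name, the selectLeader signature and the Option[Int] return type below are assumptions for illustration; the rule itself is the one stated in the diff: prefer a live broker from the ISR, otherwise fall back to a live assigned replica (unclean election), otherwise treat the partition as offline.

    // Sketch of the selection rule from the scaladoc above; not the controller's actual code.
    object LeaderSelectionSketch {
      def selectLeader(assignedReplicas: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int]): Option[Int] = {
        val liveIsr      = isr.filter(liveBrokers.contains)
        val liveAssigned = assignedReplicas.filter(liveBrokers.contains)
        if (liveIsr.nonEmpty) Some(liveIsr.head)                // 1. pick a live broker from the ISR
        else if (liveAssigned.nonEmpty) Some(liveAssigned.head) // 2. unclean election from the live assigned replicas
        else None                                               // 3. no live replica: the partition is offline
      }
    }

For example, selectLeader(Seq(1, 2, 3), isr = Seq(2, 3), liveBrokers = Set(3)) yields Some(3), while a call with no live brokers yields None, which corresponds to marking the partition offline in the code above.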

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala Wed Oct 10 18:42:57 2012
@@ -27,15 +27,9 @@ import org.I0Itec.zkclient.IZkChildListe
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
  * transitions to move the replica to another legal state. The different states that a replica can be in are -
- * 1. NewReplica        : The controller can create new replicas during partition reassignment. In this state, a
- *                        replica can only get become follower state change request.  Valid previous
- *                        state is NonExistentReplica
- * 2. OnlineReplica     : Once a replica is started and part of the assigned replicas for its partition, it is in this
- *                        state. In this state, it can get either become leader or become follower state change requests.
- *                        Valid previous state are NewReplica, OnlineReplica or OfflineReplica
- * 3. OfflineReplica    : If a replica dies, it moves to this state. This happens when the broker hosting the replica
- *                        is down. Valid previous state are NewReplica, OnlineReplica
- * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica
+ * 1. OnlineReplica     : Once a replica is started, it is in this state. Valid previous states are OnlineReplica or
+ *                        OfflineReplica
+ * 2. OfflineReplica    : If a replica dies, it moves to this state. Valid previous state is OnlineReplica
  */
 class ReplicaStateMachine(controller: KafkaController) extends Logging {
   this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
@@ -55,8 +49,7 @@ class ReplicaStateMachine(controller: Ka
     // initialize replica state
     initializeReplicaState()
     // move all Online replicas to Online
-    handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq,
-      controllerContext.liveBrokerIds.toSeq), OnlineReplica)
+    handleStateChanges(controllerContext.liveBrokerIds.toSeq, OnlineReplica)
     info("Started replica state machine with initial state -> " + replicaState.toString())
   }
 
@@ -79,11 +72,20 @@ class ReplicaStateMachine(controller: Ka
    * @param targetState  The state that the replicas should be moved to
    * The controller's allLeaders cache should have been updated before this
    */
-  def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState) {
-    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
+  def handleStateChanges(brokerIds: Seq[Int], targetState: ReplicaState) {
+    info("Invoking state change to %s for brokers %s".format(targetState, brokerIds.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
-      replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
+      brokerIds.foreach { brokerId =>
+        // read all the partitions and their assigned replicas into a map organized by
+        // { replica id -> partition 1, partition 2...
+        val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(controllerContext.allTopics.toSeq, brokerId)
+        partitionsAssignedToThisBroker.foreach { topicAndPartition =>
+          handleStateChange(topicAndPartition._1, topicAndPartition._2, brokerId, targetState)
+        }
+        if(partitionsAssignedToThisBroker.size == 0)
+          info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
+      }
       brokerRequestBatch.sendRequestsToBrokers()
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
@@ -98,62 +100,28 @@ class ReplicaStateMachine(controller: Ka
    * @param replicaId   The replica for which the state transition is invoked
    * @param targetState The end state that the replica should be moved to
    */
-  def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
+  private def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
     try {
-      replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica)
       targetState match {
-        case NewReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
-          // start replica as a follower to the current leader for its partition
+        case OnlineReplica =>
+          assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica, OfflineReplica), targetState)
+          // check if the leader for this partition is alive or even exists
+          // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
+          // for the ISR anyways
           val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
           leaderAndIsrOpt match {
             case Some(leaderAndIsr) =>
-              if(leaderAndIsr.leader == replicaId)
-                throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
-                  .format(replicaId, topic, partition) + "state as it is being requested to become leader")
-              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
-            case None => // new leader request will be sent to this replica when one gets elected
-          }
-          replicaState.put((topic, partition, replicaId), NewReplica)
-          info("Replica %d for partition [%s, %d] state changed to NewReplica".format(replicaId, topic, partition))
-        case NonExistentReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
-          // send stop replica command
-          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition)
-          // remove this replica from the assigned replicas list for its partition
-          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment((topic, partition))
-          controllerContext.partitionReplicaAssignment.put((topic, partition),
-            currentAssignedReplicas.filterNot(_ == replicaId))
-          info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, topic, partition))
-          replicaState.remove((topic, partition, replicaId))
-        case OnlineReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
-          replicaState((topic, partition, replicaId)) match {
-            case NewReplica =>
-              // add this replica to the assigned replicas list for its partition
-              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment((topic, partition))
-              controllerContext.partitionReplicaAssignment.put((topic, partition), currentAssignedReplicas :+ replicaId)
-              info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
-            case _ =>
-              // check if the leader for this partition is alive or even exists
-              // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
-              // for the ISR anyways
-              val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-              leaderAndIsrOpt match {
-                case Some(leaderAndIsr) =>
-                  controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
-                    case true => // leader is alive
-                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
-                      replicaState.put((topic, partition, replicaId), OnlineReplica)
-                      info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
-                    case false => // ignore partitions whose leader is not alive
-                  }
-                case None => // ignore partitions who don't have a leader yet
+              controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
+                case true => // leader is alive
+                  brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
+                  replicaState.put((topic, partition, replicaId), OnlineReplica)
+                  info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+                case false => // ignore partitions whose leader is not alive
               }
+            case None => // ignore partitions who don't have a leader yet
           }
-          replicaState.put((topic, partition, replicaId), OnlineReplica)
         case OfflineReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
+          assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica), targetState)
           // As an optimization, the controller removes dead replicas from the ISR
           var zookeeperPathUpdateSucceeded: Boolean = false
           var newLeaderAndIsr: LeaderAndIsr = null
@@ -176,7 +144,7 @@ class ReplicaStateMachine(controller: Ka
             }
           }
           // send the shrunk ISR state change request only to the leader
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr)
+          brokerRequestBatch.addRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr)
           // update the local leader and isr cache
           controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
           replicaState.put((topic, partition, replicaId), OfflineReplica)
@@ -258,9 +226,7 @@ class ReplicaStateMachine(controller: Ka
 }
 
 sealed trait ReplicaState { def state: Byte }
-case object NewReplica extends ReplicaState { val state: Byte = 1 }
-case object OnlineReplica extends ReplicaState { val state: Byte = 2 }
-case object OfflineReplica extends ReplicaState { val state: Byte = 3 }
-case object NonExistentReplica extends ReplicaState { val state: Byte = 4 }
+case object OnlineReplica extends ReplicaState { val state: Byte = 1 }
+case object OfflineReplica extends ReplicaState { val state: Byte = 2 }
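
After the revert, the replica state machine is back to the two states above. A minimal sketch of the transition table that assertValidPreviousStates enforces, for illustration only (the object and method names below are not part of the patch):

    // Sketch of the reverted two-state transition table; simplified, not the actual state machine.
    object ReplicaTransitionSketch {
      sealed trait State
      case object Online  extends State
      case object Offline extends State

      // target state -> set of legal previous states
      private val validPrevious: Map[State, Set[State]] = Map(
        Online  -> Set[State](Online, Offline), // a replica (re)starts from either state
        Offline -> Set[State](Online)           // only a live replica can be marked offline
      )

      def isLegalTransition(from: State, to: State): Boolean = validPrevious(to).contains(from)
    }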
 
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala Wed Oct 10 18:42:57 2012
@@ -31,45 +31,35 @@ import kafka.metrics.{KafkaTimer, KafkaM
 /**
  * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
  * will fail on an immutable message set. An optional limit and start position can be applied to the message set
- * which will control the position in the file at which the set begins
+ * which will control the position in the file at which the set begins.
  */
 @nonthreadsafe
 class FileMessageSet private[kafka](val file: File,
                                     private[log] val channel: FileChannel,
-                                    private[log] val start: Long, // the starting position in the file
-                                    private[log] val limit: Long, // the length (may be less than the file length)
-                                    val mutable: Boolean) extends MessageSet with Logging {
+                                    private[log] val start: Long = 0L,
+                                    private[log] val limit: Long = Long.MaxValue) extends MessageSet with Logging {
   
-  private val setSize = new AtomicLong()
-
-  if(mutable) {
-    if(limit < Long.MaxValue || start > 0)
-      throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
-
-    setSize.set(channel.size())
-    channel.position(channel.size)
-  } else {
-    setSize.set(scala.math.min(channel.size(), limit) - start)
-  }
+  /* the size of the message set in bytes */
+  private val _size = new AtomicLong(scala.math.min(channel.size(), limit) - start)
+    
+  /* set the file position to the last byte in the file */
+  channel.position(channel.size)
   
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File, channel: FileChannel, mutable: Boolean) = 
-    this(file, channel, 0, Long.MaxValue, mutable)
+  def this(file: File, channel: FileChannel) = this(file, channel, 0, Long.MaxValue)
   
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File, mutable: Boolean) = 
-    this(file, Utils.openChannel(file, mutable), mutable)
+  def this(file: File) = this(file, Utils.openChannel(file, mutable = true))
   
   /**
    * Return a message set which is a view into this set starting from the given position and with the given size limit.
    */
   def read(position: Long, size: Long): FileMessageSet = {
-    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()),
-      false)
+    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()))
   }
   
   /**
@@ -79,7 +69,7 @@ class FileMessageSet private[kafka](val 
   private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
     var position = startingPosition
     val buffer = ByteBuffer.allocate(12)
-    val size = setSize.get()
+    val size = _size.get()
     while(position + 12 < size) {
       buffer.rewind()
       channel.read(buffer, position)
@@ -138,29 +128,22 @@ class FileMessageSet private[kafka](val 
   /**
    * The number of bytes taken up by this file set
    */
-  def sizeInBytes(): Long = setSize.get()
-
-  def checkMutable(): Unit = {
-    if(!mutable)
-      throw new KafkaException("Attempt to invoke mutation on immutable message set.")
-  }
+  def sizeInBytes(): Long = _size.get()
   
   /**
    * Append this message to the message set
    */
   def append(messages: MessageSet): Unit = {
-    checkMutable()
     var written = 0L
     while(written < messages.sizeInBytes)
       written += messages.writeTo(channel, 0, messages.sizeInBytes)
-    setSize.getAndAdd(written)
+    _size.getAndAdd(written)
   }
  
   /**
    * Commit all written data to the physical disk
    */
   def flush() = {
-    checkMutable()
     LogFlushStats.logFlushTimer.time {
       channel.force(true)
     }
@@ -170,8 +153,7 @@ class FileMessageSet private[kafka](val 
    * Close this message set
    */
   def close() {
-    if(mutable)
-      flush()
+    flush()
     channel.close()
   }
   
@@ -188,13 +170,12 @@ class FileMessageSet private[kafka](val 
    * given size falls on a valid byte offset.
    */
   def truncateTo(targetSize: Long) = {
-    checkMutable()
     if(targetSize > sizeInBytes())
       throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
         " size of this log segment is only %d bytes".format(sizeInBytes()))
     channel.truncate(targetSize)
     channel.position(targetSize)
-    setSize.set(targetSize)
+    _size.set(targetSize)
   }
   
 }
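
The reverted FileMessageSet above drops the mutable flag and keeps a single AtomicLong for its size, bumped on append and reset on truncate. A standalone sketch of that bookkeeping pattern, for illustration only (the class and method names below are not from the patch; file and channel handling is omitted):

    import java.util.concurrent.atomic.AtomicLong

    // Sketch of the AtomicLong size bookkeeping used above.
    class SizeTrackerSketch(initialSize: Long) {
      private val _size = new AtomicLong(initialSize)

      def sizeInBytes: Long = _size.get()

      // called after bytes are written, mirroring append()
      def recordAppend(written: Long): Unit = _size.getAndAdd(written)

      // called when the set is cut back, mirroring truncateTo()
      def recordTruncate(targetSize: Long): Unit = {
        require(targetSize <= _size.get(), "attempt to truncate beyond the current size")
        _size.set(targetSize)
      }
    }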

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Oct 10 18:42:57 2012
@@ -101,8 +101,8 @@ object Log {
 private[kafka] class Log(val dir: File, 
                          val maxLogFileSize: Long, 
                          val maxMessageSize: Int, 
-                         val flushInterval: Int,
-                         val rollIntervalMs: Long, 
+                         val flushInterval: Int = Int.MaxValue,
+                         val rollIntervalMs: Long = Long.MaxValue, 
                          val needsRecovery: Boolean, 
                          val maxIndexSize: Int = (10*1024*1024),
                          val indexIntervalBytes: Int = 4096,
@@ -128,10 +128,10 @@ private[kafka] class Log(val dir: File, 
   private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
 
   newGauge(name + "-" + "NumLogSegments",
-           new Gauge[Int] { def value() = numberOfSegments })
+           new Gauge[Int] { def getValue = numberOfSegments })
 
   newGauge(name + "-" + "LogEndOffset",
-           new Gauge[Long] { def value() = logEndOffset })
+           new Gauge[Long] { def getValue = logEndOffset })
 
   /* The name of this log */
   def name  = dir.getName()
@@ -151,8 +151,7 @@ private[kafka] class Log(val dir: File, 
         if(!Log.indexFilename(dir, start).exists)
           throw new IllegalStateException("Found log file with no corresponding index file.")
         logSegments.add(new LogSegment(dir = dir, 
-                                       startOffset = start, 
-                                       mutable = false, 
+                                       startOffset = start,
                                        indexIntervalBytes = indexIntervalBytes, 
                                        maxIndexSize = maxIndexSize))
       }
@@ -161,8 +160,7 @@ private[kafka] class Log(val dir: File, 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment
       logSegments.add(new LogSegment(dir = dir, 
-                                     startOffset = 0, 
-                                     mutable = true, 
+                                     startOffset = 0,
                                      indexIntervalBytes = indexIntervalBytes, 
                                      maxIndexSize = maxIndexSize))
     } else {
@@ -176,17 +174,9 @@ private[kafka] class Log(val dir: File, 
         }
       })
 
-      //make the final section mutable and run recovery on it if necessary
-      val last = logSegments.remove(logSegments.size - 1)
-      last.close()
-      val mutableSegment = new LogSegment(dir = dir, 
-                                          startOffset = last.start, 
-                                          mutable = true, 
-                                          indexIntervalBytes = indexIntervalBytes, 
-                                          maxIndexSize = maxIndexSize)
+      // run recovery on the last segment if necessary
       if(needsRecovery)
-        recoverSegment(mutableSegment)
-      logSegments.add(mutableSegment)
+        recoverSegment(logSegments.get(logSegments.size - 1))
     }
     new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
   }
@@ -406,12 +396,11 @@ private[kafka] class Log(val dir: File, 
     }
     debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
     segments.view.lastOption match {
-      case Some(segment) => segment.index.makeReadOnly()
+      case Some(segment) => segment.index.trimToSize()
       case None => 
     }
     val segment = new LogSegment(dir, 
                                  startOffset = newOffset,
-                                 mutable = true, 
                                  indexIntervalBytes = indexIntervalBytes, 
                                  maxIndexSize = maxIndexSize)
     segments.append(segment)
@@ -546,8 +535,7 @@ private[kafka] class Log(val dir: File, 
       val deletedSegments = segments.trunc(segments.view.size)
       debug("Truncate and start log '" + name + "' to " + newOffset)
       segments.append(new LogSegment(dir, 
-                                     newOffset, 
-                                     mutable = true, 
+                                     newOffset,
                                      indexIntervalBytes = indexIntervalBytes, 
                                      maxIndexSize = maxIndexSize))
       deleteSegments(deletedSegments)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Wed Oct 10 18:42:57 2012
@@ -28,9 +28,9 @@ class LogSegment(val messageSet: FileMes
   
   @volatile var deleted = false
   
-  def this(dir: File, startOffset: Long, mutable: Boolean, indexIntervalBytes: Int, maxIndexSize: Int) = 
-    this(new FileMessageSet(file = Log.logFilename(dir, startOffset), mutable = mutable), 
-         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, mutable = mutable, maxIndexSize = maxIndexSize),
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) = 
+    this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), 
+         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
          SystemTime)


