kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8237; Untangle TopicDeleteManager and add test cases (#6588)
Date Thu, 25 Apr 2019 05:23:37 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 17c8016  KAFKA-8237; Untangle TopicDeleteManager and add test cases (#6588)
17c8016 is described below

commit 17c80166461c0005b7603414db4d0a3541df0f82
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Apr 24 22:23:15 2019 -0700

    KAFKA-8237; Untangle TopicDeleteManager and add test cases (#6588)
    
    The controller maintains state across `ControllerContext`, `PartitionStateMachine`, `ReplicaStateMachine`, and `TopicDeletionManager`. None of this state is actually isolated from the rest. For example, topics undergoing deletion are intertwined with the partition and replica states. As a consequence of this, each of these components tends to be dependent on all the rest, which makes testing and reasoning about the system difficult. This is a first step toward untangling all the state [...]
    
    Additionally, this patch adds several mock objects to enable easier testing: `MockReplicaStateMachine` and `MockPartitionStateMachine`. These have simplified logic for updating the current state and are used to create some new test cases for `TopicDeletionManager`.
    
    Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jun Rao <junrao@gmail.com>
---
 .../controller/ControllerChannelManager.scala      |  21 +-
 .../scala/kafka/controller/ControllerContext.scala | 207 ++++++++++++++----
 .../src/main/scala/kafka/controller/Election.scala | 152 +++++++++++++
 .../scala/kafka/controller/KafkaController.scala   |  78 ++++---
 .../kafka/controller/PartitionStateMachine.scala   | 242 ++++++++-------------
 .../kafka/controller/ReplicaStateMachine.scala     | 186 ++++++++--------
 .../kafka/controller/TopicDeletionManager.scala    | 193 ++++++++--------
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |   4 +-
 .../kafka/controller/ControllerFailoverTest.scala  |   2 +-
 .../controller/MockPartitionStateMachine.scala     | 110 ++++++++++
 .../kafka/controller/MockReplicaStateMachine.scala |  36 +++
 .../controller/PartitionStateMachineTest.scala     | 118 ++++------
 .../kafka/controller/ReplicaStateMachineTest.scala |  61 +++---
 .../controller/TopicDeletionManagerTest.scala      | 232 ++++++++++++++++++++
 .../unit/kafka/server/LogDirFailureTest.scala      |   2 +-
 15 files changed, 1093 insertions(+), 551 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3776b69..ca6c00a 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -61,7 +61,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     }
   )
 
-  controllerContext.liveBrokers.foreach(addNewBroker)
+  controllerContext.liveOrShuttingDownBrokers.foreach(addNewBroker)
 
   def startup() = {
     brokerLock synchronized {
@@ -351,13 +351,23 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
     addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
   }
 
-  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition, deletePartition: Boolean,
-                                      callback: (AbstractResponse, Int) => Unit) {
+  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int],
+                                      topicPartition: TopicPartition,
+                                      deletePartition: Boolean): Unit = {
     brokerIds.filter(_ >= 0).foreach { brokerId =>
+      def topicDeletionCallback(stopReplicaResponse: AbstractResponse): Unit = {
+        controller.eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponse, brokerId))
+      }
+
+      val responseReceivedCallback = if (deletePartition && controllerContext.isTopicDeletionInProgress(topicPartition.topic))
+        topicDeletionCallback _
+      else
+        null
+
       stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
       val v = stopReplicaRequestMap(brokerId)
       stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topicPartition, brokerId),
-        deletePartition, (r: AbstractResponse) => callback(r, brokerId))
+        deletePartition, responseReceivedCallback)
     }
   }
 
@@ -394,7 +404,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
 
     updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
     partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
-      beingDeleted = controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic)))
+      beingDeleted = controllerContext.topicsToBeDeleted.contains(partition.topic)))
   }
 
   def sendRequestsToBrokers(controllerEpoch: Int) {
@@ -525,4 +535,3 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
 
 case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit)
 
-class Callbacks(val stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = (_, _ ) => ())
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index c3bcc52..3069024 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -24,47 +24,75 @@ import scala.collection.{Seq, Set, mutable}
 
 class ControllerContext {
   val stats = new ControllerStats
-
-  var controllerChannelManager: ControllerChannelManager = null
-
+  var offlinePartitionCount = 0
   var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
+  private var liveBrokers: Set[Broker] = Set.empty
+  private var liveBrokerEpochs: Map[Int, Long] = Map.empty
   var epoch: Int = KafkaController.InitialControllerEpoch
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
+
   var allTopics: Set[String] = Set.empty
-  private val partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty
-  val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
-  val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty
+  val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, Seq[Int]]]
+  val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
+  val partitionsBeingReassigned = mutable.Map.empty[TopicPartition, ReassignedPartitionsContext]
+  val partitionStates = mutable.Map.empty[TopicPartition, PartitionState]
+  val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]
   val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty
 
-  private var liveBrokersUnderlying: Set[Broker] = Set.empty
-  private var liveBrokerIdAndEpochsUnderlying: Map[Int, Long] = Map.empty
+  val topicsToBeDeleted = mutable.Set.empty[String]
+
+  /** The following topicsWithDeletionStarted variable is used to properly update the offlinePartitionCount metric.
+   * When a topic is going through deletion, we don't want to keep track of its partition state
+   * changes in the offlinePartitionCount metric. This goal means if some partitions of a topic are already
+   * in OfflinePartition state when deletion starts, we need to change the corresponding partition
+   * states to NonExistentPartition first before starting the deletion.
+   *
+   * However we can NOT change partition states to NonExistentPartition at the time of enqueuing topics
+   * for deletion. The reason is that when a topic is enqueued for deletion, it may be ineligible for
+   * deletion due to ongoing partition reassignments. Hence there might be a delay between enqueuing
+   * a topic for deletion and the actual start of deletion. In this delayed interval, partitions may still
+   * transition to or out of the OfflinePartition state.
+   *
+   * Hence we decide to change partition states to NonExistentPartition only when the actual deletion has started.
+   * For topics whose deletion has actually started, we keep track of them in the following topicsWithDeletionStarted
+   * variable. Once a topic is in the topicsWithDeletionStarted set, we are sure there will no longer
+   * be partition reassignments to any of its partitions, and only then is it safe to move its partitions to
+   * NonExistentPartition state. Once a topic is in the topicsWithDeletionStarted set, we will stop monitoring
+   * its partition state changes in the offlinePartitionCount metric.
+   */
+  val topicsWithDeletionStarted = mutable.Set.empty[String]
+  val topicsIneligibleForDeletion = mutable.Set.empty[String]
+
 
   def partitionReplicaAssignment(topicPartition: TopicPartition): Seq[Int] = {
-    partitionReplicaAssignmentUnderlying.getOrElse(topicPartition.topic, mutable.Map.empty)
+    partitionAssignments.getOrElse(topicPartition.topic, mutable.Map.empty)
       .getOrElse(topicPartition.partition, Seq.empty)
   }
 
   private def clearTopicsState(): Unit = {
     allTopics = Set.empty
-    partitionReplicaAssignmentUnderlying.clear()
+    partitionAssignments.clear()
     partitionLeadershipInfo.clear()
     partitionsBeingReassigned.clear()
     replicasOnOfflineDirs.clear()
+    partitionStates.clear()
+    offlinePartitionCount = 0
+    replicaStates.clear()
   }
 
   def updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = {
-    partitionReplicaAssignmentUnderlying.getOrElseUpdate(topicPartition.topic, mutable.Map.empty)
+    partitionAssignments.getOrElseUpdate(topicPartition.topic, mutable.Map.empty)
       .put(topicPartition.partition, newReplicas)
   }
 
   def partitionReplicaAssignmentForTopic(topic : String): Map[TopicPartition, Seq[Int]] = {
-    partitionReplicaAssignmentUnderlying.getOrElse(topic, Map.empty).map {
+    partitionAssignments.getOrElse(topic, Map.empty).map {
       case (partition, replicas) => (new TopicPartition(topic, partition), replicas)
     }.toMap
   }
 
   def allPartitions: Set[TopicPartition] = {
-    partitionReplicaAssignmentUnderlying.flatMap {
+    partitionAssignments.flatMap {
       case (topic, topicReplicaAssignment) => topicReplicaAssignment.map {
         case (partition, _) => new TopicPartition(topic, partition)
       }
@@ -72,37 +100,36 @@ class ControllerContext {
   }
 
   def setLiveBrokerAndEpochs(brokerAndEpochs: Map[Broker, Long]) {
-    liveBrokersUnderlying = brokerAndEpochs.keySet
-    liveBrokerIdAndEpochsUnderlying =
+    liveBrokers = brokerAndEpochs.keySet
+    liveBrokerEpochs =
       brokerAndEpochs map { case (broker, brokerEpoch) => (broker.id, brokerEpoch)}
   }
 
   def addLiveBrokersAndEpochs(brokerAndEpochs: Map[Broker, Long]): Unit = {
-    liveBrokersUnderlying = liveBrokersUnderlying ++ brokerAndEpochs.keySet
-    liveBrokerIdAndEpochsUnderlying = liveBrokerIdAndEpochsUnderlying ++
+    liveBrokers = liveBrokers ++ brokerAndEpochs.keySet
+    liveBrokerEpochs = liveBrokerEpochs ++
       (brokerAndEpochs map { case (broker, brokerEpoch) => (broker.id, brokerEpoch)})
   }
 
-  def removeLiveBrokersAndEpochs(brokerIds : Set[Int]): Unit = {
-    liveBrokersUnderlying = liveBrokersUnderlying.filter(broker => !brokerIds.contains(broker.id))
-    liveBrokerIdAndEpochsUnderlying = liveBrokerIdAndEpochsUnderlying.filterKeys(id => !brokerIds.contains(id))
+  def removeLiveBrokers(brokerIds: Set[Int]): Unit = {
+    liveBrokers = liveBrokers.filter(broker => !brokerIds.contains(broker.id))
+    liveBrokerEpochs = liveBrokerEpochs.filterKeys(id => !brokerIds.contains(id))
   }
 
-  def updateBrokerMetadata(oldMetadata: Option[Broker], newMetadata: Option[Broker]): Unit = {
-    liveBrokersUnderlying = liveBrokersUnderlying -- oldMetadata ++ newMetadata
+  def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {
+    liveBrokers -= oldMetadata
+    liveBrokers += newMetadata
   }
 
   // getter
-  def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
-  def liveBrokerIds = liveBrokerIdAndEpochsUnderlying.keySet -- shuttingDownBrokerIds
-
-  def liveOrShuttingDownBrokerIds = liveBrokerIdAndEpochsUnderlying.keySet
-  def liveOrShuttingDownBrokers = liveBrokersUnderlying
-
-  def liveBrokerIdAndEpochs = liveBrokerIdAndEpochsUnderlying
+  def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet -- shuttingDownBrokerIds
+  def liveOrShuttingDownBrokerIds: Set[Int] = liveBrokerEpochs.keySet
+  def liveOrShuttingDownBrokers: Set[Broker] = liveBrokers
+  def liveBrokerIdAndEpochs: Map[Int, Long] = liveBrokerEpochs
+  def liveOrShuttingDownBroker(brokerId: Int): Option[Broker] = liveOrShuttingDownBrokers.find(_.id == brokerId)
 
   def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = {
-    partitionReplicaAssignmentUnderlying.flatMap {
+    partitionAssignments.flatMap {
       case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {
         case (_, replicas) => replicas.contains(brokerId)
       }.map {
@@ -121,7 +148,7 @@ class ControllerContext {
 
   def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
     brokerIds.flatMap { brokerId =>
-      partitionReplicaAssignmentUnderlying.flatMap {
+      partitionAssignments.flatMap {
         case (topic, topicReplicaAssignment) => topicReplicaAssignment.collect {
           case (partition, replicas)  if replicas.contains(brokerId) =>
             PartitionAndReplica(new TopicPartition(topic, partition), brokerId)
@@ -131,13 +158,13 @@ class ControllerContext {
   }
 
   def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
-    partitionReplicaAssignmentUnderlying.getOrElse(topic, mutable.Map.empty).flatMap {
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).flatMap {
       case (partition, replicas) => replicas.map(r => PartitionAndReplica(new TopicPartition(topic, partition), r))
     }.toSet
   }
 
   def partitionsForTopic(topic: String): collection.Set[TopicPartition] = {
-    partitionReplicaAssignmentUnderlying.getOrElse(topic, mutable.Map.empty).map {
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).map {
       case (partition, _) => new TopicPartition(topic, partition)
     }.toSet
   }
@@ -156,10 +183,9 @@ class ControllerContext {
   }
 
   def resetContext(): Unit = {
-    if (controllerChannelManager != null) {
-      controllerChannelManager.shutdown()
-      controllerChannelManager = null
-    }
+    topicsToBeDeleted.clear()
+    topicsWithDeletionStarted.clear()
+    topicsIneligibleForDeletion.clear()
     shuttingDownBrokerIds.clear()
     epoch = 0
     epochZkVersion = 0
@@ -169,10 +195,115 @@ class ControllerContext {
 
   def removeTopic(topic: String): Unit = {
     allTopics -= topic
-    partitionReplicaAssignmentUnderlying.remove(topic)
+    partitionAssignments.remove(topic)
     partitionLeadershipInfo.foreach {
       case (topicPartition, _) if topicPartition.topic == topic => partitionLeadershipInfo.remove(topicPartition)
       case _ =>
     }
   }
+
+  def beginTopicDeletion(topics: Set[String]): Unit = {
+    topicsWithDeletionStarted ++= topics
+  }
+
+  def isTopicDeletionInProgress(topic: String): Boolean = {
+    topicsWithDeletionStarted.contains(topic)
+  }
+
+  def isTopicQueuedUpForDeletion(topic: String): Boolean = {
+    topicsToBeDeleted.contains(topic)
+  }
+
+  def isTopicEligibleForDeletion(topic: String): Boolean = {
+    topicsToBeDeleted.contains(topic) && !topicsIneligibleForDeletion.contains(topic)
+  }
+
+  def topicsQueuedForDeletion: Set[String] = {
+    topicsToBeDeleted
+  }
+
+  def replicasInState(topic: String, state: ReplicaState): Set[PartitionAndReplica] = {
+    replicasForTopic(topic).filter(replica => replicaStates(replica) == state).toSet
+  }
+
+  def areAllReplicasInState(topic: String, state: ReplicaState): Boolean = {
+    replicasForTopic(topic).forall(replica => replicaStates(replica) == state)
+  }
+
+  def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = {
+    replicasForTopic(topic).exists(replica => replicaStates(replica) == state)
+  }
+
+  def checkValidReplicaStateChange(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): (Seq[PartitionAndReplica], Seq[PartitionAndReplica]) = {
+    replicas.partition(replica => isValidReplicaStateTransition(replica, targetState))
+  }
+
+  def checkValidPartitionStateChange(partitions: Seq[TopicPartition], targetState: PartitionState): (Seq[TopicPartition], Seq[TopicPartition]) = {
+    partitions.partition(p => isValidPartitionStateTransition(p, targetState))
+  }
+
+  def putReplicaState(replica: PartitionAndReplica, state: ReplicaState): Unit = {
+    replicaStates.put(replica, state)
+  }
+
+  def removeReplicaState(replica: PartitionAndReplica): Unit = {
+    replicaStates.remove(replica)
+  }
+
+  def putReplicaStateIfNotExists(replica: PartitionAndReplica, state: ReplicaState): Unit = {
+    replicaStates.getOrElseUpdate(replica, state)
+  }
+
+  def putPartitionState(partition: TopicPartition, targetState: PartitionState): Unit = {
+    val currentState = partitionStates.put(partition, targetState).getOrElse(NonExistentPartition)
+    updatePartitionStateMetrics(partition, currentState, targetState)
+  }
+
+  private def updatePartitionStateMetrics(partition: TopicPartition,
+                                          currentState: PartitionState,
+                                          targetState: PartitionState): Unit = {
+    if (!isTopicDeletionInProgress(partition.topic)) {
+      if (currentState != OfflinePartition && targetState == OfflinePartition) {
+        offlinePartitionCount = offlinePartitionCount + 1
+      } else if (currentState == OfflinePartition && targetState != OfflinePartition) {
+        offlinePartitionCount = offlinePartitionCount - 1
+      }
+    }
+  }
+
+  def putPartitionStateIfNotExists(partition: TopicPartition, state: PartitionState): Unit = {
+    if (partitionStates.getOrElseUpdate(partition, state) == state)
+      updatePartitionStateMetrics(partition, NonExistentPartition, state)
+  }
+
+  def replicaState(replica: PartitionAndReplica): ReplicaState = {
+    replicaStates(replica)
+  }
+
+  def partitionState(partition: TopicPartition): PartitionState = {
+    partitionStates(partition)
+  }
+
+  def partitionsInState(state: PartitionState): Set[TopicPartition] = {
+    partitionStates.filter { case (_, s) => s == state }.keySet.toSet
+  }
+
+  def partitionsInStates(states: Set[PartitionState]): Set[TopicPartition] = {
+    partitionStates.filter { case (_, s) => states.contains(s) }.keySet.toSet
+  }
+
+  def partitionsInState(topic: String, state: PartitionState): Set[TopicPartition] = {
+    partitionsForTopic(topic).filter { partition => state == partitionState(partition) }.toSet
+  }
+
+  def partitionsInStates(topic: String, states: Set[PartitionState]): Set[TopicPartition] = {
+    partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
+  }
+
+  private def isValidReplicaStateTransition(replica: PartitionAndReplica, targetState: ReplicaState): Boolean =
+    targetState.validPreviousStates.contains(replicaStates(replica))
+
+  private def isValidPartitionStateTransition(partition: TopicPartition, targetState: PartitionState): Boolean =
+    targetState.validPreviousStates.contains(partitionStates(partition))
+
 }
diff --git a/core/src/main/scala/kafka/controller/Election.scala b/core/src/main/scala/kafka/controller/Election.scala
new file mode 100644
index 0000000..9209992
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/Election.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import kafka.api.LeaderAndIsr
+import org.apache.kafka.common.TopicPartition
+
+case class ElectionResult(topicPartition: TopicPartition, leaderAndIsr: Option[LeaderAndIsr], liveReplicas: Seq[Int])
+
+object Election {
+
+  private def leaderForOffline(partition: TopicPartition,
+                               leaderIsrAndControllerEpochOpt: Option[LeaderIsrAndControllerEpoch],
+                               uncleanLeaderElectionEnabled: Boolean,
+                               controllerContext: ControllerContext): ElectionResult = {
+
+    val assignment = controllerContext.partitionReplicaAssignment(partition)
+    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+    leaderIsrAndControllerEpochOpt match {
+      case Some(leaderIsrAndControllerEpoch) =>
+        val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr,
+          liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
+        val newLeaderAndIsrOpt = leaderOpt.map { leader =>
+          val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+          else List(leader)
+          leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)
+        }
+        ElectionResult(partition, newLeaderAndIsrOpt, liveReplicas)
+
+      case None =>
+        ElectionResult(partition, None, liveReplicas)
+    }
+  }
+
+  /**
+   * Elect leaders for new or offline partitions.
+   *
+   * @param controllerContext Context with the current state of the cluster
+   * @param partitionsWithUncleanLeaderElectionState A sequence of tuples representing the partitions
+   *                                                 that need election, their leader/ISR state, and whether
+   *                                                 or not unclean leader election is enabled
+   *
+   * @return The election results
+   */
+  def leaderForOffline(controllerContext: ControllerContext,
+                       partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]): Seq[ElectionResult] = {
+    partitionsWithUncleanLeaderElectionState.map { case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
+      leaderForOffline(partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled, controllerContext)
+    }
+  }
+
+  private def leaderForReassign(partition: TopicPartition,
+                                leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                controllerContext: ControllerContext): ElectionResult = {
+    val reassignment = controllerContext.partitionsBeingReassigned(partition).newReplicas
+    val liveReplicas = reassignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+    val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+    val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment, isr, liveReplicas.toSet)
+    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeader(leader))
+    ElectionResult(partition, newLeaderAndIsrOpt, reassignment)
+  }
+
+  /**
+   * Elect leaders for partitions that are undergoing reassignment.
+   *
+   * @param controllerContext Context with the current state of the cluster
+   * @param leaderIsrAndControllerEpochs A sequence of tuples representing the partitions that need election
+   *                                     and their respective leader/ISR states
+   *
+   * @return The election results
+   */
+  def leaderForReassign(controllerContext: ControllerContext,
+                        leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[ElectionResult] = {
+    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      leaderForReassign(partition, leaderIsrAndControllerEpoch, controllerContext)
+    }
+  }
+
+  private def leaderForPreferredReplica(partition: TopicPartition,
+                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                        controllerContext: ControllerContext): ElectionResult = {
+    val assignment = controllerContext.partitionReplicaAssignment(partition)
+    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+    val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+    val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet)
+    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeader(leader))
+    ElectionResult(partition, newLeaderAndIsrOpt, assignment)
+  }
+
+  /**
+   * Elect preferred leaders.
+   *
+   * @param controllerContext Context with the current state of the cluster
+   * @param leaderIsrAndControllerEpochs A sequence of tuples representing the partitions that need election
+   *                                     and their respective leader/ISR states
+   *
+   * @return The election results
+   */
+  def leaderForPreferredReplica(controllerContext: ControllerContext,
+                                leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[ElectionResult] = {
+    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      leaderForPreferredReplica(partition, leaderIsrAndControllerEpoch, controllerContext)
+    }
+  }
+
+  private def leaderForControlledShutdown(partition: TopicPartition,
+                                          leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                          shuttingDownBrokerIds: Set[Int],
+                                          controllerContext: ControllerContext): ElectionResult = {
+    val assignment = controllerContext.partitionReplicaAssignment(partition)
+    val liveOrShuttingDownReplicas = assignment.filter(replica =>
+      controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true))
+    val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+    val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr,
+      liveOrShuttingDownReplicas.toSet, shuttingDownBrokerIds)
+    val newIsr = isr.filter(replica => !shuttingDownBrokerIds.contains(replica))
+    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr))
+    ElectionResult(partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas)
+  }
+
+  /**
+   * Elect leaders for partitions whose current leaders are shutting down.
+   *
+   * @param controllerContext Context with the current state of the cluster
+   * @param leaderIsrAndControllerEpochs A sequence of tuples representing the partitions that need election
+   *                                     and their respective leader/ISR states
+   *
+   * @return The election results
+   */
+  def leaderForControlledShutdown(controllerContext: ControllerContext,
+                                  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[ElectionResult] = {
+    val shuttingDownBrokerIds = controllerContext.shuttingDownBrokerIds.toSet
+    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      leaderForControlledShutdown(partition, leaderIsrAndControllerEpoch, shuttingDownBrokerIds, controllerContext)
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ea23beb..183ffaf 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractRespons
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
+import scala.collection.JavaConverters._
 
 import scala.collection._
 import scala.util.{Failure, Try}
@@ -72,6 +73,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
   val controllerContext = new ControllerContext
+  var controllerChannelManager: ControllerChannelManager = _
 
   // have a separate scheduler for the controller to be able to start and stop independently of the kafka server
   // visible for testing
@@ -81,11 +83,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   private[controller] val eventManager = new ControllerEventManager(config.brokerId,
     controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign())
 
-  val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
-  val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
-  val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
-  partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
+  val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
+    new ControllerBrokerRequestBatch(this, stateChangeLogger))
+  val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
+    new ControllerBrokerRequestBatch(this, stateChangeLogger))
+  val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
+    partitionStateMachine, new ControllerDeletionClient(this, zkClient))
 
   private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
   private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
@@ -304,9 +308,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
     unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)
 
-    // reset topic deletion manager
-    topicDeletionManager.reset()
-
     // shutdown leader rebalance scheduler
     kafkaScheduler.shutdown()
     offlinePartitionCount = 0
@@ -329,6 +330,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     replicaStateMachine.shutdown()
     zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
 
+
+    if (controllerChannelManager != null) {
+      controllerChannelManager.shutdown()
+      controllerChannelManager = null
+    }
     controllerContext.resetContext()
 
     info("Resigned")
@@ -390,7 +396,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
     if (replicasForTopicsToBeDeleted.nonEmpty) {
       info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
-        s"${topicDeletionManager.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
+        s"${controllerContext.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
         s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
       topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
     }
@@ -486,7 +492,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info(s"New partition creation callback for ${newPartitions.mkString(",")}")
     partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
     replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
-    partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy))
     replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
   }
 
@@ -646,7 +652,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}")
     try {
       val results = partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition,
-        Option(PreferredReplicaPartitionLeaderElectionStrategy))
+        Some(PreferredReplicaPartitionLeaderElectionStrategy))
       if (electionType != AdminClientTriggered) {
         results.foreach { case (tp, throwable) =>
           if (throwable.isInstanceOf[ControllerMovedException]) {
@@ -677,7 +683,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     controllerContext.partitionLeadershipInfo.clear()
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
     // register broker modifications handlers
-    registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
+    registerBrokerModificationsHandler(controllerContext.liveOrShuttingDownBrokerIds)
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager
@@ -733,9 +739,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   }
 
   private def startChannelManager() {
-    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
+    controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
       stateChangeLogger, threadNamePrefix)
-    controllerContext.controllerChannelManager.startup()
+    controllerChannelManager.startup()
   }
 
   private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq) {
@@ -763,7 +769,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
         s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader")
       // move the leader to one of the alive and caught up new replicas
-      partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
+      partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
     } else {
       // check if the leader is alive or not
       if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
@@ -774,7 +780,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       } else {
         info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
           s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead")
-        partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
+        partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
       }
     }
   }
@@ -912,7 +918,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private[controller] def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
                                       callback: AbstractResponse => Unit = null) = {
-    controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, request, callback)
+    controllerChannelManager.sendRequest(brokerId, apiKey, request, callback)
   }
 
   /**
@@ -1091,12 +1097,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       val (partitionsLedByBroker, partitionsFollowedByBroker) = partitionsToActOn.partition { partition =>
         controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
       }
-      partitionStateMachine.handleStateChanges(partitionsLedByBroker.toSeq, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy))
+      partitionStateMachine.handleStateChanges(partitionsLedByBroker.toSeq, OnlinePartition, Some(ControlledShutdownPartitionLeaderElectionStrategy))
       try {
         brokerRequestBatch.newBatch()
         partitionsFollowedByBroker.foreach { partition =>
-          brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition, deletePartition = false,
-            (_, _) => ())
+          brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition, deletePartition = false)
         }
         brokerRequestBatch.sendRequestsToBrokers(epoch)
       } catch {
@@ -1124,7 +1129,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     def state = ControllerState.LeaderAndIsrResponseReceived
 
     override def process(): Unit = {
-      import JavaConverters._
       if (!isActive) return
       val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
 
@@ -1156,11 +1160,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     def state = ControllerState.TopicDeletion
 
     override def process(): Unit = {
-      import JavaConverters._
       if (!isActive) return
       val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
-      debug(s"Delete topic callback invoked for $stopReplicaResponse")
       val responseMap = stopReplicaResponse.responses.asScala
+      debug(s"Delete topic callback invoked on StopReplica response received from broker $replicaId: $stopReplicaResponse")
       val partitionsInError =
         if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
         else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
@@ -1191,7 +1194,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       if (!isActive) {
         0
       } else {
-        partitionStateMachine.offlinePartitionCount
+        controllerContext.offlinePartitionCount
       }
 
     preferredReplicaImbalanceCount =
@@ -1309,22 +1312,22 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         s"bounced brokers: ${bouncedBrokerIdsSorted.mkString(",")}, " +
         s"all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
 
-      newBrokerAndEpochs.keySet.foreach(controllerContext.controllerChannelManager.addBroker)
-      bouncedBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
-      bouncedBrokerAndEpochs.keySet.foreach(controllerContext.controllerChannelManager.addBroker)
-      deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
+      newBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
+      bouncedBrokerIds.foreach(controllerChannelManager.removeBroker)
+      bouncedBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
+      deadBrokerIds.foreach(controllerChannelManager.removeBroker)
       if (newBrokerIds.nonEmpty) {
         controllerContext.addLiveBrokersAndEpochs(newBrokerAndEpochs)
         onBrokerStartup(newBrokerIdsSorted)
       }
       if (bouncedBrokerIds.nonEmpty) {
-        controllerContext.removeLiveBrokersAndEpochs(bouncedBrokerIds)
+        controllerContext.removeLiveBrokers(bouncedBrokerIds)
         onBrokerFailure(bouncedBrokerIdsSorted)
         controllerContext.addLiveBrokersAndEpochs(bouncedBrokerAndEpochs)
         onBrokerStartup(bouncedBrokerIdsSorted)
       }
       if (deadBrokerIds.nonEmpty) {
-        controllerContext.removeLiveBrokersAndEpochs(deadBrokerIds)
+        controllerContext.removeLiveBrokers(deadBrokerIds)
         onBrokerFailure(deadBrokerIdsSorted)
       }
 
@@ -1339,13 +1342,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
     override def process(): Unit = {
       if (!isActive) return
-      val newMetadata = zkClient.getBroker(brokerId)
-      val oldMetadata = controllerContext.liveBrokers.find(_.id == brokerId)
-      if (newMetadata.nonEmpty && oldMetadata.nonEmpty && newMetadata.map(_.endPoints) != oldMetadata.map(_.endPoints)) {
-        info(s"Updated broker: ${newMetadata.get}")
-
-        controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
-        onBrokerUpdate(brokerId)
+      val newMetadataOpt = zkClient.getBroker(brokerId)
+      val oldMetadataOpt = controllerContext.liveOrShuttingDownBroker(brokerId)
+      if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
+        val oldMetadata = oldMetadataOpt.get
+        val newMetadata = newMetadataOpt.get
+        if (newMetadata.endPoints != oldMetadata.endPoints) {
+          info(s"Updated broker metadata: $oldMetadata -> $newMetadata")
+          controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
+          onBrokerUpdate(brokerId)
+        }
       }
     }
   }
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ad73979..637cea8 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -18,10 +18,11 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.common.StateChangeFailedException
+import kafka.controller.Election._
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
-import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.zookeeper.KeeperException
@@ -29,33 +30,7 @@ import org.apache.zookeeper.KeeperException.Code
 
 import scala.collection.mutable
 
-
-/**
- * This class represents the state machine for partitions. It defines the states that a partition can be in, and
- * transitions to move the partition to another legal state. The different states that a partition can be in are -
- * 1. NonExistentPartition: This state indicates that the partition was either never created or was created and then
- *                          deleted. Valid previous state, if one exists, is OfflinePartition
- * 2. NewPartition        : After creation, the partition is in the NewPartition state. In this state, the partition should have
- *                          replicas assigned to it, but no leader/isr yet. Valid previous states are NonExistentPartition
- * 3. OnlinePartition     : Once a leader is elected for a partition, it is in the OnlinePartition state.
- *                          Valid previous states are NewPartition/OfflinePartition
- * 4. OfflinePartition    : If, after successful leader election, the leader for partition dies, then the partition
- *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
- */
-class PartitionStateMachine(config: KafkaConfig,
-                            stateChangeLogger: StateChangeLogger,
-                            controllerContext: ControllerContext,
-                            zkClient: KafkaZkClient,
-                            partitionState: mutable.Map[TopicPartition, PartitionState],
-                            controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
-  private val controllerId = config.brokerId
-
-  private var topicDeletionManager: TopicDeletionManager = _
-
-  this.logIdent = s"[PartitionStateMachine controllerId=$controllerId] "
-
-  var offlinePartitionCount = 0
-
+abstract class PartitionStateMachine(controllerContext: ControllerContext) extends Logging {
   /**
    * Invoked on successful controller election.
    */
@@ -64,20 +39,40 @@ class PartitionStateMachine(config: KafkaConfig,
     initializePartitionState()
     info("Triggering online partition state changes")
     triggerOnlinePartitionStateChange()
-    info(s"Started partition state machine with initial state -> $partitionState")
+    debug(s"Started partition state machine with initial state -> ${controllerContext.partitionStates}")
   }
 
   /**
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    partitionState.clear()
-    offlinePartitionCount = 0
     info("Stopped partition state machine")
   }
 
-  def setTopicDeletionManager(topicDeletionManager: TopicDeletionManager) {
-    this.topicDeletionManager = topicDeletionManager
+  /**
+   * This API invokes the OnlinePartition state change on all partitions in either the NewPartition or OfflinePartition
+   * state. This is called on a successful controller election and on broker changes
+   */
+  def triggerOnlinePartitionStateChange(): Unit = {
+    val partitions = controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition))
+    triggerOnlineStateChangeForPartitions(partitions)
+  }
+
+  def triggerOnlinePartitionStateChange(topic: String): Unit = {
+    val partitions = controllerContext.partitionsInStates(topic, Set(OfflinePartition, NewPartition))
+    triggerOnlineStateChangeForPartitions(partitions)
+  }
+
+  private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Unit = {
+    // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
+    // that belong to topics to be deleted
+    val partitionsToTrigger = partitions.filter { partition =>
+      !controllerContext.isTopicQueuedUpForDeletion(partition.topic)
+    }.toSeq
+
+    handleStateChanges(partitionsToTrigger, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy))
+    // TODO: If handleStateChanges catches an exception, it is not enough to bail out and log an error.
+    // It is important to trigger leader election for those partitions.
   }
 
   /**
@@ -92,38 +87,47 @@ class PartitionStateMachine(config: KafkaConfig,
           // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
           if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition))
           // leader is alive
-            changeStateTo(topicPartition, NonExistentPartition, OnlinePartition)
+            controllerContext.putPartitionState(topicPartition, OnlinePartition)
           else
-            changeStateTo(topicPartition, NonExistentPartition, OfflinePartition)
+            controllerContext.putPartitionState(topicPartition, OfflinePartition)
         case None =>
-          changeStateTo(topicPartition, NonExistentPartition, NewPartition)
+          controllerContext.putPartitionState(topicPartition, NewPartition)
       }
     }
   }
 
-  /**
-   * This API invokes the OnlinePartition state change on all partitions in either the NewPartition or OfflinePartition
-   * state. This is called on a successful controller election and on broker changes
-   */
-  def triggerOnlinePartitionStateChange() {
-    triggerOnlinePartitionStateChange(partitionState.toMap)
+  def handleStateChanges(partitions: Seq[TopicPartition],
+                         targetState: PartitionState): Map[TopicPartition, Throwable] = {
+    handleStateChanges(partitions, targetState, None)
   }
 
-  def triggerOnlinePartitionStateChange(topic: String) {
-    triggerOnlinePartitionStateChange(partitionState.filterKeys(p => p.topic.equals(topic)).toMap)
-  }
+  def handleStateChanges(partitions: Seq[TopicPartition],
+                         targetState: PartitionState,
+                         leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable]
 
-  def triggerOnlinePartitionStateChange(partitionState: Map[TopicPartition, PartitionState]) {
-    // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
-    // that belong to topics to be deleted
-    val partitionsToTrigger = partitionState.filter { case (partition, partitionState) =>
-      !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic) &&
-        (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
-    }.keys.toSeq
-    handleStateChanges(partitionsToTrigger, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
-    // TODO: If handleStateChanges catches an exception, it is not enough to bail out and log an error.
-    // It is important to trigger leader election for those partitions.
-  }
+}
+
+/**
+ * This class represents the state machine for partitions. It defines the states that a partition can be in, and
+ * transitions to move the partition to another legal state. The different states that a partition can be in are -
+ * 1. NonExistentPartition: This state indicates that the partition was either never created or was created and then
+ *                          deleted. Valid previous state, if one exists, is OfflinePartition
+ * 2. NewPartition        : After creation, the partition is in the NewPartition state. In this state, the partition should have
+ *                          replicas assigned to it, but no leader/isr yet. Valid previous states are NonExistentPartition
+ * 3. OnlinePartition     : Once a leader is elected for a partition, it is in the OnlinePartition state.
+ *                          Valid previous states are NewPartition/OfflinePartition
+ * 4. OfflinePartition    : If, after successful leader election, the leader for partition dies, then the partition
+ *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
+ */
+class ZkPartitionStateMachine(config: KafkaConfig,
+                              stateChangeLogger: StateChangeLogger,
+                              controllerContext: ControllerContext,
+                              zkClient: KafkaZkClient,
+                              controllerBrokerRequestBatch: ControllerBrokerRequestBatch)
+  extends PartitionStateMachine(controllerContext) {
+
+  private val controllerId = config.brokerId
+  this.logIdent = s"[PartitionStateMachine controllerId=$controllerId] "
 
   /**
     * Try to change the state of the given partitions to the given targetState, using the given
@@ -133,8 +137,8 @@ class PartitionStateMachine(config: KafkaConfig,
     * @param partitionLeaderElectionStrategyOpt The leader election strategy if a leader election is required.
     * @return partitions and corresponding throwable for those partitions which could not transition to the given state
     */
-  def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
-                         partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Map[TopicPartition, Throwable] = {
+  override def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
+                         partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = {
     if (partitions.nonEmpty) {
       try {
         controllerBrokerRequestBatch.newBatch()
@@ -154,24 +158,8 @@ class PartitionStateMachine(config: KafkaConfig,
     }
   }
 
-
-  def partitionsInState(state: PartitionState): Set[TopicPartition] = {
-    partitionState.filter { case (_, s) => s == state }.keySet.toSet
-  }
-
-  private def changeStateTo(partition: TopicPartition, currentState: PartitionState, targetState: PartitionState): Unit = {
-    partitionState.put(partition, targetState)
-    updateControllerMetrics(partition, currentState, targetState)
-  }
-
-  private def updateControllerMetrics(partition: TopicPartition, currentState: PartitionState, targetState: PartitionState) : Unit = {
-    if (!topicDeletionManager.isTopicWithDeletionStarted(partition.topic)) {
-      if (currentState != OfflinePartition && targetState == OfflinePartition) {
-        offlinePartitionCount = offlinePartitionCount + 1
-      } else if (currentState == OfflinePartition && targetState != OfflinePartition) {
-        offlinePartitionCount = offlinePartitionCount - 1
-      }
-    }
+  private def partitionState(partition: TopicPartition): PartitionState = {
+    controllerContext.partitionState(partition)
   }
 
   /**
@@ -196,18 +184,20 @@ class PartitionStateMachine(config: KafkaConfig,
    * @param partitions  The partitions for which the state transition is invoked
    * @param targetState The end state that the partition should be moved to
    */
-  private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
-                           partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = {
+  private def doHandleStateChanges(partitions: Seq[TopicPartition],
+                                   targetState: PartitionState,
+                                   partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = {
     val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
-    partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition))
-    val (validPartitions, invalidPartitions) = partitions.partition(partition => isValidTransition(partition, targetState))
+    partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
+    val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
     invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
+
     targetState match {
       case NewPartition =>
         validPartitions.foreach { partition =>
           stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
             s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
-          changeStateTo(partition, partitionState(partition), NewPartition)
+          controllerContext.putPartitionState(partition, NewPartition)
         }
         Map.empty
       case OnlinePartition =>
@@ -218,7 +208,7 @@ class PartitionStateMachine(config: KafkaConfig,
           successfulInitializations.foreach { partition =>
             stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
               s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
-            changeStateTo(partition, partitionState(partition), OnlinePartition)
+            controllerContext.putPartitionState(partition, OnlinePartition)
           }
         }
         if (partitionsToElectLeader.nonEmpty) {
@@ -226,7 +216,7 @@ class PartitionStateMachine(config: KafkaConfig,
           successfulElections.foreach { partition =>
             stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
               s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
-            changeStateTo(partition, partitionState(partition), OnlinePartition)
+            controllerContext.putPartitionState(partition, OnlinePartition)
           }
           failedElections
         } else {
@@ -235,13 +225,13 @@ class PartitionStateMachine(config: KafkaConfig,
       case OfflinePartition =>
         validPartitions.foreach { partition =>
           stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
-          changeStateTo(partition, partitionState(partition), OfflinePartition)
+          controllerContext.putPartitionState(partition, OfflinePartition)
         }
         Map.empty
       case NonExistentPartition =>
         validPartitions.foreach { partition =>
           stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
-          changeStateTo(partition, partitionState(partition), NonExistentPartition)
+          controllerContext.putPartitionState(partition, NonExistentPartition)
         }
         Map.empty
     }
@@ -374,23 +364,24 @@ class PartitionStateMachine(config: KafkaConfig,
     if (validPartitionsForElection.isEmpty) {
       return (Seq.empty, Seq.empty, failedElections.toMap)
     }
-    val shuttingDownBrokers  = controllerContext.shuttingDownBrokerIds.toSet
     val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
       case OfflinePartitionLeaderElectionStrategy =>
-        leaderForOffline(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
+        val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(validPartitionsForElection)
+        leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
       case ReassignPartitionLeaderElectionStrategy =>
-        leaderForReassign(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
+        leaderForReassign(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
       case PreferredReplicaPartitionLeaderElectionStrategy =>
-        leaderForPreferredReplica(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
+        leaderForPreferredReplica(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
       case ControlledShutdownPartitionLeaderElectionStrategy =>
-        leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
+        leaderForControlledShutdown(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
     }
-    partitionsWithoutLeaders.foreach { case (partition, _, _) =>
+    partitionsWithoutLeaders.foreach { electionResult =>
+      val partition = electionResult.topicPartition
       val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
       failedElections.put(partition, new StateChangeFailedException(failMsg))
     }
-    val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap
-    val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
+    val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
+    val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
     val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
     successfulUpdates.foreach { case (partition, leaderAndIsr) =>
@@ -403,14 +394,14 @@ class PartitionStateMachine(config: KafkaConfig,
     (successfulUpdates.keys.toSeq, updatesToRetry, failedElections.toMap ++ failedUpdates)
   }
 
-  private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
-  Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
+  private def collectUncleanLeaderElectionState(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
+  Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)] = {
     val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderIsrAndControllerEpochs.partition { case (partition, leaderIsrAndControllerEpoch) =>
       val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
       liveInSyncReplicas.isEmpty
     }
     val (logConfigs, failed) = zkClient.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
-    val partitionsWithUncleanLeaderElectionState = partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
+    partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
       if (failed.contains(partition.topic)) {
         logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
         (partition, None, false)
@@ -418,65 +409,8 @@ class PartitionStateMachine(config: KafkaConfig,
         (partition, Option(leaderIsrAndControllerEpoch), logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue())
       }
     } ++ partitionsWithLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) => (partition, Option(leaderIsrAndControllerEpoch), false) }
-    partitionsWithUncleanLeaderElectionState.map { case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
-      val assignment = controllerContext.partitionReplicaAssignment(partition)
-      val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
-      if (leaderIsrAndControllerEpochOpt.nonEmpty) {
-        val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
-        val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
-        val newLeaderAndIsrOpt = leaderOpt.map { leader =>
-          val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
-          else List(leader)
-          leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)
-        }
-        (partition, newLeaderAndIsrOpt, liveReplicas)
-      } else {
-        (partition, None, liveReplicas)
-      }
-    }
   }
 
-  private def leaderForReassign(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
-  Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
-    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
-      val reassignment = controllerContext.partitionsBeingReassigned(partition).newReplicas
-      val liveReplicas = reassignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
-      val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-      val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment, isr, liveReplicas.toSet)
-      val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeader(leader))
-      (partition, newLeaderAndIsrOpt, reassignment)
-    }
-  }
-
-  private def leaderForPreferredReplica(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
-  Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
-    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
-      val assignment = controllerContext.partitionReplicaAssignment(partition)
-      val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
-      val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-      val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet)
-      val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeader(leader))
-      (partition, newLeaderAndIsrOpt, assignment)
-    }
-  }
-
-  private def leaderForControlledShutdown(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)], shuttingDownBrokers: Set[Int]):
-  Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
-    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
-      val assignment = controllerContext.partitionReplicaAssignment(partition)
-      val liveOrShuttingDownReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true))
-      val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-      val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveOrShuttingDownReplicas.toSet, shuttingDownBrokers)
-      val newIsr = isr.filter(replica => !controllerContext.shuttingDownBrokerIds.contains(replica))
-      val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr))
-      (partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas)
-    }
-  }
-
-  private def isValidTransition(partition: TopicPartition, targetState: PartitionState) =
-    targetState.validPreviousStates.contains(partitionState(partition))
-
   private def logInvalidTransition(partition: TopicPartition, targetState: PartitionState): Unit = {
     val currState = partitionState(partition)
     val e = new IllegalStateException(s"Partition $partition should be in one of " +
@@ -501,7 +435,7 @@ object PartitionLeaderElectionAlgorithms {
     assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
       if (uncleanLeaderElectionEnabled) {
         val leaderOpt = assignment.find(liveReplicas.contains)
-        if (!leaderOpt.isEmpty)
+        if (leaderOpt.isDefined)
           controllerContext.stats.uncleanLeaderElectionRate.mark()
         leaderOpt
       } else {
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 433ab56..f7ec470 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -28,35 +28,7 @@ import org.apache.zookeeper.KeeperException.Code
 
 import scala.collection.mutable
 
-/**
- * This class represents the state machine for replicas. It defines the states that a replica can be in, and
- * transitions to move the replica to another legal state. The different states that a replica can be in are -
- * 1. NewReplica        : The controller can create new replicas during partition reassignment. In this state, a
- *                        replica can only get become follower state change request.  Valid previous
- *                        state is NonExistentReplica
- * 2. OnlineReplica     : Once a replica is started and part of the assigned replicas for its partition, it is in this
- *                        state. In this state, it can get either become leader or become follower state change requests.
- *                        Valid previous state are NewReplica, OnlineReplica or OfflineReplica
- * 3. OfflineReplica    : If a replica dies, it moves to this state. This happens when the broker hosting the replica
- *                        is down. Valid previous state are NewReplica, OnlineReplica
- * 4. ReplicaDeletionStarted: If replica deletion starts, it is moved to this state. Valid previous state is OfflineReplica
- * 5. ReplicaDeletionSuccessful: If replica responds with no error code in response to a delete replica request, it is
- *                        moved to this state. Valid previous state is ReplicaDeletionStarted
- * 6. ReplicaDeletionIneligible: If replica deletion fails, it is moved to this state. Valid previous state is ReplicaDeletionStarted
- * 7. NonExistentReplica: If a replica is deleted successfully, it is moved to this state. Valid previous state is
- *                        ReplicaDeletionSuccessful
- */
-class ReplicaStateMachine(config: KafkaConfig,
-                          stateChangeLogger: StateChangeLogger,
-                          controllerContext: ControllerContext,
-                          topicDeletionManager: TopicDeletionManager,
-                          zkClient: KafkaZkClient,
-                          replicaState: mutable.Map[PartitionAndReplica, ReplicaState],
-                          controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
-  private val controllerId = config.brokerId
-
-  this.logIdent = s"[ReplicaStateMachine controllerId=$controllerId] "
-
+abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
   /**
    * Invoked on successful controller election.
    */
@@ -65,14 +37,13 @@ class ReplicaStateMachine(config: KafkaConfig,
     initializeReplicaState()
     info("Triggering online replica state changes")
     handleStateChanges(controllerContext.allLiveReplicas().toSeq, OnlineReplica)
-    info(s"Started replica state machine with initial state -> $replicaState")
+    debug(s"Started replica state machine with initial state -> ${controllerContext.replicaStates}")
   }
 
   /**
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    replicaState.clear()
     info("Stopped replica state machine")
   }
 
@@ -85,25 +56,56 @@ class ReplicaStateMachine(config: KafkaConfig,
       val replicas = controllerContext.partitionReplicaAssignment(partition)
       replicas.foreach { replicaId =>
         val partitionAndReplica = PartitionAndReplica(partition, replicaId)
-        if (controllerContext.isReplicaOnline(replicaId, partition))
-          replicaState.put(partitionAndReplica, OnlineReplica)
-        else
-        // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
-        // This is required during controller failover since during controller failover a broker can go down,
-        // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
-          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
+        if (controllerContext.isReplicaOnline(replicaId, partition)) {
+          controllerContext.putReplicaState(partitionAndReplica, OnlineReplica)
+        } else {
+          // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
+          // This is required during controller failover since during controller failover a broker can go down,
+          // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
+          controllerContext.putReplicaState(partitionAndReplica, ReplicaDeletionIneligible)
+        }
       }
     }
   }
 
-  def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState,
-                         callbacks: Callbacks = new Callbacks()): Unit = {
+  def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit
+}
+
+/**
+ * This class represents the state machine for replicas. It defines the states that a replica can be in, and
+ * transitions to move the replica to another legal state. The different states that a replica can be in are -
+ * 1. NewReplica        : The controller can create new replicas during partition reassignment. In this state, a
+ *                        replica can only get become follower state change request.  Valid previous
+ *                        state is NonExistentReplica
+ * 2. OnlineReplica     : Once a replica is started and part of the assigned replicas for its partition, it is in this
+ *                        state. In this state, it can get either become leader or become follower state change requests.
+ *                        Valid previous states are NewReplica, OnlineReplica or OfflineReplica
+ * 3. OfflineReplica    : If a replica dies, it moves to this state. This happens when the broker hosting the replica
+ *                        is down. Valid previous states are NewReplica, OnlineReplica
+ * 4. ReplicaDeletionStarted: If replica deletion starts, it is moved to this state. Valid previous state is OfflineReplica
+ * 5. ReplicaDeletionSuccessful: If replica responds with no error code in response to a delete replica request, it is
+ *                        moved to this state. Valid previous state is ReplicaDeletionStarted
+ * 6. ReplicaDeletionIneligible: If replica deletion fails, it is moved to this state. Valid previous states are
+ *                        ReplicaDeletionStarted and OfflineReplica
+ * 7. NonExistentReplica: If a replica is deleted successfully, it is moved to this state. Valid previous state is
+ *                        ReplicaDeletionSuccessful
+ */
+class ZkReplicaStateMachine(config: KafkaConfig,
+                            stateChangeLogger: StateChangeLogger,
+                            controllerContext: ControllerContext,
+                            zkClient: KafkaZkClient,
+                            controllerBrokerRequestBatch: ControllerBrokerRequestBatch)
+  extends ReplicaStateMachine(controllerContext) with Logging {
+
+  private val controllerId = config.brokerId
+  this.logIdent = s"[ReplicaStateMachine controllerId=$controllerId] "
+
+  override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
     if (replicas.nonEmpty) {
       try {
         controllerBrokerRequestBatch.newBatch()
-        replicas.groupBy(_.replica).map { case (replicaId, replicas) =>
-          val partitions = replicas.map(_.topicPartition)
-          doHandleStateChanges(replicaId, partitions, targetState, callbacks)
+        replicas.groupBy(_.replica).foreach { case (replicaId, replicas) =>
+          doHandleStateChanges(replicaId, replicas, targetState)
         }
         controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
       } catch {
@@ -150,39 +152,42 @@ class ReplicaStateMachine(config: KafkaConfig,
    * @param partitions The partitions on this replica for which the state transition is invoked
    * @param targetState The end state that the replica should be moved to
    */
-  private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicPartition], targetState: ReplicaState,
-                                   callbacks: Callbacks): Unit = {
-    val replicas = partitions.map(partition => PartitionAndReplica(partition, replicaId))
-    replicas.foreach(replica => replicaState.getOrElseUpdate(replica, NonExistentReplica))
-    val (validReplicas, invalidReplicas) = replicas.partition(replica => isValidTransition(replica, targetState))
+  private def doHandleStateChanges(replicaId: Int, replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
+    replicas.foreach(replica => controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica))
+    val (validReplicas, invalidReplicas) = controllerContext.checkValidReplicaStateChange(replicas, targetState)
     invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))
+
     targetState match {
       case NewReplica =>
         validReplicas.foreach { replica =>
           val partition = replica.topicPartition
+          val currentState = controllerContext.replicaState(replica)
+
           controllerContext.partitionLeadershipInfo.get(partition) match {
             case Some(leaderIsrAndControllerEpoch) =>
               if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
                 val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
-                logFailedStateChange(replica, replicaState(replica), OfflineReplica, exception)
+                logFailedStateChange(replica, currentState, OfflineReplica, exception)
               } else {
                 controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
                   replica.topicPartition,
                   leaderIsrAndControllerEpoch,
                   controllerContext.partitionReplicaAssignment(replica.topicPartition),
                   isNew = true)
-                logSuccessfulTransition(replicaId, partition, replicaState(replica), NewReplica)
-                replicaState.put(replica, NewReplica)
+                logSuccessfulTransition(replicaId, partition, currentState, NewReplica)
+                controllerContext.putReplicaState(replica, NewReplica)
               }
             case None =>
-              logSuccessfulTransition(replicaId, partition, replicaState(replica), NewReplica)
-              replicaState.put(replica, NewReplica)
+              logSuccessfulTransition(replicaId, partition, currentState, NewReplica)
+              controllerContext.putReplicaState(replica, NewReplica)
           }
         }
       case OnlineReplica =>
         validReplicas.foreach { replica =>
           val partition = replica.topicPartition
-          replicaState(replica) match {
+          val currentState = controllerContext.replicaState(replica)
+
+          currentState match {
             case NewReplica =>
               val assignment = controllerContext.partitionReplicaAssignment(partition)
               if (!assignment.contains(replicaId)) {
@@ -198,20 +203,19 @@ class ReplicaStateMachine(config: KafkaConfig,
                 case None =>
               }
           }
-          logSuccessfulTransition(replicaId, partition, replicaState(replica), OnlineReplica)
-          replicaState.put(replica, OnlineReplica)
+          logSuccessfulTransition(replicaId, partition, currentState, OnlineReplica)
+          controllerContext.putReplicaState(replica, OnlineReplica)
         }
       case OfflineReplica =>
         validReplicas.foreach { replica =>
-          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition,
-            deletePartition = false, (_, _) => ())
+          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false)
         }
         val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
           controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)
         }
         val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
         updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
-          if (!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
+          if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
             val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
             controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
               partition,
@@ -219,39 +223,43 @@ class ReplicaStateMachine(config: KafkaConfig,
               controllerContext.partitionReplicaAssignment(partition), isNew = false)
           }
           val replica = PartitionAndReplica(partition, replicaId)
-          logSuccessfulTransition(replicaId, partition, replicaState(replica), OfflineReplica)
-          replicaState.put(replica, OfflineReplica)
+          val currentState = controllerContext.replicaState(replica)
+          logSuccessfulTransition(replicaId, partition, currentState, OfflineReplica)
+          controllerContext.putReplicaState(replica, OfflineReplica)
         }
 
         replicasWithoutLeadershipInfo.foreach { replica =>
-          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), OfflineReplica)
-          replicaState.put(replica, OfflineReplica)
+          val currentState = controllerContext.replicaState(replica)
+          logSuccessfulTransition(replicaId, replica.topicPartition, currentState, OfflineReplica)
+          controllerContext.putReplicaState(replica, OfflineReplica)
         }
       case ReplicaDeletionStarted =>
         validReplicas.foreach { replica =>
-          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionStarted)
-          replicaState.put(replica, ReplicaDeletionStarted)
-          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId),
-            replica.topicPartition,
-            deletePartition = true,
-            callbacks.stopReplicaResponseCallback)
+          val currentState = controllerContext.replicaState(replica)
+          logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted)
+          controllerContext.putReplicaState(replica, ReplicaDeletionStarted)
+          val topicDeletionInProgress = controllerContext.isTopicDeletionInProgress(replica.topicPartition.topic)
+          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = true)
         }
       case ReplicaDeletionIneligible =>
         validReplicas.foreach { replica =>
-          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionIneligible)
-          replicaState.put(replica, ReplicaDeletionIneligible)
+          val currentState = controllerContext.replicaState(replica)
+          logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionIneligible)
+          controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
         }
       case ReplicaDeletionSuccessful =>
         validReplicas.foreach { replica =>
-          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionSuccessful)
-          replicaState.put(replica, ReplicaDeletionSuccessful)
+          val currentState = controllerContext.replicaState(replica)
+          logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionSuccessful)
+          controllerContext.putReplicaState(replica, ReplicaDeletionSuccessful)
         }
       case NonExistentReplica =>
         validReplicas.foreach { replica =>
+          val currentState = controllerContext.replicaState(replica)
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicPartition)
           controllerContext.updatePartitionReplicaAssignment(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
-          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), NonExistentReplica)
-          replicaState.remove(replica)
+          logSuccessfulTransition(replicaId, replica.topicPartition, currentState, NonExistentReplica)
+          controllerContext.removeReplicaState(replica)
         }
     }
   }
@@ -273,7 +281,8 @@ class ReplicaStateMachine(config: KafkaConfig,
       remaining = removalsToRetry
       failedRemovals.foreach { case (partition, e) =>
         val replica = PartitionAndReplica(partition, replicaId)
-        logFailedStateChange(replica, replicaState(replica), OfflineReplica, e)
+        val currentState = controllerContext.replicaState(replica)
+        logFailedStateChange(replica, currentState, OfflineReplica, e)
       }
     }
     results
@@ -305,7 +314,7 @@ class ReplicaStateMachine(config: KafkaConfig,
     val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
     val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
-      if (!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
+      if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
         val exception = new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $partition since the leader and isr path in zookeeper is empty")
         Option(partition -> exception)
       } else None
@@ -367,32 +376,13 @@ class ReplicaStateMachine(config: KafkaConfig,
     (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap)
   }
 
-  def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = {
-    controllerContext.replicasForTopic(topic).exists(replica => replicaState(replica) == ReplicaDeletionStarted)
-  }
-
-  def replicasInState(topic: String, state: ReplicaState): Set[PartitionAndReplica] = {
-    replicaState.filter { case (replica, s) => replica.topic.equals(topic) && s == state }.keySet.toSet
-  }
-
-  def areAllReplicasForTopicDeleted(topic: String): Boolean = {
-    controllerContext.replicasForTopic(topic).forall(replica => replicaState(replica) == ReplicaDeletionSuccessful)
-  }
-
-  def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = {
-    replicaState.exists { case (replica, s) => replica.topic.equals(topic) && s == state}
-  }
-
-  private def isValidTransition(replica: PartitionAndReplica, targetState: ReplicaState) =
-    targetState.validPreviousStates.contains(replicaState(replica))
-
   private def logSuccessfulTransition(replicaId: Int, partition: TopicPartition, currState: ReplicaState, targetState: ReplicaState): Unit = {
     stateChangeLogger.withControllerEpoch(controllerContext.epoch)
       .trace(s"Changed state of replica $replicaId for partition $partition from $currState to $targetState")
   }
 
   private def logInvalidTransition(replica: PartitionAndReplica, targetState: ReplicaState): Unit = {
-    val currState = replicaState(replica)
+    val currState = controllerContext.replicaState(replica)
     val e = new IllegalStateException(s"Replica $replica should be in the ${targetState.validPreviousStates.mkString(",")} " +
       s"states before moving to $targetState state. Instead it is in $currState state")
     logFailedStateChange(replica, currState, targetState, e)
@@ -437,7 +427,7 @@ case object ReplicaDeletionSuccessful extends ReplicaState {
 
 case object ReplicaDeletionIneligible extends ReplicaState {
   val state: Byte = 6
-  val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted)
+  val validPreviousStates: Set[ReplicaState] = Set(OfflineReplica, ReplicaDeletionStarted)
 }
 
 case object NonExistentReplica extends ReplicaState {
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 1ef79be..0f56e3a 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -16,11 +16,39 @@
  */
 package kafka.controller
 
+import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 
-import scala.collection.{Set, mutable}
+import scala.collection.Set
+
+trait DeletionClient {
+  def deleteTopic(topic: String, epochZkVersion: Int): Unit
+  def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit
+  def mutePartitionModifications(topic: String): Unit
+  def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit
+}
+
+class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkClient) extends DeletionClient {
+  override def deleteTopic(topic: String, epochZkVersion: Int): Unit = {
+    zkClient.deleteTopicZNode(topic, epochZkVersion)
+    zkClient.deleteTopicConfigs(Seq(topic), epochZkVersion)
+    zkClient.deleteTopicDeletions(Seq(topic), epochZkVersion)
+  }
+
+  override def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit = {
+    zkClient.deleteTopicDeletions(topics, epochZkVersion)
+  }
+
+  override def mutePartitionModifications(topic: String): Unit = {
+    controller.unregisterPartitionModificationsHandlers(Seq(topic))
+  }
+
+  override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = {
+    controller.sendUpdateMetadataRequest(controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
+  }
+}
 
 /**
  * This manages the state machine for topic deletion.
@@ -55,44 +83,22 @@ import scala.collection.{Set, mutable}
  *    it marks the topic for deletion retry.
  * @param controller
  */
-class TopicDeletionManager(controller: KafkaController,
-                           eventManager: ControllerEventManager,
-                           zkClient: KafkaZkClient) extends Logging {
-  this.logIdent = s"[Topic Deletion Manager ${controller.config.brokerId}], "
-  val controllerContext = controller.controllerContext
-  val isDeleteTopicEnabled = controller.config.deleteTopicEnable
-  val topicsToBeDeleted = mutable.Set.empty[String]
-  /** The following topicsWithDeletionStarted variable is used to properly update the offlinePartitionCount metric.
-    * When a topic is going through deletion, we don't want to keep track of its partition state
-    * changes in the offlinePartitionCount metric, see the PartitionStateMachine#updateControllerMetrics
-    * for detailed logic. This goal means if some partitions of a topic are already
-    * in OfflinePartition state when deletion starts, we need to change the corresponding partition
-    * states to NonExistentPartition first before starting the deletion.
-    *
-    * However we can NOT change partition states to NonExistentPartition at the time of enqueuing topics
-    * for deletion. The reason is that when a topic is enqueued for deletion, it may be ineligible for
-    * deletion due to ongoing partition reassignments. Hence there might be a delay between enqueuing
-    * a topic for deletion and the actual start of deletion. In this delayed interval, partitions may still
-    * transition to or out of the OfflinePartition state.
-    *
-    * Hence we decide to change partition states to NonExistentPartition only when the actual deletion have started.
-    * For topics whose deletion have actually started, we keep track of them in the following topicsWithDeletionStarted
-    * variable. And once a topic is in the topicsWithDeletionStarted set, we are sure there will no longer
-    * be partition reassignments to any of its partitions, and only then it's safe to move its partitions to
-    * NonExistentPartition state. Once a topic is in the topicsWithDeletionStarted set, we will stop monitoring
-    * its partition state changes in the offlinePartitionCount metric
-    */
-  val topicsWithDeletionStarted = mutable.Set.empty[String]
-  val topicsIneligibleForDeletion = mutable.Set.empty[String]
+class TopicDeletionManager(config: KafkaConfig,
+                           controllerContext: ControllerContext,
+                           replicaStateMachine: ReplicaStateMachine,
+                           partitionStateMachine: PartitionStateMachine,
+                           client: DeletionClient) extends Logging {
+  this.logIdent = s"[Topic Deletion Manager ${config.brokerId}] "
+  val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable
 
   def init(initialTopicsToBeDeleted: Set[String], initialTopicsIneligibleForDeletion: Set[String]): Unit = {
     if (isDeleteTopicEnabled) {
-      topicsToBeDeleted ++= initialTopicsToBeDeleted
-      topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & topicsToBeDeleted
+      controllerContext.topicsToBeDeleted ++= initialTopicsToBeDeleted
+      controllerContext.topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & controllerContext.topicsToBeDeleted
     } else {
       // if delete topic is disabled clean the topic entries under /admin/delete_topics
       info(s"Removing $initialTopicsToBeDeleted since delete topic is disabled")
-      zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
+      client.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
     }
   }
 
@@ -103,17 +109,6 @@ class TopicDeletionManager(controller: KafkaController,
   }
 
   /**
-   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
-   */
-  def reset() {
-    if (isDeleteTopicEnabled) {
-      topicsToBeDeleted.clear()
-      topicsWithDeletionStarted.clear()
-      topicsIneligibleForDeletion.clear()
-    }
-  }
-
-  /**
    * Invoked by the child change listener on /admin/delete_topics to queue up the topics for deletion. The topic gets added
    * to the topicsToBeDeleted list and only gets removed from the list when the topic deletion has completed successfully
    * i.e. all replicas of all partitions of that topic are deleted successfully.
@@ -121,7 +116,7 @@ class TopicDeletionManager(controller: KafkaController,
    */
   def enqueueTopicsForDeletion(topics: Set[String]) {
     if (isDeleteTopicEnabled) {
-      topicsToBeDeleted ++= topics
+      controllerContext.topicsToBeDeleted ++= topics
       resumeDeletions()
     }
   }
@@ -134,9 +129,9 @@ class TopicDeletionManager(controller: KafkaController,
    */
   def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
     if (isDeleteTopicEnabled) {
-      val topicsToResumeDeletion = topics & topicsToBeDeleted
+      val topicsToResumeDeletion = topics & controllerContext.topicsToBeDeleted
       if (topicsToResumeDeletion.nonEmpty) {
-        topicsIneligibleForDeletion --= topicsToResumeDeletion
+        controllerContext.topicsIneligibleForDeletion --= topicsToResumeDeletion
         resumeDeletions()
       }
     }
@@ -155,7 +150,7 @@ class TopicDeletionManager(controller: KafkaController,
       if (replicasThatFailedToDelete.nonEmpty) {
         val topics = replicasThatFailedToDelete.map(_.topic)
         debug(s"Deletion failed for replicas ${replicasThatFailedToDelete.mkString(",")}. Halting deletion for topics $topics")
-        controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete.toSeq, ReplicaDeletionIneligible)
+        replicaStateMachine.handleStateChanges(replicasThatFailedToDelete.toSeq, ReplicaDeletionIneligible)
         markTopicIneligibleForDeletion(topics)
         resumeDeletions()
       }
@@ -168,10 +163,10 @@ class TopicDeletionManager(controller: KafkaController,
    * 2. partition reassignment in progress for some partitions of the topic
    * @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
    */
-  def markTopicIneligibleForDeletion(topics: Set[String]) {
+  def markTopicIneligibleForDeletion(topics: Set[String]): Unit = {
     if (isDeleteTopicEnabled) {
-      val newTopicsToHaltDeletion = topicsToBeDeleted & topics
-      topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
+      val newTopicsToHaltDeletion = controllerContext.topicsToBeDeleted & topics
+      controllerContext.topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
       if (newTopicsToHaltDeletion.nonEmpty)
         info(s"Halted deletion of topics ${newTopicsToHaltDeletion.mkString(",")}")
     }
@@ -179,28 +174,21 @@ class TopicDeletionManager(controller: KafkaController,
 
   private def isTopicIneligibleForDeletion(topic: String): Boolean = {
     if (isDeleteTopicEnabled) {
-      topicsIneligibleForDeletion.contains(topic)
+      controllerContext.topicsIneligibleForDeletion.contains(topic)
     } else
       true
   }
 
   private def isTopicDeletionInProgress(topic: String): Boolean = {
     if (isDeleteTopicEnabled) {
-      controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
-    } else
-      false
-  }
-
-  def isTopicWithDeletionStarted(topic: String) = {
-    if (isDeleteTopicEnabled) {
-      topicsWithDeletionStarted.contains(topic)
+      controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)
     } else
       false
   }
 
   def isTopicQueuedUpForDeletion(topic: String): Boolean = {
     if (isDeleteTopicEnabled) {
-      topicsToBeDeleted.contains(topic)
+      controllerContext.isTopicQueuedUpForDeletion(topic)
     } else
       false
   }
@@ -214,7 +202,7 @@ class TopicDeletionManager(controller: KafkaController,
   def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
     val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
     debug(s"Deletion successfully completed for replicas ${successfullyDeletedReplicas.mkString(",")}")
-    controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas.toSeq, ReplicaDeletionSuccessful)
+    replicaStateMachine.handleStateChanges(successfullyDeletedReplicas.toSeq, ReplicaDeletionSuccessful)
     resumeDeletions()
   }
 
@@ -227,7 +215,9 @@ class TopicDeletionManager(controller: KafkaController,
    * @return Whether or not deletion can be retried for the topic
    */
   private def isTopicEligibleForDeletion(topic: String): Boolean = {
-    topicsToBeDeleted.contains(topic) && (!isTopicDeletionInProgress(topic) && !isTopicIneligibleForDeletion(topic))
+    controllerContext.isTopicQueuedUpForDeletion(topic) &&
+      !isTopicDeletionInProgress(topic) &&
+      !isTopicIneligibleForDeletion(topic)
   }
 
   /**
@@ -235,25 +225,23 @@ class TopicDeletionManager(controller: KafkaController,
    * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionIneligible to OfflineReplica state
    *@param topic Topic for which deletion should be retried
    */
-  private def markTopicForDeletionRetry(topic: String) {
+  private def retryDeletionForIneligibleReplicas(topic: String): Unit = {
     // reset replica states from ReplicaDeletionIneligible to OfflineReplica
-    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
+    val failedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionIneligible)
     info(s"Retrying delete topic for topic $topic since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
-    controller.replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica)
+    replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica)
   }
 
   private def completeDeleteTopic(topic: String) {
     // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
     // firing before the new topic listener when a deleted topic gets auto created
-    controller.unregisterPartitionModificationsHandlers(Seq(topic))
-    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+    client.mutePartitionModifications(topic)
+    val replicasForDeletedTopic = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
     // controller will remove this replica from the state machine as well as its partition assignment cache
-    controller.replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica)
-    topicsToBeDeleted -= topic
-    topicsWithDeletionStarted -= topic
-    zkClient.deleteTopicZNode(topic, controllerContext.epochZkVersion)
-    zkClient.deleteTopicConfigs(Seq(topic), controllerContext.epochZkVersion)
-    zkClient.deleteTopicDeletions(Seq(topic), controllerContext.epochZkVersion)
+    replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica)
+    controllerContext.topicsToBeDeleted -= topic
+    controllerContext.topicsWithDeletionStarted -= topic
+    client.deleteTopic(topic, controllerContext.epochZkVersion)
     controllerContext.removeTopic(topic)
   }
 
@@ -268,17 +256,17 @@ class TopicDeletionManager(controller: KafkaController,
     info(s"Topic deletion callback for ${topics.mkString(",")}")
     // send update metadata so that brokers stop serving data for topics to be deleted
     val partitions = topics.flatMap(controllerContext.partitionsForTopic)
-    val unseenTopicsForDeletion = topics -- topicsWithDeletionStarted
+    val unseenTopicsForDeletion = topics -- controllerContext.topicsWithDeletionStarted
     if (unseenTopicsForDeletion.nonEmpty) {
       val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
-      controller.partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, OfflinePartition)
-      controller.partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, NonExistentPartition)
-      // adding of unseenTopicsForDeletion to topicsBeingDeleted must be done after the partition state changes
-      // to make sure the offlinePartitionCount metric is properly updated
-      topicsWithDeletionStarted ++= unseenTopicsForDeletion
+      partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, OfflinePartition)
+      partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, NonExistentPartition)
+      // adding of unseenTopicsForDeletion to topics with deletion started must be done after the partition
+      // state changes to make sure the offlinePartitionCount metric is properly updated
+      controllerContext.beginTopicDeletion(unseenTopicsForDeletion)
     }
 
-    controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
+    client.sendMetadataUpdate(partitions)
     topics.foreach { topic =>
       onPartitionDeletion(controllerContext.partitionsForTopic(topic))
     }
@@ -298,22 +286,20 @@ class TopicDeletionManager(controller: KafkaController,
    * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
    *    for deletion if some replicas are dead since it won't complete successfully anyway
    * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
-   *@param replicasForTopicsToBeDeleted
+   * @param replicasForTopicsToBeDeleted
    */
   private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
     replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
       val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
       val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
-      val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+      val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
       val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
       // move dead replicas directly to failed state
-      controller.replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, ReplicaDeletionIneligible, new Callbacks())
+      replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, ReplicaDeletionIneligible)
       // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
-      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica, new Callbacks())
+      replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica)
       debug(s"Deletion started for replicas ${replicasForDeletionRetry.mkString(",")}")
-      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted,
-        new Callbacks(stopReplicaResponseCallback = (stopReplicaResponseObj, replicaId) =>
-          eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))))
+      replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted)
       if (deadReplicasForTopic.nonEmpty) {
         debug(s"Dead Replicas (${deadReplicasForTopic.mkString(",")}) found for topic $topic")
         markTopicIneligibleForDeletion(Set(topic))
@@ -339,34 +325,31 @@ class TopicDeletionManager(controller: KafkaController,
   }
 
   private def resumeDeletions(): Unit = {
-    val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
-
+    val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
     if (topicsQueuedForDeletion.nonEmpty)
       info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
 
     topicsQueuedForDeletion.foreach { topic =>
       // if all replicas are marked as deleted successfully, then topic deletion is done
-      if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
+      if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
         // clear up all state for this topic from controller cache and zookeeper
         completeDeleteTopic(topic)
         info(s"Deletion of topic $topic successfully completed")
+      } else if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
+        // ignore since topic deletion is in progress
+        val replicasInDeletionStartedState = controllerContext.replicasInState(topic, ReplicaDeletionStarted)
+        val replicaIds = replicasInDeletionStartedState.map(_.replica)
+        val partitions = replicasInDeletionStartedState.map(_.topicPartition)
+        info(s"Deletion for replicas ${replicaIds.mkString(",")} for partition ${partitions.mkString(",")} of topic $topic in progress")
       } else {
-        if (controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
-          // ignore since topic deletion is in progress
-          val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
-          val replicaIds = replicasInDeletionStartedState.map(_.replica)
-          val partitions = replicasInDeletionStartedState.map(_.topicPartition)
-          info(s"Deletion for replicas ${replicaIds.mkString(",")} for partition ${partitions.mkString(",")} of topic $topic in progress")
-        } else {
-          // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
-          // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
-          // or there is at least one failed replica (which means topic deletion should be retried).
-          if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
-            // mark topic for deletion retry
-            markTopicForDeletionRetry(topic)
-          }
+        // If we get here, no replica is in ReplicaDeletionStarted and not all replicas are in
+        // ReplicaDeletionSuccessful. That means either the topic has not initiated deletion yet,
+        // or there is at least one failed replica (which means topic deletion should be retried).
+        if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
+          retryDeletionForIneligibleReplicas(topic)
         }
       }
+
       // Try delete topic if it is eligible for deletion.
       if (isTopicEligibleForDeletion(topic)) {
         info(s"Deletion of topic $topic (re)started")
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 400920e..7a5b3e5 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -201,8 +201,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val (controller, controllerId) = getController()
     val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment)
     TestUtils.waitUntilTrue(() => {
-      val replicasInDeletionSuccessful = controller.kafkaController.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
-      val offlineReplicas = controller.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
+      val replicasInDeletionSuccessful = controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
+      val offlineReplicas = controller.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
       allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas)
     }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
 
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 6cfa72c..283858c 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -62,7 +62,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
     createTopic(topic, 1, 1)
     val topicPartition = new TopicPartition("topic1", 0)
     TestUtils.waitUntilTrue(() =>
-      initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),
+      initialController.controllerContext.partitionsInState(OnlinePartition).contains(topicPartition),
       s"Partition $topicPartition did not transition to online state")
 
     // Wait until we have verified that we have resigned
diff --git a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
new file mode 100644
index 0000000..2578199
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import kafka.common.StateChangeFailedException
+import kafka.controller.Election._
+import org.apache.kafka.common.TopicPartition
+
+import scala.collection.mutable
+
+class MockPartitionStateMachine(controllerContext: ControllerContext,
+                                uncleanLeaderElectionEnabled: Boolean)
+  extends PartitionStateMachine(controllerContext) {
+
+  override def handleStateChanges(partitions: Seq[TopicPartition],
+                                  targetState: PartitionState,
+                                  leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = {
+    partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
+    val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
+    if (invalidPartitions.nonEmpty) {
+      val currentStates = invalidPartitions.map(p => controllerContext.partitionStates.get(p))
+      throw new IllegalStateException(s"Invalid state transition to $targetState for partitions $currentStates")
+    }
+
+    if (targetState == OnlinePartition) {
+      val uninitializedPartitions = validPartitions.filter(partition => controllerContext.partitionState(partition) == NewPartition)
+      val partitionsToElectLeader = partitions.filter { partition =>
+        val currentState = controllerContext.partitionState(partition)
+        currentState == OfflinePartition || currentState == OnlinePartition
+      }
+
+      uninitializedPartitions.foreach { partition =>
+        controllerContext.putPartitionState(partition, targetState)
+      }
+
+      val failedElections = doLeaderElections(partitionsToElectLeader, leaderElectionStrategy.get)
+      val successfulElections = partitionsToElectLeader.filterNot(failedElections.keySet.contains)
+      successfulElections.foreach { partition =>
+        controllerContext.putPartitionState(partition, targetState)
+      }
+
+      failedElections
+    } else {
+      validPartitions.foreach { partition =>
+        controllerContext.putPartitionState(partition, targetState)
+      }
+      Map.empty
+    }
+  }
+
+  private def doLeaderElections(partitions: Seq[TopicPartition],
+                                leaderElectionStrategy: PartitionLeaderElectionStrategy): Map[TopicPartition, Throwable] = {
+    val failedElections = mutable.Map.empty[TopicPartition, Exception]
+    val leaderIsrAndControllerEpochPerPartition = partitions.map { partition =>
+      partition -> controllerContext.partitionLeadershipInfo(partition)
+    }
+
+    val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
+      leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
+    }
+    invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
+      val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
+        s"already written by another controller. This probably means that the current controller went through " +
+        s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
+      failedElections.put(partition, new StateChangeFailedException(failMsg))
+    }
+
+    val electionResults = leaderElectionStrategy match {
+      case OfflinePartitionLeaderElectionStrategy =>
+        val partitionsWithUncleanLeaderElectionState = validPartitionsForElection.map { case (partition, leaderIsrAndControllerEpoch) =>
+          (partition, Some(leaderIsrAndControllerEpoch), uncleanLeaderElectionEnabled)
+        }
+        leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState)
+      case ReassignPartitionLeaderElectionStrategy =>
+        leaderForReassign(controllerContext, validPartitionsForElection)
+      case PreferredReplicaPartitionLeaderElectionStrategy =>
+        leaderForPreferredReplica(controllerContext, validPartitionsForElection)
+      case ControlledShutdownPartitionLeaderElectionStrategy =>
+        leaderForControlledShutdown(controllerContext, validPartitionsForElection)
+    }
+
+    for (electionResult <- electionResults) {
+      val partition = electionResult.topicPartition
+      electionResult.leaderAndIsr match {
+        case None =>
+          val failMsg = s"Failed to elect leader for partition $partition under strategy $leaderElectionStrategy"
+          failedElections.put(partition, new StateChangeFailedException(failMsg))
+        case Some(leaderAndIsr) =>
+          val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
+          controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+      }
+    }
+    failedElections.toMap
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
new file mode 100644
index 0000000..248a5de
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+class MockReplicaStateMachine(controllerContext: ControllerContext) extends ReplicaStateMachine(controllerContext) {
+
+  override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
+    replicas.foreach(replica => controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica))
+    val (validReplicas, invalidReplicas) = controllerContext.checkValidReplicaStateChange(replicas, targetState)
+    if (invalidReplicas.nonEmpty) {
+      val currentStates = invalidReplicas.map(replica => replica -> controllerContext.replicaStates.get(replica)).toMap
+      throw new IllegalStateException(s"Invalid state transition to $targetState for replicas $currentStates")
+    }
+    validReplicas.foreach { replica =>
+      if (targetState == NonExistentReplica)
+        controllerContext.removeReplicaState(replica)
+      else
+        controllerContext.putReplicaState(replica, targetState)
+    }
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index d711ae0..ba90231 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -29,16 +29,13 @@ import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{Before, Test}
+import org.mockito.Mockito
 import org.scalatest.junit.JUnitSuite
 
-import scala.collection.mutable
-
 class PartitionStateMachineTest extends JUnitSuite {
   private var controllerContext: ControllerContext = null
   private var mockZkClient: KafkaZkClient = null
   private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
-  private var mockTopicDeletionManager: TopicDeletionManager = null
-  private var partitionState: mutable.Map[TopicPartition, PartitionState] = null
   private var partitionStateMachine: PartitionStateMachine = null
 
   private val brokerId = 5
@@ -53,11 +50,12 @@ class PartitionStateMachineTest extends JUnitSuite {
     controllerContext.epoch = controllerEpoch
     mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
-    mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
-    partitionState = mutable.Map.empty[TopicPartition, PartitionState]
-    partitionStateMachine = new PartitionStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext,
-      mockZkClient, partitionState, mockControllerBrokerRequestBatch)
-    partitionStateMachine.setTopicDeletionManager(mockTopicDeletionManager)
+    partitionStateMachine = new ZkPartitionStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext,
+      mockZkClient, mockControllerBrokerRequestBatch)
+  }
+
+  private def partitionState(partition: TopicPartition): PartitionState = {
+    controllerContext.partitionState(partition)
   }
 
   @Test
@@ -82,7 +80,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   def testNewPartitionToOnlinePartitionTransition(): Unit = {
     controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
-    partitionState.put(partition, NewPartition)
+    controllerContext.putPartitionState(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
@@ -100,7 +98,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   def testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates(): Unit = {
     controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
-    partitionState.put(partition, NewPartition)
+    controllerContext.putPartitionState(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
@@ -116,7 +114,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   def testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates(): Unit = {
     controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
-    partitionState.put(partition, NewPartition)
+    controllerContext.putPartitionState(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion))
@@ -130,14 +128,14 @@ class PartitionStateMachineTest extends JUnitSuite {
 
   @Test
   def testNewPartitionToOfflinePartitionTransition(): Unit = {
-    partitionState.put(partition, NewPartition)
+    controllerContext.putPartitionState(partition, NewPartition)
     partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
     assertEquals(OfflinePartition, partitionState(partition))
   }
 
   @Test
   def testInvalidNewPartitionToNonexistentPartitionTransition(): Unit = {
-    partitionState.put(partition, NewPartition)
+    controllerContext.putPartitionState(partition, NewPartition)
     partitionStateMachine.handleStateChanges(partitions, NonExistentPartition)
     assertEquals(NewPartition, partitionState(partition))
   }
@@ -146,7 +144,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   def testOnlinePartitionToOnlineTransition(): Unit = {
     controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
-    partitionState.put(partition, OnlinePartition)
+    controllerContext.putPartitionState(partition, OnlinePartition)
     val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -179,7 +177,7 @@ class PartitionStateMachineTest extends JUnitSuite {
       TestUtils.createBrokerAndEpoch(otherBrokerId, "host", 0)))
     controllerContext.shuttingDownBrokerIds.add(brokerId)
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId, otherBrokerId))
-    partitionState.put(partition, OnlinePartition)
+    controllerContext.putPartitionState(partition, OnlinePartition)
     val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId, otherBrokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -209,21 +207,21 @@ class PartitionStateMachineTest extends JUnitSuite {
 
   @Test
   def testOnlinePartitionToOfflineTransition(): Unit = {
-    partitionState.put(partition, OnlinePartition)
+    controllerContext.putPartitionState(partition, OnlinePartition)
     partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
     assertEquals(OfflinePartition, partitionState(partition))
   }
 
   @Test
   def testInvalidOnlinePartitionToNonexistentPartitionTransition(): Unit = {
-    partitionState.put(partition, OnlinePartition)
+    controllerContext.putPartitionState(partition, OnlinePartition)
     partitionStateMachine.handleStateChanges(partitions, NonExistentPartition)
     assertEquals(OnlinePartition, partitionState(partition))
   }
 
   @Test
   def testInvalidOnlinePartitionToNewPartitionTransition(): Unit = {
-    partitionState.put(partition, OnlinePartition)
+    controllerContext.putPartitionState(partition, OnlinePartition)
     partitionStateMachine.handleStateChanges(partitions, NewPartition)
     assertEquals(OnlinePartition, partitionState(partition))
   }
@@ -232,7 +230,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   def testOfflinePartitionToOnlinePartitionTransition(): Unit = {
     controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
-    partitionState.put(partition, OfflinePartition)
+    controllerContext.putPartitionState(partition, OfflinePartition)
     val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -263,7 +261,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   def testOfflinePartitionToOnlinePartitionTransitionZkUtilsExceptionFromStateLookup(): Unit = {
     controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
-    partitionState.put(partition, OfflinePartition)
+    controllerContext.putPartitionState(partition, OfflinePartition)
     val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -284,7 +282,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   def testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup(): Unit = {
     controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
-    partitionState.put(partition, OfflinePartition)
+    controllerContext.putPartitionState(partition, OfflinePartition)
     val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -305,14 +303,14 @@ class PartitionStateMachineTest extends JUnitSuite {
 
   @Test
   def testOfflinePartitionToNonexistentPartitionTransition(): Unit = {
-    partitionState.put(partition, OfflinePartition)
+    controllerContext.putPartitionState(partition, OfflinePartition)
     partitionStateMachine.handleStateChanges(partitions, NonExistentPartition)
     assertEquals(NonExistentPartition, partitionState(partition))
   }
 
   @Test
   def testInvalidOfflinePartitionToNewPartitionTransition(): Unit = {
-    partitionState.put(partition, OfflinePartition)
+    controllerContext.putPartitionState(partition, OfflinePartition)
     partitionStateMachine.handleStateChanges(partitions, NewPartition)
     assertEquals(OfflinePartition, partitionState(partition))
   }
@@ -356,23 +354,21 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val partitionIds = Seq(0, 1, 2, 3)
     val topic = "test"
-    val partitions = partitionIds.map(new TopicPartition("test", _))
+    val partitions = partitionIds.map(new TopicPartition(topic, _))
 
     partitions.foreach { partition =>
       controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     }
 
-    EasyMock.expect(mockTopicDeletionManager.isTopicWithDeletionStarted(topic)).andReturn(false)
-    EasyMock.expectLastCall().anyTimes()
     prepareMockToElectLeaderForPartitions(partitions)
-    EasyMock.replay(mockZkClient, mockTopicDeletionManager)
+    EasyMock.replay(mockZkClient)
 
     partitionStateMachine.handleStateChanges(partitions, NewPartition)
     partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
-    assertEquals(s"There should be ${partitions.size} offline partition(s)", partitions.size, partitionStateMachine.offlinePartitionCount)
+    assertEquals(s"There should be ${partitions.size} offline partition(s)", partitions.size, controllerContext.offlinePartitionCount)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy))
-    assertEquals(s"There should be no offline partition(s)", 0, partitionStateMachine.offlinePartitionCount)
+    assertEquals(s"There should be no offline partition(s)", 0, controllerContext.offlinePartitionCount)
   }
 
   /**
@@ -383,15 +379,14 @@ class PartitionStateMachineTest extends JUnitSuite {
   def testNoOfflinePartitionsChangeForTopicsBeingDeleted() = {
     val partitionIds = Seq(0, 1, 2, 3)
     val topic = "test"
-    val partitions = partitionIds.map(new TopicPartition("test", _))
+    val partitions = partitionIds.map(new TopicPartition(topic, _))
 
-    EasyMock.expect(mockTopicDeletionManager.isTopicWithDeletionStarted(topic)).andReturn(true)
-    EasyMock.expectLastCall().anyTimes()
-    EasyMock.replay(mockTopicDeletionManager)
+    controllerContext.topicsToBeDeleted.add(topic)
+    controllerContext.topicsWithDeletionStarted.add(topic)
 
     partitionStateMachine.handleStateChanges(partitions, NewPartition)
     partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
-    assertEquals(s"There should be no offline partition(s)", 0, partitionStateMachine.offlinePartitionCount)
+    assertEquals(s"There should be no offline partition(s)", 0, controllerContext.offlinePartitionCount)
   }
 
   /**
@@ -411,52 +406,21 @@ class PartitionStateMachineTest extends JUnitSuite {
       controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     }
 
-    val props = TestUtils.createBrokerConfig(brokerId, "zkConnect")
-    props.put(KafkaConfig.DeleteTopicEnableProp, "true")
-
-    val customConfig = KafkaConfig.fromProps(props)
-
-    def createMockReplicaStateMachine() = {
-      val replicaStateMachine: ReplicaStateMachine = EasyMock.createMock(classOf[ReplicaStateMachine])
-      EasyMock.expect(replicaStateMachine.areAllReplicasForTopicDeleted(topic)).andReturn(false).anyTimes()
-      EasyMock.expect(replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)).andReturn(false).anyTimes()
-      EasyMock.expect(replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)).andReturn(false).anyTimes()
-      EasyMock.expect(replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)).andReturn(Set.empty).anyTimes()
-      EasyMock.expect(replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)).andReturn(Set.empty).anyTimes()
-      EasyMock.expect(replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)).andReturn(Set.empty).anyTimes()
-      EasyMock.expect(replicaStateMachine.handleStateChanges(EasyMock.anyObject[Seq[PartitionAndReplica]],
-        EasyMock.anyObject[ReplicaState], EasyMock.anyObject[Callbacks]))
-
-      EasyMock.expectLastCall().anyTimes()
-      replicaStateMachine
-    }
-    val replicaStateMachine = createMockReplicaStateMachine()
-    partitionStateMachine = new PartitionStateMachine(customConfig, new StateChangeLogger(brokerId, true, None), controllerContext,
-      mockZkClient, partitionState, mockControllerBrokerRequestBatch)
-
-    def createMockController() = {
-      val mockController: KafkaController = EasyMock.createMock(classOf[KafkaController])
-      EasyMock.expect(mockController.controllerContext).andReturn(controllerContext).anyTimes()
-      EasyMock.expect(mockController.config).andReturn(customConfig).anyTimes()
-      EasyMock.expect(mockController.partitionStateMachine).andReturn(partitionStateMachine).anyTimes()
-      EasyMock.expect(mockController.replicaStateMachine).andReturn(replicaStateMachine).anyTimes()
-      EasyMock.expect(mockController.sendUpdateMetadataRequest(Seq.empty, partitions.toSet))
-      EasyMock.expectLastCall().anyTimes()
-      mockController
-    }
-
-    val mockController = createMockController()
-    val mockEventManager: ControllerEventManager = EasyMock.createMock(classOf[ControllerEventManager])
-    EasyMock.replay(mockController, replicaStateMachine, mockEventManager)
-
-    val topicDeletionManager = new TopicDeletionManager(mockController, mockEventManager, mockZkClient)
-    partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
+    val partitionStateMachine = new MockPartitionStateMachine(controllerContext, uncleanLeaderElectionEnabled = false)
+    val replicaStateMachine = new MockReplicaStateMachine(controllerContext)
+    val deletionClient = Mockito.mock(classOf[DeletionClient])
+    val topicDeletionManager = new TopicDeletionManager(config, controllerContext,
+      replicaStateMachine, partitionStateMachine, deletionClient)
 
     partitionStateMachine.handleStateChanges(partitions, NewPartition)
     partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
-    assertEquals(s"There should be ${partitions.size} offline partition(s)", partitions.size, mockController.partitionStateMachine.offlinePartitionCount)
+    partitions.foreach { partition =>
+      val replica = PartitionAndReplica(partition, brokerId)
+      controllerContext.putReplicaState(replica, OfflineReplica)
+    }
 
+    assertEquals(s"There should be ${partitions.size} offline partition(s)", partitions.size, controllerContext.offlinePartitionCount)
     topicDeletionManager.enqueueTopicsForDeletion(Set(topic))
-    assertEquals(s"There should be no offline partition(s)", 0, partitionStateMachine.offlinePartitionCount)
+    assertEquals(s"There should be no offline partition(s)", 0, controllerContext.offlinePartitionCount)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index ef274fa..cfadfbe 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -30,14 +30,10 @@ import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite
 
-import scala.collection.mutable
-
 class ReplicaStateMachineTest extends JUnitSuite {
   private var controllerContext: ControllerContext = null
   private var mockZkClient: KafkaZkClient = null
   private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
-  private var mockTopicDeletionManager: TopicDeletionManager = null
-  private var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = null
   private var replicaStateMachine: ReplicaStateMachine = null
 
   private val brokerId = 5
@@ -54,10 +50,12 @@ class ReplicaStateMachineTest extends JUnitSuite {
     controllerContext.epoch = controllerEpoch
     mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
-    mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
-    replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
-    replicaStateMachine = new ReplicaStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager, mockZkClient,
-      replicaState, mockControllerBrokerRequestBatch)
+    replicaStateMachine = new ZkReplicaStateMachine(config, new StateChangeLogger(brokerId, true, None),
+      controllerContext, mockZkClient, mockControllerBrokerRequestBatch)
+  }
+
+  private def replicaState(replica: PartitionAndReplica): ReplicaState = {
+    controllerContext.replicaState(replica)
   }
 
   @Test
@@ -103,7 +101,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
   @Test
   def testNewReplicaToOnlineReplicaTransition(): Unit = {
-    replicaState.put(replica, NewReplica)
+    controllerContext.putReplicaState(replica, NewReplica)
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
     assertEquals(OnlineReplica, replicaState(replica))
@@ -111,10 +109,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
   @Test
   def testNewReplicaToOfflineReplicaTransition(): Unit = {
-    replicaState.put(replica, NewReplica)
+    controllerContext.putReplicaState(replica, NewReplica)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)),
-      EasyMock.eq(partition), EasyMock.eq(false), EasyMock.anyObject()))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)), EasyMock.eq(partition), EasyMock.eq(false)))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
@@ -149,7 +146,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
   @Test
   def testOnlineReplicaToOnlineReplicaTransition(): Unit = {
-    replicaState.put(replica, OnlineReplica)
+    controllerContext.putReplicaState(replica, OnlineReplica)
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -167,7 +164,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
   def testOnlineReplicaToOfflineReplicaTransition(): Unit = {
     val otherBrokerId = brokerId + 1
     val replicaIds = List(brokerId, otherBrokerId)
-    replicaState.put(replica, OnlineReplica)
+    controllerContext.putReplicaState(replica, OnlineReplica)
     controllerContext.updatePartitionReplicaAssignment(partition, replicaIds)
     val leaderAndIsr = LeaderAndIsr(brokerId, replicaIds)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
@@ -175,8 +172,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)),
-      EasyMock.eq(partition), EasyMock.eq(false), EasyMock.anyObject()))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)), EasyMock.eq(partition), EasyMock.eq(false)))
     val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId))
     val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr .zkVersion + 1)
     val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch)
@@ -185,14 +181,13 @@ class ReplicaStateMachineTest extends JUnitSuite {
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch, controllerContext.epochZkVersion))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
-    EasyMock.expect(mockTopicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)).andReturn(false)
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
 
-    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
-    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(updatedLeaderIsrAndControllerEpoch, controllerContext.partitionLeadershipInfo(partition))
     assertEquals(OfflineReplica, replicaState(replica))
   }
@@ -224,7 +219,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
   @Test
   def testOfflineReplicaToOnlineReplicaTransition(): Unit = {
-    replicaState.put(replica, OfflineReplica)
+    controllerContext.putReplicaState(replica, OfflineReplica)
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -240,21 +235,21 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
   @Test
   def testOfflineReplicaToReplicaDeletionStartedTransition(): Unit = {
-    val callbacks = new Callbacks()
-    replicaState.put(replica, OfflineReplica)
+    controllerContext.putReplicaState(replica, OfflineReplica)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
-      partition, true, callbacks.stopReplicaResponseCallback))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId), partition, true))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
-    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted, callbacks)
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted)
     EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(ReplicaDeletionStarted, replicaState(replica))
   }
 
   @Test
-  def testInvalidOfflineReplicaToReplicaDeletionIneligibleTransition(): Unit = {
-    testInvalidTransition(OfflineReplica, ReplicaDeletionIneligible)
+  def testOfflineReplicaToReplicaDeletionIneligibleTransition(): Unit = {
+    controllerContext.putReplicaState(replica, OfflineReplica)
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionIneligible)
+    assertEquals(ReplicaDeletionIneligible, replicaState(replica))
   }
 
   @Test
@@ -284,25 +279,25 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
   @Test
   def testReplicaDeletionStartedToReplicaDeletionIneligibleTransition(): Unit = {
-    replicaState.put(replica, ReplicaDeletionStarted)
+    controllerContext.putReplicaState(replica, ReplicaDeletionStarted)
     replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionIneligible)
     assertEquals(ReplicaDeletionIneligible, replicaState(replica))
   }
 
   @Test
   def testReplicaDeletionStartedToReplicaDeletionSuccessfulTransition(): Unit = {
-    replicaState.put(replica, ReplicaDeletionStarted)
+    controllerContext.putReplicaState(replica, ReplicaDeletionStarted)
     replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionSuccessful)
     assertEquals(ReplicaDeletionSuccessful, replicaState(replica))
   }
 
   @Test
   def testReplicaDeletionSuccessfulToNonexistentReplicaTransition(): Unit = {
-    replicaState.put(replica, ReplicaDeletionSuccessful)
+    controllerContext.putReplicaState(replica, ReplicaDeletionSuccessful)
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     replicaStateMachine.handleStateChanges(replicas, NonExistentReplica)
     assertEquals(Seq.empty, controllerContext.partitionReplicaAssignment(partition))
-    assertEquals(None, replicaState.get(replica))
+    assertEquals(None, controllerContext.replicaStates.get(replica))
   }
 
   @Test
@@ -342,7 +337,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
   @Test
   def testReplicaDeletionIneligibleToOnlineReplicaTransition(): Unit = {
-    replicaState.put(replica, ReplicaDeletionIneligible)
+    controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -367,7 +362,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
   }
 
   private def testInvalidTransition(fromState: ReplicaState, toState: ReplicaState): Unit = {
-    replicaState.put(replica, fromState)
+    controllerContext.putReplicaState(replica, fromState)
     replicaStateMachine.handleStateChanges(replicas, toState)
     assertEquals(fromState, replicaState(replica))
   }
diff --git a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
new file mode 100644
index 0000000..e6297c0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import kafka.cluster.{Broker, EndPoint}
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito._
+
+class TopicDeletionManagerTest {
+
+  private val brokerId = 1
+  private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
+  private val deletionClient = mock(classOf[DeletionClient])
+
+  @Test
+  def testBasicDeletion(): Unit = {
+    val controllerContext = initContext(
+      brokers = Seq(1, 2, 3),
+      topics = Set("foo", "bar"),
+      numPartitions = 2,
+      replicationFactor = 3)
+    val replicaStateMachine = new MockReplicaStateMachine(controllerContext)
+    replicaStateMachine.startup()
+
+    val partitionStateMachine = new MockPartitionStateMachine(controllerContext, uncleanLeaderElectionEnabled = false)
+    partitionStateMachine.startup()
+
+    val deletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
+      partitionStateMachine, deletionClient)
+    assertTrue(deletionManager.isDeleteTopicEnabled)
+    deletionManager.init(Set.empty, Set.empty)
+
+    val fooPartitions = controllerContext.partitionsForTopic("foo")
+    val fooReplicas = controllerContext.replicasForPartition(fooPartitions).toSet
+
+    // Queue the topic for deletion
+    deletionManager.enqueueTopicsForDeletion(Set("foo"))
+
+    assertEquals(fooPartitions, controllerContext.partitionsInState("foo", NonExistentPartition))
+    assertEquals(fooReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted))
+    verify(deletionClient).sendMetadataUpdate(fooPartitions)
+    assertEquals(Set("foo"), controllerContext.topicsToBeDeleted)
+    assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted)
+    assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
+
+    // Complete the deletion
+    deletionManager.completeReplicaDeletion(fooReplicas)
+
+    assertEquals(Set.empty, controllerContext.partitionsForTopic("foo"))
+    assertEquals(Set.empty[PartitionAndReplica], controllerContext.replicaStates.keySet.filter(_.topic == "foo"))
+    assertEquals(Set(), controllerContext.topicsToBeDeleted)
+    assertEquals(Set(), controllerContext.topicsWithDeletionStarted)
+    assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
+  }
+
+  @Test
+  def testDeletionWithBrokerOffline(): Unit = {
+    val controllerContext = initContext(
+      brokers = Seq(1, 2, 3),
+      topics = Set("foo", "bar"),
+      numPartitions = 2,
+      replicationFactor = 3)
+
+    val replicaStateMachine = new MockReplicaStateMachine(controllerContext)
+    replicaStateMachine.startup()
+
+    val partitionStateMachine = new MockPartitionStateMachine(controllerContext, uncleanLeaderElectionEnabled = false)
+    partitionStateMachine.startup()
+
+    val deletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
+      partitionStateMachine, deletionClient)
+    assertTrue(deletionManager.isDeleteTopicEnabled)
+    deletionManager.init(Set.empty, Set.empty)
+
+    val fooPartitions = controllerContext.partitionsForTopic("foo")
+    val fooReplicas = controllerContext.replicasForPartition(fooPartitions).toSet
+
+    // Broker 2 is taken offline
+    val failedBrokerId = 2
+    val offlineBroker = controllerContext.liveOrShuttingDownBroker(failedBrokerId).get
+    val lastEpoch = controllerContext.liveBrokerIdAndEpochs(failedBrokerId)
+    controllerContext.removeLiveBrokers(Set(failedBrokerId))
+    assertEquals(Set(1, 3), controllerContext.liveBrokerIds)
+
+    val (offlineReplicas, onlineReplicas) = fooReplicas.partition(_.replica == failedBrokerId)
+    replicaStateMachine.handleStateChanges(offlineReplicas.toSeq, OfflineReplica)
+
+    // Start topic deletion
+    deletionManager.enqueueTopicsForDeletion(Set("foo"))
+    assertEquals(fooPartitions, controllerContext.partitionsInState("foo", NonExistentPartition))
+    verify(deletionClient).sendMetadataUpdate(fooPartitions)
+    assertEquals(onlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted))
+    assertEquals(offlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionIneligible))
+
+    assertEquals(Set("foo"), controllerContext.topicsToBeDeleted)
+    assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted)
+    assertEquals(Set("foo"), controllerContext.topicsIneligibleForDeletion)
+
+    // Deletion succeeds for online replicas
+    deletionManager.completeReplicaDeletion(onlineReplicas)
+
+    assertEquals(fooPartitions, controllerContext.partitionsInState("foo", NonExistentPartition))
+    assertEquals(Set("foo"), controllerContext.topicsToBeDeleted)
+    assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted)
+    assertEquals(Set("foo"), controllerContext.topicsIneligibleForDeletion)
+    assertEquals(onlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionSuccessful))
+    assertEquals(offlineReplicas, controllerContext.replicasInState("foo", OfflineReplica))
+
+    // Broker 2 comes back online and deletion is resumed
+    controllerContext.addLiveBrokersAndEpochs(Map(offlineBroker -> (lastEpoch + 1L)))
+    deletionManager.resumeDeletionForTopics(Set("foo"))
+
+    assertEquals(onlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionSuccessful))
+    assertEquals(offlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted))
+
+    deletionManager.completeReplicaDeletion(offlineReplicas)
+    assertEquals(Set.empty, controllerContext.partitionsForTopic("foo"))
+    assertEquals(Set.empty[PartitionAndReplica], controllerContext.replicaStates.keySet.filter(_.topic == "foo"))
+    assertEquals(Set(), controllerContext.topicsToBeDeleted)
+    assertEquals(Set(), controllerContext.topicsWithDeletionStarted)
+    assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
+  }
+
+  @Test
+  def testBrokerFailureAfterDeletionStarted(): Unit = {
+    val controllerContext = initContext(
+      brokers = Seq(1, 2, 3),
+      topics = Set("foo", "bar"),
+      numPartitions = 2,
+      replicationFactor = 3)
+
+    val replicaStateMachine = new MockReplicaStateMachine(controllerContext)
+    replicaStateMachine.startup()
+
+    val partitionStateMachine = new MockPartitionStateMachine(controllerContext, uncleanLeaderElectionEnabled = false)
+    partitionStateMachine.startup()
+
+    val deletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
+      partitionStateMachine, deletionClient)
+    deletionManager.init(Set.empty, Set.empty)
+
+    val fooPartitions = controllerContext.partitionsForTopic("foo")
+    val fooReplicas = controllerContext.replicasForPartition(fooPartitions).toSet
+
+    // Queue the topic for deletion
+    deletionManager.enqueueTopicsForDeletion(Set("foo"))
+    assertEquals(fooPartitions, controllerContext.partitionsInState("foo", NonExistentPartition))
+    assertEquals(fooReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted))
+
+    // Broker 2 fails
+    val failedBrokerId = 2
+    val offlineBroker = controllerContext.liveOrShuttingDownBroker(failedBrokerId).get
+    val lastEpoch = controllerContext.liveBrokerIdAndEpochs(failedBrokerId)
+    controllerContext.removeLiveBrokers(Set(failedBrokerId))
+    assertEquals(Set(1, 3), controllerContext.liveBrokerIds)
+    val (offlineReplicas, onlineReplicas) = fooReplicas.partition(_.replica == failedBrokerId)
+
+    // Fail replica deletion
+    deletionManager.failReplicaDeletion(offlineReplicas)
+    assertEquals(Set("foo"), controllerContext.topicsToBeDeleted)
+    assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted)
+    assertEquals(Set("foo"), controllerContext.topicsIneligibleForDeletion)
+    assertEquals(offlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionIneligible))
+    assertEquals(onlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted))
+
+    // Broker 2 is restarted. The offline replicas remain ineligible
+    // (TODO: this is probably not desired)
+    controllerContext.addLiveBrokersAndEpochs(Map(offlineBroker -> (lastEpoch + 1L)))
+    deletionManager.resumeDeletionForTopics(Set("foo"))
+    assertEquals(Set("foo"), controllerContext.topicsToBeDeleted)
+    assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted)
+    assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
+    assertEquals(onlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted))
+    assertEquals(offlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionIneligible))
+
+    // When deletion completes for the replicas which started, then deletion begins for the remaining ones
+    deletionManager.completeReplicaDeletion(onlineReplicas)
+    assertEquals(Set("foo"), controllerContext.topicsToBeDeleted)
+    assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted)
+    assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
+    assertEquals(onlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionSuccessful))
+    assertEquals(offlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted))
+
+  }
+
+  def initContext(brokers: Seq[Int],
+                  topics: Set[String],
+                  numPartitions: Int,
+                  replicationFactor: Int): ControllerContext = {
+    val context = new ControllerContext
+    val brokerEpochs = brokers.map { brokerId =>
+      val endpoint = new EndPoint("localhost", 9900 + brokerId, new ListenerName("blah"),
+        SecurityProtocol.PLAINTEXT)
+      Broker(brokerId, Seq(endpoint), rack = None) -> 1L
+    }.toMap
+    context.setLiveBrokerAndEpochs(brokerEpochs)
+
+    // Simple round-robin replica assignment
+    var leaderIndex = 0
+    for (topic <- topics; partitionId <- 0 until numPartitions) {
+      val partition = new TopicPartition(topic, partitionId)
+      val replicas = (0 until replicationFactor).map { i =>
+        val replica = brokers((i + leaderIndex) % brokers.size)
+        replica
+      }
+      context.updatePartitionReplicaAssignment(partition, replicas)
+      leaderIndex += 1
+    }
+    context
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 3eff38f..f8c56cb 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -193,7 +193,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
 
     // The controller should have marked the replica on the original leader as offline
     val controllerServer = servers.find(_.kafkaController.isActive).get
-    val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
+    val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
     assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), leaderServerId)))
   }
 


Mime
View raw message