kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch 2.4 updated: KAFKA-9089; Reassignment should be resilient to unexpected errors (#7562)
Date: Fri, 25 Oct 2019 02:52:00 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new bdf2446  KAFKA-9089; Reassignment should be resilient to unexpected errors (#7562)
bdf2446 is described below

commit bdf2446ccce592f3c000290f11de88520327aa19
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Oct 24 19:33:16 2019 -0700

    KAFKA-9089; Reassignment should be resilient to unexpected errors (#7562)
    
    The purpose of this patch is to make the reassignment algorithm simpler and more resilient to unexpected errors. Specifically, it has the following improvements:
    
    1. Remove `ReassignedPartitionContext`. We no longer need to track the previous reassignment through the context and we now use the assignment state as the single source of truth for the target replicas in a reassignment.
    2. Remove the intermediate assignment state when overriding a previous reassignment. Instead, an overriding reassignment directly updates the assignment state and shuts down any unneeded replicas. Reassignments are _always_ persisted in Zookeeper before being updated in the controller context.
    3. To fix race conditions with concurrent submissions, reassignment completion for a partition always checks whether there is a zk partition reassignment that needs to be removed. This means the controller no longer needs to track the source of the reassignment.
    4. API reassignments explicitly remove reassignment state from zk prior to beginning the new reassignment. This fixes an inconsistency in precedence: upon controller failover, zookeeper reassignments always take precedence over any active reassignment, so without the logic to remove the zk reassignment when an API reassignment is triggered, the controller could revert to the older zk reassignment.
    
    Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jun Rao <junrao@gmail.com>
---
 .../kafka/admin/ReassignPartitionsCommand.scala    |   2 +-
 .../controller/ControllerChannelManager.scala      |   2 +-
 .../scala/kafka/controller/ControllerContext.scala |  66 +-
 .../src/main/scala/kafka/controller/Election.scala |   8 +-
 .../scala/kafka/controller/KafkaController.scala   | 714 +++++++++------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |   4 +-
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  11 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  22 +-
 core/src/main/scala/kafka/zk/ZkData.scala          |   8 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |  12 +-
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |   4 +-
 .../admin/ReassignPartitionsClusterTest.scala      |  37 +-
 .../admin/ReassignPartitionsCommandTest.scala      |   4 +-
 .../controller/ControllerChannelManagerTest.scala  |   2 +-
 .../kafka/controller/ControllerContextTest.scala   |  34 +-
 .../controller/ControllerIntegrationTest.scala     |  14 +-
 .../controller/PartitionStateMachineTest.scala     |   2 +-
 .../kafka/controller/ReplicaStateMachineTest.scala |   2 +-
 .../kafka/security/auth/ZkAuthorizationTest.scala  |   4 +-
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |   4 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |   8 +-
 21 files changed, 426 insertions(+), 538 deletions(-)
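
Before the diff itself, a small standalone Scala sketch may help when reading the ControllerContext.scala and KafkaController.scala changes below. It is not part of the patch: the ReplicaAssignmentSketch object, the isComplete helper, and the main method exist only for illustration, and isComplete simplifies the patched isReassignmentComplete by taking the ISR as an argument instead of reading partition state from ZooKeeper. The RS/AR/RR derivation mirrors ReplicaAssignment.fromOldAndNewReplicas and uses the [1, 2, 3] -> [3, 4, 2] example discussed in the KafkaController.scala comments.

    object ReplicaAssignmentSketch {

      // Minimal re-statement of the ReplicaAssignment case class added in
      // ControllerContext.scala below, reduced to the RS/AR/RR bookkeeping so the
      // derivation can be run standalone.
      final case class ReplicaAssignment(replicas: Seq[Int],           // RS = ORS + TRS
                                         addingReplicas: Seq[Int],     // AR = TRS - ORS
                                         removingReplicas: Seq[Int]) { // RR = ORS - TRS
        lazy val originReplicas: Seq[Int] = replicas.diff(addingReplicas)
        lazy val targetReplicas: Seq[Int] = replicas.diff(removingReplicas)
        def isBeingReassigned: Boolean = addingReplicas.nonEmpty || removingReplicas.nonEmpty
      }

      // Mirrors ReplicaAssignment.fromOldAndNewReplicas: target replicas are listed
      // first, which is why a cancelled reassignment cannot restore the original
      // replica order (see the updateCurrentReassignment comment in the diff).
      def fromOldAndNewReplicas(oldReplicas: Seq[Int], newReplicas: Seq[Int]): ReplicaAssignment = {
        val fullReplicaSet = (newReplicas ++ oldReplicas).distinct
        ReplicaAssignment(fullReplicaSet, fullReplicaSet.diff(oldReplicas), fullReplicaSet.diff(newReplicas))
      }

      // Simplified form of the completion check: the reassignment is complete once
      // TRS is a subset of the ISR.
      def isComplete(assignment: ReplicaAssignment, isr: Set[Int]): Boolean =
        !assignment.isBeingReassigned || assignment.targetReplicas.toSet.subsetOf(isr)

      def main(args: Array[String]): Unit = {
        // Worked example: ORS = [1, 2, 3], TRS = [3, 4, 2]
        val assignment = fromOldAndNewReplicas(oldReplicas = Seq(1, 2, 3), newReplicas = Seq(3, 4, 2))
        println(assignment.replicas)          // List(3, 4, 2, 1)  (RS while reassigning)
        println(assignment.addingReplicas)    // List(4)           (AR)
        println(assignment.removingReplicas)  // List(1)           (RR)
        println(assignment.targetReplicas)    // List(3, 4, 2)     (TRS)
        println(isComplete(assignment, isr = Set(1, 2, 3)))  // false: replica 4 not yet in ISR
        println(isComplete(assignment, isr = Set(2, 3, 4)))  // true: all of TRS is in the ISR
      }
    }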

diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 83d2e00..934191a 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -649,7 +649,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
       }
     } catch {
       case _: NodeExistsException =>
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
+        val partitionsBeingReassigned = zkClient.getPartitionReassignment()
         throw new AdminCommandFailedException("Partition reassignment currently in " +
           "progress for %s. Aborting operation".format(partitionsBeingReassigned))
     }
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index c60130b..375f0d3 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -377,7 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
                                        topicPartition: TopicPartition,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                                       replicaAssignment: PartitionReplicaAssignment,
+                                       replicaAssignment: ReplicaAssignment,
                                        isNew: Boolean): Unit = {
 
     brokerIds.filter(_ >= 0).foreach { brokerId =>
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index be7f54a..93b0b4d 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -22,43 +22,40 @@ import org.apache.kafka.common.TopicPartition
 
 import scala.collection.{Map, Seq, Set, mutable}
 
-object PartitionReplicaAssignment {
-  def fromOldAndNewReplicas(oldReplicas: Seq[Int], newReplicas: Seq[Int]): PartitionReplicaAssignment = {
+object ReplicaAssignment {
+  def fromOldAndNewReplicas(oldReplicas: Seq[Int], newReplicas: Seq[Int]): ReplicaAssignment = {
     val fullReplicaSet = (newReplicas ++ oldReplicas).distinct
-    PartitionReplicaAssignment(
+    ReplicaAssignment(
       fullReplicaSet,
-      fullReplicaSet.filterNot(oldReplicas.contains(_)),
-      fullReplicaSet.filterNot(newReplicas.contains(_))
+      fullReplicaSet.diff(oldReplicas),
+      fullReplicaSet.diff(newReplicas)
     )
   }
+
+  def apply(replicas: Seq[Int]): ReplicaAssignment = {
+    apply(replicas, Seq.empty, Seq.empty)
+  }
 }
 
-case class PartitionReplicaAssignment(replicas: Seq[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]) {
+case class ReplicaAssignment(replicas: Seq[Int],
+                             addingReplicas: Seq[Int],
+                             removingReplicas: Seq[Int]) {
+
+  lazy val originReplicas: Seq[Int] = replicas.diff(addingReplicas)
+  lazy val targetReplicas: Seq[Int] = replicas.diff(removingReplicas)
+
   def isBeingReassigned: Boolean = {
     addingReplicas.nonEmpty || removingReplicas.nonEmpty
   }
 
-  /**
-    * Returns the partition replica assignment previous to this one.
-    * It is different than this one only when the partition is undergoing reassignment
-    * Note that this will not preserve the original ordering
-    */
-  def previousAssignment: PartitionReplicaAssignment = {
-    PartitionReplicaAssignment(
-      replicas.filterNot(addingReplicas.contains(_)),
-      Seq(),
-      Seq()
-    )
+  def reassignTo(newReplicas: Seq[Int]): ReplicaAssignment = {
+    ReplicaAssignment.fromOldAndNewReplicas(originReplicas, newReplicas)
   }
 
-  /**
-    * Returns the target replica assignment for this partition.
-    * This is different than the `replicas` variable only when there is a reassignment going on
-    */
-  def targetReplicas: Seq[Int] = replicas.filterNot(removingReplicas.contains(_))
-
-  override def toString: String = s"PartitionReplicaAssignment(replicas: ${replicas.mkString(",")}, " +
-    s"addingReplicas: ${addingReplicas.mkString(",")}, removingReplicas: ${removingReplicas.mkString(",")})"
+  override def toString: String = s"ReplicaAssignment(" +
+    s"replicas=${replicas.mkString(",")}, " +
+    s"addingReplicas=${addingReplicas.mkString(",")}, " +
+    s"removingReplicas=${removingReplicas.mkString(",")})"
 }
 
 class ControllerContext {
@@ -71,9 +68,9 @@ class ControllerContext {
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
 
   var allTopics: Set[String] = Set.empty
-  val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, PartitionReplicaAssignment]]
+  val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]]
   val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
-  val partitionsBeingReassigned = mutable.Map.empty[TopicPartition, ReassignedPartitionsContext]
+  val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]
   val partitionStates = mutable.Map.empty[TopicPartition, PartitionState]
   val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]
   val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty
@@ -121,11 +118,10 @@ class ControllerContext {
       }
   }
 
-  def partitionFullReplicaAssignment(topicPartition: TopicPartition): PartitionReplicaAssignment = {
-    partitionAssignments.getOrElse(topicPartition.topic, mutable.Map.empty)
-      .get(topicPartition.partition) match {
+  def partitionFullReplicaAssignment(topicPartition: TopicPartition): ReplicaAssignment = {
+    partitionAssignments.getOrElse(topicPartition.topic, mutable.Map.empty).get(topicPartition.partition) match {
       case Some(partitionAssignment) => partitionAssignment
-      case None => PartitionReplicaAssignment(Seq(), Seq(), Seq())
+      case None => ReplicaAssignment(Seq(), Seq(), Seq())
     }
   }
 
@@ -133,13 +129,13 @@ class ControllerContext {
     val assignments = partitionAssignments.getOrElseUpdate(topicPartition.topic, mutable.Map.empty)
     val newAssignment = assignments.get(topicPartition.partition) match {
       case Some(partitionAssignment) =>
-        PartitionReplicaAssignment(
+        ReplicaAssignment(
           newReplicas,
           partitionAssignment.addingReplicas,
           partitionAssignment.removingReplicas
         )
       case None =>
-        PartitionReplicaAssignment(
+        ReplicaAssignment(
           newReplicas,
           Seq.empty,
           Seq.empty
@@ -148,7 +144,7 @@ class ControllerContext {
     updatePartitionFullReplicaAssignment(topicPartition, newAssignment)
   }
 
-  def updatePartitionFullReplicaAssignment(topicPartition: TopicPartition, newAssignment: PartitionReplicaAssignment): Unit = {
+  def updatePartitionFullReplicaAssignment(topicPartition: TopicPartition, newAssignment: ReplicaAssignment): Unit = {
     val assignments = partitionAssignments.getOrElseUpdate(topicPartition.topic, mutable.Map.empty)
     assignments.put(topicPartition.partition, newAssignment)
   }
@@ -159,7 +155,7 @@ class ControllerContext {
     }.toMap
   }
 
-  def partitionFullReplicaAssignmentForTopic(topic : String): Map[TopicPartition, PartitionReplicaAssignment] = {
+  def partitionFullReplicaAssignmentForTopic(topic : String): Map[TopicPartition, ReplicaAssignment] = {
     partitionAssignments.getOrElse(topic, Map.empty).map {
       case (partition, assignment) => (new TopicPartition(topic, partition), assignment)
     }.toMap
diff --git a/core/src/main/scala/kafka/controller/Election.scala b/core/src/main/scala/kafka/controller/Election.scala
index 163f916..dffa888 100644
--- a/core/src/main/scala/kafka/controller/Election.scala
+++ b/core/src/main/scala/kafka/controller/Election.scala
@@ -72,12 +72,12 @@ object Election {
   private def leaderForReassign(partition: TopicPartition,
                                 leaderAndIsr: LeaderAndIsr,
                                 controllerContext: ControllerContext): ElectionResult = {
-    val reassignment = controllerContext.partitionsBeingReassigned(partition).newReplicas
-    val liveReplicas = reassignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+    val targetReplicas = controllerContext.partitionFullReplicaAssignment(partition).targetReplicas
+    val liveReplicas = targetReplicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
     val isr = leaderAndIsr.isr
-    val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment, isr, liveReplicas.toSet)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(targetReplicas, isr, liveReplicas.toSet)
     val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))
-    ElectionResult(partition, newLeaderAndIsrOpt, reassignment)
+    ElectionResult(partition, newLeaderAndIsrOpt, targetReplicas)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d45fea8..5456c62 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -55,7 +55,7 @@ object KafkaController extends Logging {
   val InitialControllerEpochZkVersion = 0
 
   type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] => Unit
-  type ListReassignmentsCallback = Either[Map[TopicPartition, PartitionReplicaAssignment], ApiError] => Unit
+  type ListReassignmentsCallback = Either[Map[TopicPartition, ReplicaAssignment], ApiError] => Unit
   type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError], ApiError] => Unit
 }
 
@@ -306,7 +306,8 @@ class KafkaController(val config: KafkaConfig,
     partitionStateMachine.startup()
 
     info(s"Ready to serve as the new controller with epoch $epoch")
-    maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
+
+    initializePartitionReassignments()
     topicDeletionManager.tryTopicDeletion()
     val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
     onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)
@@ -423,10 +424,9 @@ class KafkaController(val config: KafkaConfig,
     // to see if these brokers can become leaders for some/all of those
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // check if reassignment of some partitions need to be restarted
-    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
-      case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
+    maybeResumeReassignments { (_, assignment) =>
+      assignment.targetReplicas.exists(newBrokersSet.contains)
     }
-    partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) => onPartitionReassignment(tp, context) }
     // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
     // on the newly restarted brokers, there is a chance that topic deletion can resume
     val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
@@ -439,6 +439,14 @@ class KafkaController(val config: KafkaConfig,
     registerBrokerModificationsHandler(newBrokers)
   }
 
+  private def maybeResumeReassignments(shouldResume: (TopicPartition, ReplicaAssignment) => Boolean): Unit = {
+    controllerContext.partitionsBeingReassigned.foreach { tp =>
+      val currentAssignment = controllerContext.partitionFullReplicaAssignment(tp)
+      if (shouldResume(tp, currentAssignment))
+        onPartitionReassignment(tp, currentAssignment)
+    }
+  }
+
   private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
     debug(s"Register BrokerModifications handler for $brokerIds")
     brokerIds.foreach { brokerId =>
@@ -544,7 +552,6 @@ class KafkaController(val config: KafkaConfig,
    * 4. Whenever a new broker comes up which is part of an ongoing reassignment
    * 5. On controller startup/failover
    *
-   *
    * Reassigning replicas for a partition goes through a few steps listed in the code.
    * RS = current assigned replica set
    * ORS = Original replica set for partition
@@ -553,33 +560,27 @@ class KafkaController(val config: KafkaConfig,
    * RR = The replicas we are removing as part of this reassignment
    *
    * A reassignment may have up to three phases, each with its own steps:
+   *
+   * Phase U (Assignment update): Regardless of the trigger, the first step in the reassignment process
+   * is to update the existing assignment state. We always update the state in Zookeeper before
+   * we update memory so that it can be resumed upon controller fail-over.
+   *
+   *   U1. Update ZK with RS = ORS + TRS, AR = TRS - ORS, RR = ORS - TRS.
+   *   U2. Update memory with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS
+   *   U3. If we are cancelling or replacing an existing reassignment, send StopReplica to all members
+   *       of AR in the original reassignment if they are not in TRS from the new assignment
    *
+   * To complete the reassignment, we need to bring the new replicas into sync, so depending on the state
+   * of the ISR, we will execute one of the following steps.
    *
-   * Cleanup Phase: In the cases where this reassignment has to override an ongoing reassignment.
-   *   . The ongoing reassignment is in phase A
-   *   . ORS denotes the original replica set, prior to the ongoing reassignment
-   *   . URS denotes the unnecessary replicas, ones which are currently part of the AR of the ongoing reassignment but will not be part of the new one
-   *   . OVRS denotes the overlapping replica set - replicas which are part of the AR of the ongoing reassignment and will be part of the overriding reassignment
-   *       (it is essentially (RS - ORS) - URS)
+   * Phase A (when TRS != ISR): The reassignment is not yet complete
    *
-   *   1 Set RS = ORS + OVRS, AR = OVRS, RR = [] in memory
-   *   2 Send LeaderAndIsr request with RS = ORS + OVRS, AR = OVRS, RR = [] to all brokers in ORS + OVRS
-   *     (because the ongoing reassignment is in phase A, we know we wouldn't have a leader in URS
-   *      unless a preferred leader election was triggered while the reassignment was happening)
-   *   3 Replicas in URS -> Offline (force those replicas out of ISR)
-   *   4 Replicas in URS -> NonExistentReplica (force those replicas to be deleted)
+   *   A1. Bump the leader epoch for the partition and send LeaderAndIsr updates to RS.
+   *   A2. Start new replicas AR by moving replicas in AR to NewReplica state.
    *
-   * Phase A: Initial trigger (when TRS != ISR)
-   *   A1. Update ZK with RS = ORS + TRS,
-   *                      AR = TRS - ORS and
-   *                      RR = ORS - TRS.
-   *   A2. Update memory with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS
-   *   A3. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR).
-   *       We do this by forcing an update of the leader epoch in zookeeper.
-   *   A4. Start new replicas AR by moving replicas in AR to NewReplica state.
+   * Phase B (when TRS = ISR): The reassignment is complete
    *
-   * Phase B: All of TRS have caught up with the leaders and are in ISR
-   *   B1. Move all replicas in TRS to OnlineReplica state.
+   *   B1. Move all replicas in AR to OnlineReplica state.
    *   B2. Set RS = TRS, AR = [], RR = [] in memory.
    *   B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader from adding any replica in TRS - ORS back in the isr.
    *       If the current leader is not in TRS or isn't alive, we move the leader to a new replica in TRS.
@@ -611,51 +612,36 @@ class KafkaController(val config: KafkaConfig,
    * Note that we have to update RS in ZK with TRS last since it's the only place where we store ORS persistently.
    * This way, if the controller crashes before that step, we can still recover.
    */
-  private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext): Unit = {
-    val originalAssignmentOpt = maybeRevertOngoingReassignment(topicPartition, reassignedPartitionContext)
-    val oldReplicas = originalAssignmentOpt match {
-      case Some(originalReplicas) => originalReplicas
-      case None => controllerContext.partitionFullReplicaAssignment(topicPartition).previousAssignment.replicas
-    }
-    // RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS
-    val partitionAssignment = PartitionReplicaAssignment.fromOldAndNewReplicas(
-      oldReplicas = oldReplicas,
-      newReplicas = reassignedPartitionContext.newReplicas)
-    assert(reassignedPartitionContext.newReplicas == partitionAssignment.targetReplicas,
-      s"newReplicas ${reassignedPartitionContext.newReplicas} were not equal to the expected targetReplicas ${partitionAssignment.targetReplicas}")
-    val targetReplicas = partitionAssignment.targetReplicas
-
-    if (!areReplicasInIsr(topicPartition, targetReplicas)) {
-      info(s"New replicas ${targetReplicas.mkString(",")} for partition $topicPartition being reassigned not yet " +
-        "caught up with the leader")
-
-      // A1. Update ZK with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS.
-      updateReplicaAssignmentForPartition(topicPartition, partitionAssignment)
-      // A2. Update memory with RS = ORS + TRS, AR = TRS - ORS and RR = ORS - TRS
-      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, partitionAssignment)
-      // A3. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR).
-      val updatedAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
-      updateLeaderEpochAndSendRequest(topicPartition, updatedAssignment.replicas, updatedAssignment)
-      // A4. replicas in AR -> NewReplica
-      startNewReplicasForReassignedPartition(topicPartition, updatedAssignment.addingReplicas)
-      info(s"Waiting for new replicas ${updatedAssignment.addingReplicas.mkString(",")} for partition $topicPartition being " +
-        s"reassigned to catch up with the leader (target replicas ${updatedAssignment.targetReplicas})")
+  private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = {
+    // While a reassignment is in progress, deletion is not allowed
+    topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic), reason = "topic reassignment in progress")
+
+    updateCurrentReassignment(topicPartition, reassignment)
+
+    val addingReplicas = reassignment.addingReplicas
+    val removingReplicas = reassignment.removingReplicas
+
+    if (!isReassignmentComplete(topicPartition, reassignment)) {
+      // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR).
+      updateLeaderEpochAndSendRequest(topicPartition, reassignment)
+      // A2. replicas in AR -> NewReplica
+      startNewReplicasForReassignedPartition(topicPartition, addingReplicas)
     } else {
-      // B1. replicas in TRS -> OnlineReplica
-      targetReplicas.foreach { replica =>
-        replicaStateMachine.handleStateChanges(Seq(PartitionAndReplica(topicPartition, replica)), OnlineReplica)
-      }
+      // B1. replicas in AR -> OnlineReplica
+      replicaStateMachine.handleStateChanges(addingReplicas.map(PartitionAndReplica(topicPartition, _)), OnlineReplica)
       // B2. Set RS = TRS, AR = [], RR = [] in memory.
+      val completedReassignment = ReplicaAssignment(reassignment.targetReplicas)
+      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, completedReassignment)
       // B3. Send LeaderAndIsr request with a potential new leader (if current leader not in TRS) and
       //   a new RS (using TRS) and same isr to every broker in ORS + TRS or TRS
-      moveReassignedPartitionLeaderIfRequired(topicPartition, reassignedPartitionContext, partitionAssignment)
+      moveReassignedPartitionLeaderIfRequired(topicPartition, completedReassignment)
       // B4. replicas in RR -> Offline (force those replicas out of isr)
       // B5. replicas in RR -> NonExistentReplica (force those replicas to be deleted)
-      stopRemovedReplicasOfReassignedPartition(topicPartition, partitionAssignment.removingReplicas)
+      stopRemovedReplicasOfReassignedPartition(topicPartition, removingReplicas)
       // B6. Update ZK with RS = TRS, AR = [], RR = [].
-      updateReplicaAssignmentForPartition(topicPartition, controllerContext.partitionFullReplicaAssignment(topicPartition))
+      updateReplicaAssignmentForPartition(topicPartition, completedReassignment)
       // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it.
-      removePartitionFromReassignedPartitions(topicPartition)
+      removePartitionFromReassigningPartitions(topicPartition, completedReassignment)
       // B8. After electing a leader in B3, the replicas and isr information changes, so resend the update metadata request to every broker
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
       // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
@@ -664,117 +650,88 @@ class KafkaController(val config: KafkaConfig,
   }
 
   /**
-    * This is called in case we need to override/revert an ongoing reassignment.
-    * Note that due to the way we compute the original replica set, we have no guarantee that a revert would put it in the same order.
-    * @return An option of the original replicas prior to the ongoing reassignment. None if there is no ongoing reassignment
-    */
-  private def maybeRevertOngoingReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext): Option[Seq[Int]] = {
-    reassignedPartitionContext.ongoingReassignmentOpt match {
-      case Some(ongoingAssignment) =>
-        val originalAssignment = ongoingAssignment.previousAssignment
-        assert(ongoingAssignment.isBeingReassigned)
-        assert(!originalAssignment.isBeingReassigned)
+   * Update the current assignment state in Zookeeper and in memory. If a reassignment is already in
+   * progress, then the new reassignment will supplant it and some replicas will be shut down.
+   *
+   * Note that due to the way we compute the original replica set, we cannot guarantee that a
+   * cancellation will restore the original replica order. Target replicas are always listed
+   * first in the replica set in the desired order, which means we have no way to get to the
+   * original order if the reassignment overlaps with the current assignment. For example,
+   * with an initial assignment of [1, 2, 3] and a reassignment of [3, 4, 2], then the replicas
+   * will be encoded as [3, 4, 2, 1] while the reassignment is in progress. If the reassignment
+   * is cancelled, there is no way to restore the original order.
+   *
+   * @param topicPartition The reassigning partition
+   * @param reassignment The new reassignment
+   *
+   * @return The updated assignment state
+   */
+  private def updateCurrentReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = {
+    val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
 
-        val unnecessaryReplicas = ongoingAssignment.replicas.filterNot(originalAssignment.replicas.contains(_))
-        val (overlappingReplicas, replicasToRemove) = unnecessaryReplicas.partition(reassignedPartitionContext.newReplicas.contains(_))
-        // RS = ORS + OVRS, AR = OVRS, RR = []
-        val intermediateReplicaAssignment = PartitionReplicaAssignment(originalAssignment.replicas ++ overlappingReplicas, overlappingReplicas, Seq())
+    if (currentAssignment != reassignment) {
+      debug(s"Updating assignment of partition $topicPartition from $currentAssignment to $reassignment")
 
-        if (isDebugEnabled)
-          debug(s"Reverting previous reassignment $originalAssignment (we were in the " +
-            s"process of an ongoing reassignment with target replicas ${ongoingAssignment.targetReplicas.mkString(",")} (" +
-            s"Overlapping replicas: ${overlappingReplicas.mkString(",")}, Replicas to remove: ${replicasToRemove.mkString(",")})")
+      // U1. Update assignment state in zookeeper
+      updateReplicaAssignmentForPartition(topicPartition, reassignment)
+      // U2. Update assignment state in memory
+      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, reassignment)
 
-        // Set RS = ORS + OVRS, AR = OVRS, RR = [] in memory.
-        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, intermediateReplicaAssignment)
+      // If there is a reassignment already in progress, then some of the currently adding replicas
+      // may be eligible for immediate removal, in which case we need to stop the replicas.
+      val unneededReplicas = currentAssignment.replicas.diff(reassignment.replicas)
+      if (unneededReplicas.nonEmpty)
+        stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas)
+    }
 
-        // Increment leader epoch and send LeaderAndIsr with new RS (using old replicas and overlapping replicas) and same ISR to every broker in ORS + OVRS
-        // This will prevent the leader from adding any replica outside RS back in the ISR
-        updateLeaderEpochAndSendRequest(topicPartition, intermediateReplicaAssignment.replicas, intermediateReplicaAssignment)
+    val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
+    zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
 
-        // replicas in URS -> Offline (force those replicas out of isr)
-        // replicas in URS -> NonExistentReplica (force those replicas to be deleted)
-        stopRemovedReplicasOfReassignedPartition(topicPartition, replicasToRemove)
-        reassignedPartitionContext.ongoingReassignmentOpt = None
-        Some(originalAssignment.replicas)
-      case None => None // nothing to revert
-    }
+    controllerContext.partitionsBeingReassigned.add(topicPartition)
   }
 
   /**
-   * Trigger partition reassignment for the provided partitions if the assigned replicas are not the same as the
-   * reassigned replicas (as defined in `ControllerContext.partitionsBeingReassigned`) and if the topic has not been
-   * deleted.
+   * Trigger a partition reassignment provided that the topic exists and is not being deleted.
    *
-   * Called when:
-   * 1. zNode is first created
-   * 2. Controller fail over
-   * 3. AlterPartitionReassignments API is called
+   * This is called when a reassignment is initially received either through Zookeeper or through the
+   * AlterPartitionReassignments API
    *
-   * `partitionsBeingReassigned` must be populated with all partitions being reassigned before this method is invoked
-   * as explained in the method documentation of `removePartitionFromReassignedPartitions` (which is invoked by this
-   * method).
+   * The `partitionsBeingReassigned` field in the controller context will be updated by this
+   * call after the reassignment completes validation and is successfully stored in the topic
+   * assignment zNode.
    *
-   * @throws IllegalStateException if a partition is not in `partitionsBeingReassigned`
+   * @param reassignments The reassignments to begin processing
+   * @return A map of any errors in the reassignment. If the error is NONE for a given partition,
+   *         then the reassignment was submitted successfully.
    */
-  private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]): Map[TopicPartition, ApiError] = {
-    val reassignmentResults: mutable.Map[TopicPartition, ApiError] = mutable.Map.empty
-    val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition]
+  private def maybeTriggerPartitionReassignment(reassignments: Map[TopicPartition, ReplicaAssignment]): Map[TopicPartition, ApiError] = {
+    reassignments.map { case (tp, reassignment) =>
+      val topic = tp.topic
 
-    topicPartitions.foreach { tp =>
-      if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
+      val apiError = if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
         info(s"Skipping reassignment of $tp since the topic is currently being deleted")
-        partitionsToBeRemovedFromReassignment.add(tp)
-        reassignmentResults.put(tp, new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist."))
+        new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
       } else {
-        val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
-          throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +
-            s"partitionsBeingReassigned: ${controllerContext.partitionsBeingReassigned.mkString(", ")}")
-        }
-        val newReplicas = reassignedPartitionContext.newReplicas
-        val topic = tp.topic
         val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
         if (assignedReplicas.nonEmpty) {
-          if (assignedReplicas == newReplicas) {
-            info(s"Partition $tp to be reassigned is already assigned to replicas " +
-              s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
-            partitionsToBeRemovedFromReassignment.add(tp)
-            reassignmentResults.put(tp, ApiError.NONE)
-          } else {
-            try {
-              info(s"Handling reassignment of partition $tp from current replicas ${assignedReplicas.mkString(",")}" +
-                s"to new replicas ${newReplicas.mkString(",")}")
-              // first register ISR change listener
-              reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
-              // mark topic ineligible for deletion for the partitions being reassigned
-              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
-                reason = "topic reassignment in progress")
-              onPartitionReassignment(tp, reassignedPartitionContext)
-              reassignmentResults.put(tp, ApiError.NONE)
-            } catch {
-              case e: ControllerMovedException =>
-                error(s"Error completing reassignment of partition $tp because controller has moved to another broker", e)
-                throw e
-              case e: Throwable =>
-                error(s"Error completing reassignment of partition $tp", e)
-                partitionsToBeRemovedFromReassignment.add(tp)
-                zkClient.getFullReplicaAssignmentForTopics(immutable.Set(tp.topic())).find(_._1 == tp) match {
-                  case Some(persistedAssignment) =>
-                    controllerContext.updatePartitionFullReplicaAssignment(tp, persistedAssignment._2)
-                  case None =>
-                }
-                reassignmentResults.put(tp, new ApiError(Errors.UNKNOWN_SERVER_ERROR))
-            }
+          try {
+            onPartitionReassignment(tp, reassignment)
+            ApiError.NONE
+          } catch {
+            case e: ControllerMovedException =>
+              info(s"Failed completing reassignment of partition $tp because controller has moved to another broker")
+              throw e
+            case e: Throwable =>
+              error(s"Error completing reassignment of partition $tp", e)
+              new ApiError(Errors.UNKNOWN_SERVER_ERROR)
           }
         } else {
-            error(s"Ignoring request to reassign partition $tp that doesn't exist.")
-            partitionsToBeRemovedFromReassignment.add(tp)
-            reassignmentResults.put(tp, new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist."))
+            new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
         }
       }
+
+      tp -> apiError
     }
-    removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
-    reassignmentResults
   }
 
   /**
@@ -832,16 +789,13 @@ class KafkaController(val config: KafkaConfig,
     val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
     controllerContext.setLiveBrokerAndEpochs(curBrokerAndEpochs)
     info(s"Initialized broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
-    controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
+    controllerContext.allTopics = zkClient.getAllTopicsInCluster
     registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
     zkClient.getFullReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach {
       case (topicPartition, replicaAssignment) =>
         controllerContext.updatePartitionFullReplicaAssignment(topicPartition, replicaAssignment)
-        if (replicaAssignment.isBeingReassigned) {
-          val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
-          controllerContext.partitionsBeingReassigned.put(topicPartition, ReassignedPartitionsContext(replicaAssignment.targetReplicas,
-            reassignIsrChangeHandler, persistedInZk = false, ongoingReassignmentOpt = None))
-        }
+        if (replicaAssignment.isBeingReassigned)
+          controllerContext.partitionsBeingReassigned.add(topicPartition)
     }
     controllerContext.partitionLeadershipInfo.clear()
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
@@ -851,7 +805,6 @@ class KafkaController(val config: KafkaConfig,
     updateLeaderAndIsrCache()
     // start the channel manager
     controllerChannelManager.startup()
-    initializePartitionReassignment()
     info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
     info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")
     info(s"Current list of topics in the cluster: ${controllerContext.allTopics}")
@@ -878,28 +831,15 @@ class KafkaController(val config: KafkaConfig,
   }
 
   /**
-    * Initializes the partitions being reassigned by reading them from the /admin/reassign_partitions znode
-    * This will overwrite any reassignments that were set by the AlterPartitionReassignments API
-    */
-  private def initializePartitionReassignment(): Unit = {
-    val partitionsBeingReassigned = zkClient.getPartitionReassignment
-    if (partitionsBeingReassigned.nonEmpty) {
-      info(s"DEPRECATED: Partitions being reassigned through ZooKeeper: $partitionsBeingReassigned")
-
-      partitionsBeingReassigned.foreach {
-        case (tp, newReplicas) =>
-          val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, tp)
-          val assignment = controllerContext.partitionFullReplicaAssignment(tp)
-          val ongoingReassignmentOption = if (assignment.isBeingReassigned)
-            Some(assignment)
-          else
-            None
-
-          controllerContext.partitionsBeingReassigned += (
-            tp -> ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler,
-              persistedInZk = true,
-              ongoingReassignmentOpt = ongoingReassignmentOption))
-      }
+   * Initialize pending reassignments. This includes reassignments sent through /admin/reassign_partitions,
+   * which will supplant any API reassignments already in progress.
+   */
+  private def initializePartitionReassignments(): Unit = {
+    // New reassignments may have been submitted through Zookeeper while the controller was failing over
+    val zkPartitionsResumed = processZkPartitionReassignment()
+    // We may also have some API-based reassignments that need to be restarted
+    maybeResumeReassignments { (tp, _) =>
+      !zkPartitionsResumed.contains(tp)
     }
   }
 
@@ -909,7 +849,7 @@ class KafkaController(val config: KafkaConfig,
       val replicasForTopic = controllerContext.replicasForTopic(topic)
       replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, r.topicPartition))
     }}
-    val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
+    val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.map(_.topic)
     val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
     info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
     info(s"List of topics ineligible for deletion: ${topicsIneligibleForDeletion.mkString(",")}")
@@ -923,43 +863,37 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  private def areReplicasInIsr(partition: TopicPartition, replicas: Seq[Int]): Boolean = {
-    zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
-      replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
+  private def isReassignmentComplete(partition: TopicPartition, assignment: ReplicaAssignment): Boolean = {
+    if (!assignment.isBeingReassigned) {
+      true
+    } else {
+      zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
+        val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr.toSet
+        val targetReplicas = assignment.targetReplicas.toSet
+        targetReplicas.subsetOf(isr)
+      }
     }
   }
 
   private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,
-                                                      reassignedPartitionContext: ReassignedPartitionsContext,
-                                                      currentAssignment: PartitionReplicaAssignment): Unit = {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
+                                                      newAssignment: ReplicaAssignment): Unit = {
+    val reassignedReplicas = newAssignment.replicas
     val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
 
-    // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
-    // request to the current or new leader. This will prevent it from adding the old replicas to the ISR
-    val newAssignment = PartitionReplicaAssignment(replicas = reassignedReplicas, addingReplicas = Seq(), removingReplicas = Seq())
-    controllerContext.updatePartitionFullReplicaAssignment(
-      topicPartition,
-      newAssignment
-    )
-
-    if (!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
+    if (!reassignedReplicas.contains(currentLeader)) {
       info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
         s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader")
       // move the leader to one of the alive and caught up new replicas
       partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
+    } else if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
+      info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
+        s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} and is alive")
+      // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+      updateLeaderEpochAndSendRequest(topicPartition, newAssignment)
     } else {
-      // check if the leader is alive or not
-      if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
-        info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
-          s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} and is alive")
-        // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
-        updateLeaderEpochAndSendRequest(topicPartition, newAssignment.replicas, newAssignment)
-      } else {
-        info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
-          s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead")
-        partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
-      }
+      info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
+        s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead")
+      partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
     }
   }
 
@@ -975,17 +909,17 @@ class KafkaController(val config: KafkaConfig,
     replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica)
   }
 
-  private def updateReplicaAssignmentForPartition(partition: TopicPartition,
-                                                  assignment: PartitionReplicaAssignment): Unit = {
-    var topicAssignment = controllerContext.partitionFullReplicaAssignmentForTopic(partition.topic)
-    topicAssignment += partition -> assignment
+  private def updateReplicaAssignmentForPartition(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = {
+    var topicAssignment = controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic)
+    topicAssignment += topicPartition -> assignment
 
-    val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, topicAssignment, controllerContext.epochZkVersion)
+    val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic, topicAssignment, controllerContext.epochZkVersion)
     setDataResponse.resultCode match {
       case Code.OK =>
-        info(s"Updated assigned replicas for partition $partition being reassigned to ${assignment.targetReplicas.mkString(",")}" +
-          s" (addingReplicas: ${assignment.addingReplicas.mkString(",")}, removingReplicas: ${assignment.removingReplicas.mkString(",")})")
-      case Code.NONODE => throw new IllegalStateException(s"Topic ${partition.topic} doesn't exist")
+        info(s"Successfully updated assignment of partition $topicPartition to $assignment")
+      case Code.NONODE =>
+        throw new IllegalStateException(s"Failed to update assignment for $topicPartition since the topic " +
+          "has no current assignment")
       case _ => throw new KafkaException(setDataResponse.resultException.get)
     }
   }
@@ -998,28 +932,27 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  private def updateLeaderEpochAndSendRequest(partition: TopicPartition, replicasToReceiveRequest: Seq[Int],
-                                              newAssignedReplicas: PartitionReplicaAssignment): Unit = {
+  private def updateLeaderEpochAndSendRequest(topicPartition: TopicPartition,
+                                              assignment: ReplicaAssignment): Unit = {
     val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
-    val replicaSetStr = s"replica set ${newAssignedReplicas.replicas.mkString(",")} " +
-      s"(addingReplicas: ${newAssignedReplicas.addingReplicas.mkString(",")}, removingReplicas: ${newAssignedReplicas.removingReplicas.mkString(",")})"
-    updateLeaderEpoch(partition) match {
+    updateLeaderEpoch(topicPartition) match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
         try {
           brokerRequestBatch.newBatch()
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, partition,
-            updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew = false)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(assignment.replicas, topicPartition,
+            updatedLeaderIsrAndControllerEpoch, assignment, isNew = false)
           brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
         } catch {
           case e: IllegalStateException =>
             handleIllegalState(e)
         }
-        stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with new assigned $replicaSetStr" +
-          s"to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
-          s"for partition being reassigned $partition")
+        stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with " +
+          s"new replica assignment $assignment to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
+          s"for partition being reassigned $topicPartition")
+
       case None => // fail the reassignment
-        stateChangeLog.error(s"Failed to send LeaderAndIsr request with new assigned $replicaSetStr " +
-          s"to leader for partition being reassigned $partition")
+        stateChangeLog.error(s"Failed to send LeaderAndIsr request with new replica assignment " +
+          s"$assignment to leader for partition being reassigned $topicPartition")
     }
   }
 
@@ -1038,62 +971,49 @@ class KafkaController(val config: KafkaConfig,
   }
 
   private def unregisterPartitionReassignmentIsrChangeHandlers(): Unit = {
-    controllerContext.partitionsBeingReassigned.values.foreach(_.unregisterReassignIsrChangeHandler(zkClient))
-  }
-
-  /**
-   * Remove partition from partitions being reassigned in ZooKeeper and ControllerContext. If the partition reassignment
-   * is complete (i.e. there is no other partition with a reassignment in progress), the reassign_partitions znode
-   * is deleted.
-   *
-   * `ControllerContext.partitionsBeingReassigned` must be populated with all the zk-persisted partition reassignments before this
-   * method is invoked to avoid premature deletion of the `reassign_partitions` znode.
-   */
-  private def removePartitionsFromReassignedPartitions(partitionsToBeRemoved: Set[TopicPartition]): Unit = {
-    partitionsToBeRemoved.map(controllerContext.partitionsBeingReassigned).foreach { reassignContext =>
-      reassignContext.unregisterReassignIsrChangeHandler(zkClient)
+    controllerContext.partitionsBeingReassigned.foreach { tp =>
+      val path = TopicPartitionStateZNode.path(tp)
+      zkClient.unregisterZNodeChangeHandler(path)
     }
-
-    removePartitionsFromZkReassignment(partitionsToBeRemoved)
-
-    controllerContext.partitionsBeingReassigned --= partitionsToBeRemoved
   }
 
-  private def removePartitionFromReassignedPartitions(partitionToBeRemoved: TopicPartition) {
-    controllerContext.partitionsBeingReassigned.get(partitionToBeRemoved) match {
-      case Some(reassignContext) =>
-        reassignContext.unregisterReassignIsrChangeHandler(zkClient)
-
-        if (reassignContext.persistedInZk) {
-          removePartitionsFromZkReassignment(Set(partitionToBeRemoved))
-        }
-
-        controllerContext.partitionsBeingReassigned.remove(partitionToBeRemoved)
-      case None =>
-        throw new IllegalStateException("Cannot remove a reassigning partition because it is not present in memory")
+  private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition,
+                                                       assignment: ReplicaAssignment): Unit = {
+    if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
+      val path = TopicPartitionStateZNode.path(topicPartition)
+      zkClient.unregisterZNodeChangeHandler(path)
+      maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition && replicas == assignment.replicas)
+      controllerContext.partitionsBeingReassigned.remove(topicPartition)
+    } else {
+      throw new IllegalStateException("Cannot remove a reassigning partition because it is not present in memory")
     }
   }
 
-  private def removePartitionsFromZkReassignment(partitionsToBeRemoved: Set[TopicPartition]): Unit = {
-    if (!zkClient.reassignPartitionsInProgress()) {
-      debug(s"Cannot remove partitions $partitionsToBeRemoved from ZooKeeper because there is no ZooKeeper reassignment present")
+  /**
+   * Remove partitions from an active zk-based reassignment (if one exists).
+   *
+   * @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed
+   */
+  private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = {
+    if (!zkClient.reassignPartitionsInProgress())
       return
-    }
-
-    val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned.filter(_._2.persistedInZk).toMap -- partitionsToBeRemoved
 
-    info(s"Removing partitions $partitionsToBeRemoved from the list of reassigned partitions in zookeeper")
+    val reassigningPartitions = zkClient.getPartitionReassignment()
+    val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) =>
+      shouldRemoveReassignment(tp, replicas)
+    }
+    info(s"Removing partitions $removingPartitions from the list of reassigned partitions in zookeeper")
 
     // write the new list to zookeeper
     if (updatedPartitionsBeingReassigned.isEmpty) {
       info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}")
       zkClient.deletePartitionReassignment(controllerContext.epochZkVersion)
       // Ensure we detect future reassignments
-      eventManager.put(PartitionReassignment(None, None))
+      eventManager.put(ZkPartitionReassignment)
     } else {
-      val reassignment = updatedPartitionsBeingReassigned.map { case (k, v) => k -> v.newReplicas }
-      try zkClient.setOrCreatePartitionReassignment(reassignment, controllerContext.epochZkVersion)
-      catch {
+      try {
+        zkClient.setOrCreatePartitionReassignment(updatedPartitionsBeingReassigned, controllerContext.epochZkVersion)
+      } catch {
         case e: KeeperException => throw new AdminOperationException(e)
       }
     }
@@ -1377,7 +1297,7 @@ class KafkaController(val config: KafkaConfig,
         0
       } else {
         controllerContext.allPartitions.count { topicPartition =>
-          val replicaAssignment: PartitionReplicaAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
+          val replicaAssignment: ReplicaAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
           val replicas = replicaAssignment.replicas
           val preferredReplica = replicas.head
 
@@ -1553,7 +1473,7 @@ class KafkaController(val config: KafkaConfig,
 
   private def processTopicChange(): Unit = {
     if (!isActive) return
-    val topics = zkClient.getAllTopicsInCluster.toSet
+    val topics = zkClient.getAllTopicsInCluster
     val newTopics = topics -- controllerContext.allTopics
     val deletedTopics = controllerContext.allTopics -- topics
     controllerContext.allTopics = topics
@@ -1640,7 +1560,7 @@ class KafkaController(val config: KafkaConfig,
         // mark topic ineligible for deletion if other state changes are in progress
         topicsToBeDeleted.foreach { topic =>
           val partitionReassignmentInProgress =
-            controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+            controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
           if (partitionReassignmentInProgress)
             topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
               reason = "topic reassignment in progress")
@@ -1655,156 +1575,112 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def processZkPartitionReassignment(): Set[TopicPartition] = {
+    // We need to register the watcher if the path doesn't exist in order to detect future
+    // reassignments and we get the `path exists` check for free
+    if (isActive && zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
+      val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
+      val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
+
+      zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) =>
+        maybeBuildReassignment(tp, Some(targetReplicas)) match {
+          case Some(context) => partitionsToReassign.put(tp, context)
+          case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
+        }
+      }
+
+      reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign)
+      val (partitionsReassigned, partitionsFailed) = reassignmentResults.partition(_._2.error == Errors.NONE)
+      if (partitionsFailed.nonEmpty) {
+        warn(s"Failed reassignment through zk with the following errors: $partitionsFailed")
+        maybeRemoveFromZkReassignment((tp, _) => partitionsFailed.contains(tp))
+      }
+      partitionsReassigned.keySet
+    } else {
+      Set.empty
+    }
+  }
+
   /**
-   * Process a partition reassignment.
-   * A partition reassignment can be triggered through the AlterPartitionReassignment API (in which case reassignmentsOpt is present)
-   *   or through the /admin/reassign_partitions znode (in which case reassignmentsOpt is None).
-   * In both cases, we need to populate `partitionsBeingReassigned` with all partitions being reassigned
-   *   before invoking `maybeTriggerPartitionReassignment` (see method documentation for the reason)
+   * Process a partition reassignment from the AlterPartitionReassignment API. If there is an
+   * existing reassignment through zookeeper for any of the requested partitions, they will be
+   * cancelled prior to beginning the new reassignment. Any zk-based reassignment for partitions
+   * which are NOT included in this call will not be affected.
    *
-   * @param reassignmentsOpt - optional map of reassignments, expected when an API reassignment is issued
-   *                           The map consists of topic partitions to an optional sequence of target replicas.
-   *                           An empty target replicas option denotes a revert of an on-going reassignment.
-   * @param callback - optional callback, expected when an API reassignment is issued
+   * @param reassignments Map of reassignments passed through the AlterReassignments API. A null value
+   *                      means that we should cancel an in-progress reassignment.
+   * @param callback Callback to send AlterReassignments response
    */
-  private def processPartitionReassignment(reassignmentsOpt: Option[Map[TopicPartition, Option[Seq[Int]]]],
-                                           callback: Option[AlterReassignmentsCallback]): Unit = {
+  private def processApiPartitionReassignment(reassignments: Map[TopicPartition, Option[Seq[Int]]],
+                                              callback: AlterReassignmentsCallback): Unit = {
     if (!isActive) {
-      callback match {
-        case Some(cb) => cb(Right(new ApiError(Errors.NOT_CONTROLLER)))
-        case None =>
-      }
-      return
-    }
-
-    val reassignmentResults: mutable.Map[TopicPartition, ApiError] = mutable.Map.empty
-    val partitionsToBeReassigned = reassignmentsOpt match {
-      case Some(reassignments) => // API triggered
-        val (savedReassignments, _) = reassignments.partition { case (tp, targetReplicas) =>
-          if (replicasAreValid(targetReplicas)) {
-            savePartitionReassignment(reassignmentResults, tp, targetReplicas, zkTriggered = false)
-          } else {
-            reassignmentResults.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "The partition assignment is not valid."))
-            false
-          }
-        }
-        savedReassignments.keySet
-
-      case None => // ZK triggered
-        // We need to register the watcher if the path doesn't exist in order to detect future reassignments and we get
-        // the `path exists` check for free
-        if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
-          val partitionReassignment = zkClient.getPartitionReassignment
-          val (savedReassignments, _) = partitionReassignment.partition { case (tp, targetReplicas) =>
-            savePartitionReassignment(reassignmentResults, tp, Some(targetReplicas), zkTriggered = true)
+      callback(Right(new ApiError(Errors.NOT_CONTROLLER)))
+    } else {
+      val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
+      val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
+
+      reassignments.foreach { case (tp, targetReplicas) =>
+        if (replicasAreValid(tp, targetReplicas)) {
+          maybeBuildReassignment(tp, targetReplicas) match {
+            case Some(context) => partitionsToReassign.put(tp, context)
+            case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
           }
-          savedReassignments.keySet
         } else {
-          Set.empty[TopicPartition]
+          reassignmentResults.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT))
         }
-    }
+      }
 
-    reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToBeReassigned)
-    callback match {
-      case Some(cb) => cb(Left(reassignmentResults))
-      case None =>
+      // The latest reassignment (whether by API or through zk) always takes precedence,
+      // so remove from active zk reassignment (if one exists)
+      maybeRemoveFromZkReassignment((tp, _) => partitionsToReassign.contains(tp))
+
+      reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign)
+      callback(Left(reassignmentResults))
     }
   }
 
-  private def replicasAreValid(replicasOpt: Option[Seq[Int]]): Boolean = {
+  private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = {
     replicasOpt match {
       case Some(replicas) =>
         val replicaSet = replicas.toSet
-
         if (replicas.isEmpty || replicas.size != replicaSet.size)
           false
         else if (replicas.exists(_ < 0))
           false
-        else
-          replicaSet.subsetOf(controllerContext.liveBrokerIds)
+        else {
+          // Ensure that any new replicas are among the live brokers
+          val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
+          val newAssignment = currentAssignment.reassignTo(replicas)
+          newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds)
+        }
+
       case None => true
     }
   }
 
-  private def savePartitionReassignment(reassignmentResults: mutable.Map[TopicPartition, ApiError], partition: TopicPartition,
-                                        targetReplicasOpt: Option[Seq[Int]], zkTriggered: Boolean): Boolean = {
-    val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, partition)
-    val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
-    val reassignmentIsInProgress = controllerContext.partitionsBeingReassigned.contains(partition)
-
-    val newContextOpt = targetReplicasOpt match {
-      case Some(targetReplicas) =>
-        if (reassignmentIsInProgress) {
-          info(s"Overriding old reassignment for partition $partition " +
-            s"(with target replicas ${replicaAssignment.targetReplicas.mkString(",")}) " +
-            s"to new target replicas (${targetReplicas.mkString(",")})")
-          assert(replicaAssignment.isBeingReassigned)
-
-          val oldContext = controllerContext.partitionsBeingReassigned(partition)
-          oldContext.unregisterReassignIsrChangeHandler(zkClient)
-
-          Some(ReassignedPartitionsContext(targetReplicas, reassignIsrChangeHandler,
-            persistedInZk = zkTriggered || oldContext.persistedInZk,
-            ongoingReassignmentOpt = Some(replicaAssignment))
-          )
-        } else {
-          Some(ReassignedPartitionsContext(targetReplicas, reassignIsrChangeHandler,
-            persistedInZk = zkTriggered,
-            ongoingReassignmentOpt = None)
-          )
-        }
-      case None => // revert
-        if (reassignmentIsInProgress) {
-          val originalAssignment = replicaAssignment.previousAssignment
-          info(s"Reverting old reassignment for partition $partition " +
-            s"(with target replicas ${replicaAssignment.targetReplicas.mkString(",")}) " +
-            s"to original replicas (${originalAssignment.replicas.mkString(",")})")
-          assert(replicaAssignment.isBeingReassigned)
-
-          val oldContext = controllerContext.partitionsBeingReassigned(partition)
-          oldContext.unregisterReassignIsrChangeHandler(zkClient)
-
-          Some(ReassignedPartitionsContext(originalAssignment.replicas, reassignIsrChangeHandler,
-            persistedInZk = oldContext.persistedInZk,
-            ongoingReassignmentOpt = Some(replicaAssignment)
-          ))
-        } else {
-          reassignmentResults.put(partition, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
-          None
-        }
-    }
-
-    newContextOpt match {
-      case Some(newContext) =>
-        controllerContext.partitionsBeingReassigned.put(partition, newContext)
-        true
-      case None => false
+  private def maybeBuildReassignment(topicPartition: TopicPartition,
+                                     targetReplicasOpt: Option[Seq[Int]]): Option[ReplicaAssignment] = {
+    val replicaAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
+    if (replicaAssignment.isBeingReassigned) {
+      val targetReplicas = targetReplicasOpt.getOrElse(replicaAssignment.originReplicas)
+      Some(replicaAssignment.reassignTo(targetReplicas))
+    } else {
+      targetReplicasOpt.map { targetReplicas =>
+        replicaAssignment.reassignTo(targetReplicas)
+      }
     }
   }
 
-
-  private def processPartitionReassignmentIsrChange(partition: TopicPartition): Unit = {
+  private def processPartitionReassignmentIsrChange(topicPartition: TopicPartition): Unit = {
     if (!isActive) return
-    // check if this partition is still being reassigned or not
-    controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
-      val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
-      zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
-        case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
-          val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-          val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
-          if (caughtUpReplicas == reassignedReplicas) {
-            // resume the partition reassignment process
-            info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
-              s"partition $partition being reassigned. Resuming partition reassignment")
-            onPartitionReassignment(partition, reassignedPartitionContext)
-          }
-          else {
-            info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
-              s"partition $partition being reassigned. Replica(s) " +
-              s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
-          }
-        case None => error(s"Error handling reassignment of partition $partition to replicas " +
-          s"${reassignedReplicas.mkString(",")} as it was never created")
+
+    if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
+      val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
+      if (isReassignmentComplete(topicPartition, reassignment)) {
+        // resume the partition reassignment process
+        info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " +
+          s"reassigning partition $topicPartition")
+        onPartitionReassignment(topicPartition, reassignment)
       }
     }
   }
@@ -1813,10 +1689,10 @@ class KafkaController(val config: KafkaConfig,
     if (!isActive) {
       callback(Right(new ApiError(Errors.NOT_CONTROLLER)))
     } else {
-      val results: mutable.Map[TopicPartition, PartitionReplicaAssignment] = mutable.Map.empty
+      val results: mutable.Map[TopicPartition, ReplicaAssignment] = mutable.Map.empty
       val partitionsToList = partitionsOpt match {
         case Some(partitions) => partitions
-        case None => controllerContext.partitionsBeingReassigned.keys
+        case None => controllerContext.partitionsBeingReassigned
       }
 
       partitionsToList.foreach { tp =>
@@ -1866,7 +1742,7 @@ class KafkaController(val config: KafkaConfig,
 
   def alterPartitionReassignments(partitions: Map[TopicPartition, Option[Seq[Int]]],
                                   callback: AlterReassignmentsCallback): Unit = {
-    eventManager.put(PartitionReassignment(Some(partitions), Some(callback)))
+    eventManager.put(ApiPartitionReassignment(partitions, callback))
   }
 
   private def preemptReplicaLeaderElection(
@@ -2016,8 +1892,10 @@ class KafkaController(val config: KafkaConfig,
           processPartitionModifications(topic)
         case TopicDeletion =>
           processTopicDeletion()
-        case PartitionReassignment(reassignments, callback) =>
-          processPartitionReassignment(reassignments, callback)
+        case ApiPartitionReassignment(reassignments, callback) =>
+          processApiPartitionReassignment(reassignments, callback)
+        case ZkPartitionReassignment =>
+          processZkPartitionReassignment()
         case ListPartitionReassignments(partitions, callback) =>
           processListPartitionReassignments(partitions, callback)
         case PartitionReassignmentIsrChange(partition) =>
@@ -2099,7 +1977,7 @@ class PartitionReassignmentHandler(eventManager: ControllerEventManager) extends
   // Note that the event is also enqueued when the znode is deleted, but we do it explicitly instead of relying on
   // handleDeletion(). This approach is more robust as it doesn't depend on the watcher being re-registered after
   // it's consumed during data changes (we ensure re-registration when the znode is deleted).
-  override def handleCreation(): Unit = eventManager.put(PartitionReassignment(None, None))
+  override def handleCreation(): Unit = eventManager.put(ZkPartitionReassignment)
 }
 
 class PartitionReassignmentIsrChangeHandler(eventManager: ControllerEventManager, partition: TopicPartition) extends ZNodeChangeHandler {
@@ -2132,26 +2010,6 @@ class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNod
   override def handleDataChange(): Unit = eventManager.put(ControllerChange)
 }
 
-/**
-  * @param newReplicas - the target replicas for this partition
-  * @param reassignIsrChangeHandler - a handler for tracking ISR changes in this partition
-  * @param persistedInZk - a boolean indicating whether this partition is stored in the /admin/reassign_partitions znode
-  *                        it is needed to ensure that an API reassignment that overrides a znode reassignment still cleans up after itself
-  * @param ongoingReassignmentOpt - the ongoing reassignment for this partition, if one is present -- it will be reverted.
-  */
-case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
-                                       reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler,
-                                       persistedInZk: Boolean,
-                                       var ongoingReassignmentOpt: Option[PartitionReplicaAssignment]) {
-
-  def registerReassignIsrChangeHandler(zkClient: KafkaZkClient): Unit =
-    zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
-
-  def unregisterReassignIsrChangeHandler(zkClient: KafkaZkClient): Unit =
-    zkClient.unregisterZNodeChangeHandler(reassignIsrChangeHandler.path)
-
-}
-
 case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int) {
   def topic: String = topicPartition.topic
   def partition: Int = topicPartition.partition
@@ -2261,8 +2119,12 @@ case object TopicDeletion extends ControllerEvent {
   override def state: ControllerState = ControllerState.TopicDeletion
 }
 
-case class PartitionReassignment(reassignments: Option[Map[TopicPartition, Option[Seq[Int]]]],
-                                 callback: Option[AlterReassignmentsCallback]) extends ControllerEvent {
+case object ZkPartitionReassignment extends ControllerEvent {
+  override def state: ControllerState = ControllerState.AlterPartitionReassignment
+}
+
+case class ApiPartitionReassignment(reassignments: Map[TopicPartition, Option[Seq[Int]]],
+                                    callback: AlterReassignmentsCallback) extends ControllerEvent {
   override def state: ControllerState = ControllerState.AlterPartitionReassignment
 }
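
For context, a minimal client-side sketch (not part of this patch) of how the new ApiPartitionReassignment event above is driven. Admin.alterPartitionReassignments takes an Optional target per partition, and an empty Optional asks the controller to cancel an in-progress reassignment, which corresponds to the "null value" described in the processApiPartitionReassignment doc comment; the broker ids 4, 5, 6 and the helper name are placeholders.

    import java.util.Optional
    import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment}
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._

    def moveThenCancel(admin: Admin, tp: TopicPartition): Unit = {
      // start a reassignment of tp onto brokers 4, 5 and 6
      val target = new NewPartitionReassignment(Seq(4, 5, 6).map(Int.box).asJava)
      admin.alterPartitionReassignments(Map(tp -> Optional.of(target)).asJava).all().get()

      // cancel it again: an empty Optional reverts the partition to its original replicas
      val cancel: Optional[NewPartitionReassignment] = Optional.empty()
      admin.alterPartitionReassignments(Map(tp -> cancel).asJava).all().get()
    }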
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9a0ebd7..f194485 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,7 +30,7 @@ import kafka.api.ElectLeadersRequestOps
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
-import kafka.controller.{KafkaController, PartitionReplicaAssignment}
+import kafka.controller.{KafkaController, ReplicaAssignment}
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, LeaveGroupResult, SyncGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.message.ZStdCompressionCodec
@@ -2350,7 +2350,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     authorizeClusterOperation(request, DESCRIBE)
     val listPartitionReassignmentsRequest = request.body[ListPartitionReassignmentsRequest]
 
-    def sendResponseCallback(result: Either[Map[TopicPartition, PartitionReplicaAssignment], ApiError]): Unit = {
+    def sendResponseCallback(result: Either[Map[TopicPartition, ReplicaAssignment], ApiError]): Unit = {
       val responseData = result match {
         case Right(error) => new ListPartitionReassignmentsResponseData().setErrorMessage(error.message()).setErrorCode(error.error().code())
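
A matching sketch (again outside the patch) of the client-side view of the ListPartitionReassignments response assembled above: each entry exposes the full replica set plus the replicas still being added and removed, mirroring the controller's ReplicaAssignment. The helper name is illustrative.

    import org.apache.kafka.clients.admin.Admin
    import scala.collection.JavaConverters._

    def printOngoingReassignments(admin: Admin): Unit = {
      val ongoing = admin.listPartitionReassignments().reassignments().get().asScala
      ongoing.foreach { case (tp, reassignment) =>
        println(s"$tp: replicas=${reassignment.replicas().asScala.mkString(",")} " +
          s"adding=${reassignment.addingReplicas().asScala.mkString(",")} " +
          s"removing=${reassignment.removingReplicas().asScala.mkString(",")}")
      }
    }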
 
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index de6337b..63e2614 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -20,7 +20,7 @@ import java.util.Properties
 
 import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode}
 import kafka.common.TopicAlreadyMarkedForDeletionException
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
 import kafka.utils._
@@ -93,8 +93,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
 
     // create the partition assignment
-    writeTopicPartitionAssignment(topic, partitionReplicaAssignment.mapValues(PartitionReplicaAssignment(_, List(), List())).toMap,
-      isUpdate = false)
+    writeTopicPartitionAssignment(topic, partitionReplicaAssignment.mapValues(ReplicaAssignment(_)).toMap, isUpdate = false)
   }
 
   /**
@@ -136,7 +135,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
     LogConfig.validate(config)
   }
 
-  private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, PartitionReplicaAssignment], isUpdate: Boolean): Unit = {
+  private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment], isUpdate: Boolean): Unit = {
     try {
       val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap
 
@@ -182,7 +181,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
   * @return the updated replica assignment
   */
   def addPartitions(topic: String,
-                    existingAssignment: Map[Int, PartitionReplicaAssignment],
+                    existingAssignment: Map[Int, ReplicaAssignment],
                     allBrokers: Seq[BrokerMetadata],
                     numPartitions: Int = 1,
                     replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
@@ -211,7 +210,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
     }
 
     val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions.map { case (tp, replicas) =>
-      tp -> PartitionReplicaAssignment(replicas, List(), List())
+      tp -> ReplicaAssignment(replicas, List(), List())
     }
     if (!validateOnly) {
       info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 864dcab..4b16860 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import com.yammer.metrics.core.MetricName
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
-import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, PartitionReplicaAssignment}
+import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReplicaAssignment}
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.security.authorizer.AclAuthorizer.{NoAcls, VersionedAcls}
@@ -484,7 +484,9 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @return SetDataResponse
    */
-  def setTopicAssignmentRaw(topic: String, assignment: collection.Map[TopicPartition, PartitionReplicaAssignment], expectedControllerEpochZkVersion: Int): SetDataResponse = {
+  def setTopicAssignmentRaw(topic: String,
+                            assignment: collection.Map[TopicPartition, ReplicaAssignment],
+                            expectedControllerEpochZkVersion: Int): SetDataResponse = {
     val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion)
     retryRequestUntilConnected(setDataRequest, expectedControllerEpochZkVersion)
   }
@@ -496,7 +498,9 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @throws KeeperException if there is an error while setting assignment
    */
-  def setTopicAssignment(topic: String, assignment: Map[TopicPartition, PartitionReplicaAssignment], expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion) = {
+  def setTopicAssignment(topic: String,
+                         assignment: Map[TopicPartition, ReplicaAssignment],
+                         expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion) = {
     val setDataResponse = setTopicAssignmentRaw(topic, assignment, expectedControllerEpochZkVersion)
     setDataResponse.maybeThrow
   }
@@ -508,7 +512,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @throws KeeperException if there is an error while creating assignment
    */
   def createTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
-    val persistedAssignments = assignment.mapValues(PartitionReplicaAssignment(_, List(), List())).toMap
+    val persistedAssignments = assignment.mapValues(ReplicaAssignment(_)).toMap
     createRecursive(TopicZNode.path(topic), TopicZNode.encode(persistedAssignments))
   }
 
@@ -584,14 +588,14 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     * @param topics the topics whose partitions we wish to get the assignments for.
     * @return the full replica assignment for each partition from the given topics.
     */
-  def getFullReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, PartitionReplicaAssignment] = {
+  def getFullReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, ReplicaAssignment] = {
     val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
     val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
     getDataResponses.flatMap { getDataResponse =>
       val topic = getDataResponse.ctx.get.asInstanceOf[String]
       getDataResponse.resultCode match {
         case Code.OK => TopicZNode.decode(topic, getDataResponse.data)
-        case Code.NONODE => Map.empty[TopicPartition, PartitionReplicaAssignment]
+        case Code.NONODE => Map.empty[TopicPartition, ReplicaAssignment]
         case _ => throw getDataResponse.resultException.get
       }
     }.toMap
@@ -602,7 +606,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param topics the topics whose partitions we wish to get the assignments for.
    * @return the partition assignment for each partition from the given topics.
    */
-  def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, PartitionReplicaAssignment]] = {
+  def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, ReplicaAssignment]] = {
     val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
     val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
     getDataResponses.flatMap { getDataResponse =>
@@ -611,7 +615,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
         val partitionMap = TopicZNode.decode(topic, getDataResponse.data).map { case (k, v) => (k.partition, v) }
         Map(topic -> partitionMap)
       } else if (getDataResponse.resultCode == Code.NONODE) {
-        Map.empty[String, Map[Int, PartitionReplicaAssignment]]
+        Map.empty[String, Map[Int, ReplicaAssignment]]
       } else {
         throw getDataResponse.resultException.get
       }
@@ -805,7 +809,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @deprecated Use the PartitionReassignment Kafka API instead
    */
   @Deprecated
-  def getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]] = {
+  def getPartitionReassignment(): collection.Map[TopicPartition, Seq[Int]] = {
     val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 4a81e83..0d1d525 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch, PartitionReplicaAssignment}
+import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch, ReplicaAssignment}
 import kafka.security.auth.Resource.Separator
 import kafka.security.authorizer.AclAuthorizer.VersionedAcls
 import kafka.security.auth.{Acl, Resource, ResourceType}
@@ -240,7 +240,7 @@ object TopicsZNode {
 
 object TopicZNode {
   def path(topic: String) = s"${TopicsZNode.path}/$topic"
-  def encode(assignment: collection.Map[TopicPartition, PartitionReplicaAssignment]): Array[Byte] = {
+  def encode(assignment: collection.Map[TopicPartition, ReplicaAssignment]): Array[Byte] = {
     val replicaAssignmentJson = mutable.Map[String, util.List[Int]]()
     val addingReplicasAssignmentJson = mutable.Map[String, util.List[Int]]()
     val removingReplicasAssignmentJson = mutable.Map[String, util.List[Int]]()
@@ -260,7 +260,7 @@ object TopicZNode {
       "removing_replicas" -> removingReplicasAssignmentJson.asJava
     ).asJava)
   }
-  def decode(topic: String, bytes: Array[Byte]): Map[TopicPartition, PartitionReplicaAssignment] = {
+  def decode(topic: String, bytes: Array[Byte]): Map[TopicPartition, ReplicaAssignment] = {
     def getReplicas(replicasJsonOpt: Option[JsonObject], partition: String): Seq[Int] = {
       replicasJsonOpt match {
         case Some(replicasJson) => replicasJson.get(partition) match {
@@ -278,7 +278,7 @@ object TopicZNode {
       val removingReplicasJsonOpt = assignmentJson.get("removing_replicas").map(_.asJsonObject)
       partitionsJsonOpt.map { partitionsJson =>
         partitionsJson.iterator.map { case (partition, replicas) =>
-          new TopicPartition(topic, partition.toInt) -> PartitionReplicaAssignment(
+          new TopicPartition(topic, partition.toInt) -> ReplicaAssignment(
             replicas.to[Seq[Int]],
             getReplicas(addingReplicasJsonOpt, partition),
             getReplicas(removingReplicasJsonOpt, partition)
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 13216e8..115f091 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.admin
 
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
 import kafka.network.SocketServer
 import org.junit.Assert._
 import kafka.utils.TestUtils._
@@ -39,15 +39,15 @@ class AddPartitionsTest extends BaseRequestTest {
   val partitionId = 0
 
   val topic1 = "new-topic1"
-  val topic1Assignment = Map(0 -> PartitionReplicaAssignment(Seq(0,1), List(), List()))
+  val topic1Assignment = Map(0 -> ReplicaAssignment(Seq(0,1), List(), List()))
   val topic2 = "new-topic2"
-  val topic2Assignment = Map(0 -> PartitionReplicaAssignment(Seq(1,2), List(), List()))
+  val topic2Assignment = Map(0 -> ReplicaAssignment(Seq(1,2), List(), List()))
   val topic3 = "new-topic3"
-  val topic3Assignment = Map(0 -> PartitionReplicaAssignment(Seq(2,3,0,1), List(), List()))
+  val topic3Assignment = Map(0 -> ReplicaAssignment(Seq(2,3,0,1), List(), List()))
   val topic4 = "new-topic4"
-  val topic4Assignment = Map(0 -> PartitionReplicaAssignment(Seq(0,3), List(), List()))
+  val topic4Assignment = Map(0 -> ReplicaAssignment(Seq(0,3), List(), List()))
   val topic5 = "new-topic5"
-  val topic5Assignment = Map(1 -> PartitionReplicaAssignment(Seq(0,1), List(), List()))
+  val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
 
   @Before
   override def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 34c8c85..eb61dd2 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -29,7 +29,7 @@ import org.junit.{After, Test}
 
 import kafka.admin.TopicCommand.ZookeeperTopicService
 import kafka.common.TopicAlreadyMarkedForDeletionException
-import kafka.controller.{OfflineReplica, PartitionAndReplica, PartitionReplicaAssignment, ReplicaDeletionSuccessful}
+import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaAssignment, ReplicaDeletionSuccessful}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 import org.scalatest.Assertions.fail
@@ -39,7 +39,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   var servers: Seq[KafkaServer] = Seq()
 
   val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
-  val expectedReplicaFullAssignment = expectedReplicaAssignment.mapValues(PartitionReplicaAssignment(_, List(), List())).toMap
+  val expectedReplicaFullAssignment = expectedReplicaAssignment.mapValues(ReplicaAssignment(_, List(), List())).toMap
 
   @After
   override def tearDown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index a84c611..6ee2f53 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -31,7 +31,7 @@ import scala.collection.{Map, Seq}
 import scala.util.Random
 import java.io.File
 
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
 import kafka.log.LogConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.config.ConfigResource
@@ -141,6 +141,25 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def testReassignmentMatchesCurrentAssignment(): Unit = {
+    // Given a single replica on server 100
+    startBrokers(Seq(100))
+    adminClient = createAdminClient(servers)
+    createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100)), servers = servers)
+
+    // Execute no-op reassignment
+    val topicJson = executeAssignmentJson(Seq(
+      PartitionAssignmentJson(tp0, replicas = Seq(100), logDirectories = None)
+    ))
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
+    waitForZkReassignmentToComplete()
+
+    // The replica should remain on 100
+    val partitionAssignment = zkClient.getPartitionAssignmentForTopics(Set(topicName))(topicName)(tp0.partition)
+    assertMoveForPartitionOccurred(Seq(100), partitionAssignment)
+  }
+
+  @Test
   def shouldMoveSinglePartitionWithinBroker(): Unit = {
     // Given a single replica on server 100
     startBrokers(Seq(100, 101))
@@ -657,11 +676,11 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     adminClient.close()
 
     zkClient.setTopicAssignment("orders", Map(
-      new TopicPartition("orders", 0) -> PartitionReplicaAssignment(List(0, 1), List(2), List(0)), // should be overwritten
-      new TopicPartition("orders", 1) -> PartitionReplicaAssignment(List(1, 2), List(3), List(1)), // should be overwritten
+      new TopicPartition("orders", 0) -> ReplicaAssignment(List(0, 1), List(2), List(0)), // should be overwritten
+      new TopicPartition("orders", 1) -> ReplicaAssignment(List(1, 2), List(3), List(1)), // should be overwritten
       // should be overwritten (so we know to remove it from ZK) even though we do the exact same move
-      sameMoveTp -> PartitionReplicaAssignment(List(0, 1, 2), List(2), List(0)),
-      new TopicPartition("orders", 3) -> PartitionReplicaAssignment(List(0, 1, 2), List(2), List(0)) // moves
+      sameMoveTp -> ReplicaAssignment(List(0, 1, 2), List(2), List(0)),
+      new TopicPartition("orders", 3) -> ReplicaAssignment(List(0, 1, 2), List(2), List(0)) // moves
     ))
     val move = Map(
       new TopicPartition("orders", 0) -> Seq(2, 1), // moves
@@ -916,9 +935,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     waitForZkReassignmentToComplete()
     // 4. Ensure the API reassignment not part of the znode is still in progress
     val leftoverReassignments = adminClient.listPartitionReassignments(Set(tpA0, tpA1, tpB0).asJava).reassignments().get()
-    assertEquals(1, leftoverReassignments.size())
-    val tpB0LeftoverReassignment = leftoverReassignments.get(tpB0)
-    assertIsReassigning(from = Seq(100), to = Seq(102), tpB0LeftoverReassignment)
+    assertTrue(leftoverReassignments.keySet().asScala.subsetOf(Set(tpA1, tpB0)))
 
     resetBrokersThrottle()
     waitForAllReassignmentsToComplete()
@@ -1116,7 +1133,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
    * Asserts that a topic's reassignments completed and span across the expected replicas
    */
   def assertMoveForTopicOccurred(expectedReplicas: Seq[Int],
-                                 partitionAssignments: Map[Int, PartitionReplicaAssignment]): Unit = {
+                                 partitionAssignments: Map[Int, ReplicaAssignment]): Unit = {
     assertEquals(expectedReplicas, partitionAssignments.values.flatMap(_.replicas).toSeq.distinct.sorted)
     assertTrue(partitionAssignments.values.flatMap(_.addingReplicas).isEmpty)
     assertTrue(partitionAssignments.values.flatMap(_.removingReplicas).isEmpty)
@@ -1126,7 +1143,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
    * Asserts that a partition moved to the exact expected replicas in the specific order
    */
   def assertMoveForPartitionOccurred(expectedReplicas: Seq[Int],
-                                     partitionAssignment: PartitionReplicaAssignment): Unit = {
+                                     partitionAssignment: ReplicaAssignment): Unit = {
     assertEquals(expectedReplicas, partitionAssignment.replicas)
     assertTrue(partitionAssignment.addingReplicas.isEmpty)
     assertTrue(partitionAssignment.removingReplicas.isEmpty)
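
As a small illustration (assumed, not taken from the test helpers), the reassignment JSON that executeAssignmentJson in testReassignmentMatchesCurrentAssignment above effectively hands to ReassignPartitionsCommand.executeAssignment for a single partition with no explicit log directories; "my-topic" is a placeholder name and the exact string follows the kafka-reassign-partitions.sh format.

    val noOpReassignmentJson: String =
      """{
        |  "version": 1,
        |  "partitions": [
        |    {"topic": "my-topic", "partition": 0, "replicas": [100]}
        |  ]
        |}""".stripMargin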
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index ac1bc8d..790e3d8 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -527,10 +527,10 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def testResumePartitionReassignmentThatWasCompleted(): Unit = {
-    val expectedReplicaAssignment = Map(0  -> List(0, 1))
+    val initialAssignment = Map(0  -> List(0, 2))
     val topic = "test"
     // create the topic
-    adminZkClient.createTopicWithAssignment(topic, config = new Properties, expectedReplicaAssignment)
+    adminZkClient.createTopicWithAssignment(topic, config = new Properties, initialAssignment)
     // put the partition in the reassigned path as well
     // reassign partition 0
     val newReplicas = Seq(0, 1)
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index 88607f2..d286967 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -673,7 +673,7 @@ class ControllerChannelManagerTest {
     KafkaConfig.fromProps(props)
   }
 
-  private def replicaAssignment(replicas: Seq[Int]): PartitionReplicaAssignment = PartitionReplicaAssignment(replicas, Seq(), Seq())
+  private def replicaAssignment(replicas: Seq[Int]): ReplicaAssignment = ReplicaAssignment(replicas, Seq(), Seq())
 
   private def initContext(brokers: Seq[Int],
                           topics: Set[String],
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
index eeddad0..a0121e2 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -18,7 +18,7 @@
 package unit.kafka.controller
 
 import kafka.cluster.{Broker, EndPoint}
-import kafka.controller.{ControllerContext, PartitionReplicaAssignment}
+import kafka.controller.{ControllerContext, ReplicaAssignment}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -77,7 +77,7 @@ class ControllerContextTest {
   @Test
   def testUpdatePartitionReplicaAssignmentUpdatesReplicaAssignmentOnlyAndDoesNotOverwrite(): Unit = {
     val expectedReplicas = Seq(4)
-    val expectedFullAssignment = PartitionReplicaAssignment(Seq(3), Seq(1), Seq(2))
+    val expectedFullAssignment = ReplicaAssignment(Seq(3), Seq(1), Seq(2))
     context.updatePartitionFullReplicaAssignment(tp1, expectedFullAssignment)
 
     context.updatePartitionReplicaAssignment(tp1, expectedReplicas) // update only the replicas
@@ -100,7 +100,7 @@ class ControllerContextTest {
     assertEquals(Seq(), fullAssignment.addingReplicas)
     assertEquals(Seq(), fullAssignment.removingReplicas)
 
-    val expectedFullAssignment = PartitionReplicaAssignment(Seq(3), Seq(1), Seq(2))
+    val expectedFullAssignment = ReplicaAssignment(Seq(3), Seq(1), Seq(2))
     context.updatePartitionFullReplicaAssignment(tp1, expectedFullAssignment)
     val updatedFullAssignment = context.partitionFullReplicaAssignment(tp1)
     assertEquals(expectedFullAssignment.replicas, updatedFullAssignment.replicas)
@@ -118,7 +118,7 @@ class ControllerContextTest {
 
   @Test
   def testPartitionFullReplicaAssignmentReturnsEmptyAssignmentIfTopicOrPartitionDoesNotExist(): Unit = {
-    val expectedEmptyAssignment = PartitionReplicaAssignment(Seq.empty, Seq.empty, Seq.empty)
+    val expectedEmptyAssignment = ReplicaAssignment(Seq.empty, Seq.empty, Seq.empty)
 
     val noTopicAssignment = context.partitionFullReplicaAssignment(new TopicPartition("NONEXISTENT", 0))
     assertEquals(expectedEmptyAssignment, noTopicAssignment)
@@ -143,23 +143,23 @@ class ControllerContextTest {
 
   @Test
   def testPartitionReplicaAssignment(): Unit = {
-    val reassigningPartition = PartitionReplicaAssignment(List(1, 2, 3, 4, 5, 6), List(2, 3, 4), List(1, 5, 6))
+    val reassigningPartition = ReplicaAssignment(List(1, 2, 3, 4, 5, 6), List(2, 3, 4), List(1, 5, 6))
     assertTrue(reassigningPartition.isBeingReassigned)
     assertEquals(List(2, 3, 4), reassigningPartition.targetReplicas)
 
-    val reassigningPartition2 = PartitionReplicaAssignment(List(1, 2, 3, 4), List(), List(1, 4))
+    val reassigningPartition2 = ReplicaAssignment(List(1, 2, 3, 4), List(), List(1, 4))
     assertTrue(reassigningPartition2.isBeingReassigned)
     assertEquals(List(2, 3), reassigningPartition2.targetReplicas)
 
-    val reassigningPartition3 = PartitionReplicaAssignment(List(1, 2, 3, 4), List(4), List(2))
+    val reassigningPartition3 = ReplicaAssignment(List(1, 2, 3, 4), List(4), List(2))
     assertTrue(reassigningPartition3.isBeingReassigned)
     assertEquals(List(1, 3, 4), reassigningPartition3.targetReplicas)
 
-    val partition = PartitionReplicaAssignment(List(1, 2, 3, 4, 5, 6), List(), List())
+    val partition = ReplicaAssignment(List(1, 2, 3, 4, 5, 6), List(), List())
     assertFalse(partition.isBeingReassigned)
     assertEquals(List(1, 2, 3, 4, 5, 6), partition.targetReplicas)
 
-    val reassigningPartition4 = PartitionReplicaAssignment.fromOldAndNewReplicas(
+    val reassigningPartition4 = ReplicaAssignment.fromOldAndNewReplicas(
       List(1, 2, 3, 4), List(4, 2, 5, 3)
     )
     assertEquals(List(4, 2, 5, 3, 1), reassigningPartition4.replicas)
@@ -168,7 +168,7 @@ class ControllerContextTest {
     assertEquals(List(1), reassigningPartition4.removingReplicas)
     assertTrue(reassigningPartition4.isBeingReassigned)
 
-    val reassigningPartition5 = PartitionReplicaAssignment.fromOldAndNewReplicas(
+    val reassigningPartition5 = ReplicaAssignment.fromOldAndNewReplicas(
       List(1, 2, 3), List(4, 5, 6)
     )
     assertEquals(List(4, 5, 6, 1, 2, 3), reassigningPartition5.replicas)
@@ -177,7 +177,7 @@ class ControllerContextTest {
     assertEquals(List(1, 2, 3), reassigningPartition5.removingReplicas)
     assertTrue(reassigningPartition5.isBeingReassigned)
 
-    val nonReassigningPartition = PartitionReplicaAssignment.fromOldAndNewReplicas(
+    val nonReassigningPartition = ReplicaAssignment.fromOldAndNewReplicas(
       List(1, 2, 3), List(3, 1, 2)
     )
     assertEquals(List(3, 1, 2), nonReassigningPartition.replicas)
@@ -186,4 +186,16 @@ class ControllerContextTest {
     assertEquals(List(), nonReassigningPartition.removingReplicas)
     assertFalse(nonReassigningPartition.isBeingReassigned)
   }
+
+  @Test
+  def testReassignTo(): Unit = {
+    val assignment = ReplicaAssignment(Seq(1, 2, 3))
+    val firstReassign = assignment.reassignTo(Seq(4, 5, 6))
+
+    assertEquals(ReplicaAssignment(Seq(4, 5, 6, 1, 2, 3), Seq(4, 5, 6), Seq(1, 2, 3)), firstReassign)
+    assertEquals(ReplicaAssignment(Seq(7, 8, 9, 1, 2, 3), Seq(7, 8, 9), Seq(1, 2, 3)), firstReassign.reassignTo(Seq(7, 8, 9)))
+    assertEquals(ReplicaAssignment(Seq(7, 8, 9, 1, 2, 3), Seq(7, 8, 9), Seq(1, 2, 3)), assignment.reassignTo(Seq(7, 8, 9)))
+    assertEquals(assignment, firstReassign.reassignTo(Seq(1,2,3)))
+  }
+
 }
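
To make testReassignTo above easier to follow, here is a simplified stand-in for ReplicaAssignment (names and default arguments assumed; the real class in kafka.controller carries more detail) that reproduces the semantics the assertions rely on: reassigning always starts from the origin replicas, so repeated or chained reassignTo calls converge to the same state, and reassigning back to the original replicas yields a plain assignment again.

    case class ReplicaAssignment(replicas: Seq[Int],
                                 addingReplicas: Seq[Int] = Seq.empty,
                                 removingReplicas: Seq[Int] = Seq.empty) {
      def originReplicas: Seq[Int] = replicas.diff(addingReplicas)
      def targetReplicas: Seq[Int] = replicas.diff(removingReplicas)
      def isBeingReassigned: Boolean = addingReplicas.nonEmpty || removingReplicas.nonEmpty

      // target replicas come first, followed by any origin replicas not in the target
      def reassignTo(target: Seq[Int]): ReplicaAssignment = {
        val fullReplicas = (target ++ originReplicas).distinct
        ReplicaAssignment(
          fullReplicas,
          addingReplicas = fullReplicas.diff(originReplicas),
          removingReplicas = fullReplicas.diff(target))
      }
    }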
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index dc09d73..018a0bb 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -250,8 +250,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val tp1 = new TopicPartition("t", 1)
     val assignment = Map(tp0.partition -> Seq(0))
     val expandedAssignment = Map(
-      tp0 -> PartitionReplicaAssignment(Seq(0), Seq(), Seq()),
-      tp1 -> PartitionReplicaAssignment(Seq(0), Seq(), Seq()))
+      tp0 -> ReplicaAssignment(Seq(0), Seq(), Seq()),
+      tp1 -> ReplicaAssignment(Seq(0), Seq(), Seq()))
     TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
     zkClient.setTopicAssignment(tp0.topic, expandedAssignment, firstControllerEpochZkVersion)
     waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
@@ -268,8 +268,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val tp1 = new TopicPartition("t", 1)
     val assignment = Map(tp0.partition -> Seq(otherBrokerId, controllerId))
     val expandedAssignment = Map(
-      tp0 -> PartitionReplicaAssignment(Seq(otherBrokerId, controllerId), Seq(), Seq()),
-      tp1 -> PartitionReplicaAssignment(Seq(otherBrokerId, controllerId), Seq(), Seq()))
+      tp0 -> ReplicaAssignment(Seq(otherBrokerId, controllerId), Seq(), Seq()),
+      tp1 -> ReplicaAssignment(Seq(otherBrokerId, controllerId), Seq(), Seq()))
     TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
@@ -290,7 +290,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(controllerId))
-    val reassignment = Map(tp -> PartitionReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    val reassignment = Map(tp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     zkClient.createPartitionReassignment(reassignment.mapValues(_.replicas).toMap)
     waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3,
@@ -330,7 +330,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(controllerId))
-    val reassignment = Map(tp -> PartitionReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    val reassignment = Map(tp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
@@ -628,8 +628,6 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
       TestUtils.waitUntilTrue(() => !controller.isActive, "Controller fails to resign")
 
       // Expect to capture the ControllerMovedException in the log of ControllerEventThread
-      println(appender.getMessages.find(e => e.getLevel == Level.INFO
-        && e.getThrowableInformation != null))
       val event = appender.getMessages.find(e => e.getLevel == Level.INFO
         && e.getThrowableInformation != null
         && e.getThrowableInformation.getThrowable.getClass.getName.equals(classOf[ControllerMovedException].getName))
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 99dc6ed..d00e1bb 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -517,6 +517,6 @@ class PartitionStateMachineTest {
     assertEquals(s"There should be no offline partition(s)", 0, controllerContext.offlinePartitionCount)
   }
 
-  private def replicaAssignment(replicas: Seq[Int]): PartitionReplicaAssignment = PartitionReplicaAssignment(replicas, Seq(), Seq())
+  private def replicaAssignment(replicas: Seq[Int]): ReplicaAssignment = ReplicaAssignment(replicas, Seq(), Seq())
 
 }
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index c43fd09..7a76496 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -409,6 +409,6 @@ class ReplicaStateMachineTest {
     assertEquals(fromState, replicaState(replica))
   }
 
-  private def replicaAssignment(replicas: Seq[Int]): PartitionReplicaAssignment = PartitionReplicaAssignment(replicas, Seq(), Seq())
+  private def replicaAssignment(replicas: Seq[Int]): ReplicaAssignment = ReplicaAssignment(replicas, Seq(), Seq())
 
 }
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index f1647ce..6780d98 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -32,7 +32,7 @@ import scala.util.{Failure, Success, Try}
 import javax.security.auth.login.Configuration
 import kafka.api.ApiVersion
 import kafka.cluster.{Broker, EndPoint}
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
@@ -131,7 +131,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
 
     // Test that can update persistent nodes
     val updatedAssignment = assignment - new TopicPartition(topic1, 2)
-    zkClient.setTopicAssignment(topic1, updatedAssignment.mapValues { case (v) => PartitionReplicaAssignment(v, List(), List()) }.toMap)
+    zkClient.setTopicAssignment(topic1, updatedAssignment.mapValues { case (v) => ReplicaAssignment(v, List(), List()) }.toMap)
     assertEquals(updatedAssignment.size, zkClient.getTopicPartitionCount(topic1).get)
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 07405f7..e81e032 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -19,7 +19,7 @@ package kafka.admin
 import java.util
 import java.util.Properties
 
-import kafka.controller.PartitionReplicaAssignment
+import kafka.controller.ReplicaAssignment
 import kafka.log._
 import kafka.server.DynamicConfig.Broker._
 import kafka.server.KafkaConfig._
@@ -88,7 +88,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
                          1 -> List(1, 2, 3))
     adminZkClient.createTopicWithAssignment("test", topicConfig, assignment)
     val found = zkClient.getPartitionAssignmentForTopics(Set("test"))
-    assertEquals(assignment.mapValues(PartitionReplicaAssignment(_, List(), List())).toMap, found("test"))
+    assertEquals(assignment.mapValues(ReplicaAssignment(_, List(), List())).toMap, found("test"))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 135092d..f11b9eb 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -40,7 +40,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, mutable}
 import scala.util.Random
-import kafka.controller.{LeaderIsrAndControllerEpoch, PartitionReplicaAssignment}
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zookeeper._
 import org.apache.kafka.common.errors.ControllerMovedException
@@ -169,7 +169,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     val expectedAssignment = assignment map { topicAssignment =>
       val partition = topicAssignment._1.partition
       val assignment = topicAssignment._2
-      partition -> PartitionReplicaAssignment(assignment, List(), List())
+      partition -> ReplicaAssignment(assignment, List(), List())
     }
 
     assertEquals(assignment.size, zkClient.getTopicPartitionCount(topic1).get)
@@ -179,7 +179,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     val updatedAssignment = assignment - new TopicPartition(topic1, 2)
 
-    zkClient.setTopicAssignment(topic1, updatedAssignment.mapValues { case v => PartitionReplicaAssignment(v, List(), List()) }.toMap)
+    zkClient.setTopicAssignment(topic1, updatedAssignment.mapValues { case v => ReplicaAssignment(v, List(), List()) }.toMap)
     assertEquals(updatedAssignment.size, zkClient.getTopicPartitionCount(topic1).get)
 
     // add second topic
@@ -817,7 +817,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.createTopicAssignment(topicPartition.topic(),
       Map(topicPartition -> Seq()))
 
-    val expectedAssignment = PartitionReplicaAssignment(Seq(1,2,3), Seq(1), Seq(3))
+    val expectedAssignment = ReplicaAssignment(Seq(1,2,3), Seq(1), Seq(3))
     val response = zkClient.setTopicAssignmentRaw(topicPartition.topic(),
       Map(topicPartition -> expectedAssignment), controllerEpochZkVersion)
     assertEquals(Code.OK, response.resultCode)

