kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: PartitionReassignmentHandler should only generate event when znode is created
Date Mon, 06 Nov 2017 21:38:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2b5a21395 -> 6f96d7f17


MINOR: PartitionReassignmentHandler should only generate event when znode is created

We only need to generate the event when the znode is created or deleted.
In the former case, we start the reassignment while in the latter we
re-register the watcher (necessary for the Controller to detect future
reassignments).

During Controller failover, we restart the reassignment without generating
an event so it's not affected by this change.

Also use the Controller cache (`ControllerContext.partitionsBeingReassigned`)
in `removePartitionFromReassignedPartitions` instead of reloading the
data from ZooKeeper.

Overall, we would previously load the reassignment data from ZooKeeper twice
per completed partition whereas now we don't do it at all. As an example,
say there were 30k partitions being reassigned, these changes save the
allocation of 900 million `TopicAndPartition` and `Seq[Int]` (replicas)
instances (could easily amount to 20-40 GB depending on the topic name
length). This matters most in cases where the partitions being reassigned
don't have much data, allowing the reassignment to complete reasonably
fast for many of the partitions.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Onur Karaman <okaraman@linkedin.com>

Closes #4143 from ijuma/partition-reassignment-ignore-handle-deletion-and-data-change


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f96d7f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f96d7f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f96d7f1

Branch: refs/heads/trunk
Commit: 6f96d7f1735c53b97ebac43168ded64007277beb
Parents: 2b5a213
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Nov 6 21:33:45 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Nov 6 21:37:55 2017 +0000

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 77 ++++++++--------
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 55 ++++++-----
 core/src/main/scala/kafka/zk/ZkData.scala       |  2 +-
 .../admin/ReassignPartitionsClusterTest.scala   | 96 ++++++++++++++++++++
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 26 +++++-
 5 files changed, 194 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f96d7f1/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d3e6998..b676ead 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, LeaderAndIsrResponse,
StopReplicaResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 
 import scala.collection._
@@ -464,10 +465,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
 
   private def watchIsrChangesForReassignedPartition(partition: TopicAndPartition,
                                                     reassignedPartitionContext: ReassignedPartitionsContext)
{
-    val partitionReassignmentIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this,
eventManager, partition)
-    reassignedPartitionContext.partitionReassignmentIsrChangeHandler = partitionReassignmentIsrChangeHandler
+    val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager,
partition)
+    reassignedPartitionContext.reassignIsrChangeHandler = reassignIsrChangeHandler
     // register listener on the leader and isr path to wait until they catch up with the
current leader
-    zkClient.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
+    zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
   }
 
   def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@@ -782,11 +783,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
   }
 
   private def unregisterPartitionReassignmentIsrChangeHandlers() {
-    controllerContext.partitionsBeingReassigned.foreach {
-      case (topicAndPartition, reassignedPartitionsContext) =>
-        val partitionReassignmentIsrChangeHandler =
-          reassignedPartitionsContext.partitionReassignmentIsrChangeHandler
-        zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
+    controllerContext.partitionsBeingReassigned.values.foreach { reassignedPartitionsContext
=>
+      zkClient.unregisterZNodeChangeHandler(reassignedPartitionsContext.reassignIsrChangeHandler.path)
     }
   }
 
@@ -801,31 +799,27 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
   }
 
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
-    if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
+    controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignContext
=>
       // stop watching the ISR changes for this partition
-      val partitionReassignmentIsrChangeHandler =
-        controllerContext.partitionsBeingReassigned(topicAndPartition).partitionReassignmentIsrChangeHandler
-      zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
+      zkClient.unregisterZNodeChangeHandler(reassignContext.reassignIsrChangeHandler.path)
     }
-    // read the current list of reassigned partitions from zookeeper
-    val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas
=> ReassignedPartitionsContext(replicas))
-    // remove this partition from that list
-    val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
+
+    val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned -
topicAndPartition
+
     // write the new list to zookeeper
-    val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
-    if (reassignment.isEmpty) {
+    if (updatedPartitionsBeingReassigned.isEmpty) {
       info("No more partitions need to be reassigned. Deleting zk path %s".format(ReassignPartitionsZNode.path))
       zkClient.deletePartitionReassignment()
+      // Ensure we detect future reassignments
+      eventManager.put(PartitionReassignment)
     } else {
-      val setDataResponse = zkClient.setPartitionReassignmentRaw(reassignment)
-      if (setDataResponse.resultCode == Code.NONODE) {
-        val createDataResponse = zkClient.createPartitionReassignment(reassignment)
-        createDataResponse.resultException.foreach(e => throw new AdminOperationException(e))
-      } else {
-        setDataResponse.resultException.foreach(e => throw new AdminOperationException(e))
+      val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
+      try zkClient.setOrCreatePartitionReassignment(reassignment)
+      catch {
+        case e: KeeperException => throw new AdminOperationException(e)
       }
     }
-    // update the cache. NO-OP if the partition's reassignment was never started
+
     controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
   }
 
@@ -1284,17 +1278,21 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
 
     override def process(): Unit = {
       if (!isActive) return
-      zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
-      val partitionReassignment = zkClient.getPartitionReassignment
-      val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
-      partitionsToBeReassigned.foreach { partitionToBeReassigned =>
-        if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic))
{
-          error("Skipping reassignment of partition %s for topic %s since it is currently
being deleted"
-            .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
-          removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
-        } else {
-          val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
-          initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+
+      // We need to register the watcher if the path doesn't exist in order to detect future
reassignments and we get
+      // the `path exists` check for free
+      if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler))
{
+        val partitionReassignment = zkClient.getPartitionReassignment
+        val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
+        partitionsToBeReassigned.foreach { partitionToBeReassigned =>
+          if (topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic))
{
+            error("Skipping reassignment of partition %s for topic %s since it is currently
being deleted"
+              .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
+            removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
+          } else {
+            val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
+            initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+          }
         }
       }
     }
@@ -1450,9 +1448,10 @@ class TopicDeletionHandler(controller: KafkaController, eventManager:
Controller
 class PartitionReassignmentHandler(controller: KafkaController, eventManager: ControllerEventManager)
extends ZNodeChangeHandler {
   override val path: String = ReassignPartitionsZNode.path
 
+  // Note that the event is also enqueued when the znode is deleted, but we do it explicitly
instead of relying on
+  // handleDeletion(). This approach is more robust as it doesn't depend on the watcher being
re-registered after
+  // it's consumed during data changes (we ensure re-registration when the znode is deleted).
   override def handleCreation(): Unit = eventManager.put(controller.PartitionReassignment)
-  override def handleDeletion(): Unit = eventManager.put(controller.PartitionReassignment)
-  override def handleDataChange(): Unit = eventManager.put(controller.PartitionReassignment)
 }
 
 class PartitionReassignmentIsrChangeHandler(controller: KafkaController, eventManager: ControllerEventManager,
partition: TopicAndPartition) extends ZNodeChangeHandler {
@@ -1492,7 +1491,7 @@ object IsrChangeNotificationListener {
 }
 
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
-                                       var partitionReassignmentIsrChangeHandler: PartitionReassignmentIsrChangeHandler
= null)
+                                       var reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler
= null)
 
 case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
   override def toString: String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f96d7f1/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 026dc9d..90d53d4 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -431,24 +431,33 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean)
extends
   }
 
   /**
-   * Sets the partition reassignment znode with the given reassignment.
-   * @param reassignment the reassignment to set on the reassignment znode.
-   * @return SetDataResponse
+   * Sets or creates the partition reassignment znode with the given reassignment depending
on whether it already
+   * exists or not.
+   *
+   * @param reassignment the reassignment to set on the reassignment znode
+   * @throws KeeperException if there is an error while setting or creating the znode
    */
-  def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse
= {
-    val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
ZkVersion.NoVersion)
-    retryRequestUntilConnected(setDataRequest)
-  }
+  def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicAndPartition, Seq[Int]]):
Unit = {
 
-  /**
-   * Creates the partition reassignment znode with the given reassignment.
-   * @param reassignment the reassignment to set on the reassignment znode.
-   * @return CreateResponse
-   */
-  def createPartitionReassignment(reassignment: Map[TopicAndPartition, Seq[Int]]): CreateResponse
= {
-    val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
-      acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
-    retryRequestUntilConnected(createRequest)
+    def set(reassignmentData: Array[Byte]): SetDataResponse = {
+      val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, reassignmentData,
ZkVersion.NoVersion)
+      retryRequestUntilConnected(setDataRequest)
+    }
+
+    def create(reassignmentData: Array[Byte]): CreateResponse = {
+      val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, acls(ReassignPartitionsZNode.path),
+        CreateMode.PERSISTENT)
+      retryRequestUntilConnected(createRequest)
+    }
+
+    val reassignmentData = ReassignPartitionsZNode.encode(reassignment)
+    val setDataResponse = set(reassignmentData)
+    setDataResponse.resultCode match {
+      case Code.NONODE =>
+        val createDataResponse = create(reassignmentData)
+        createDataResponse.resultException.foreach(e => throw e)
+      case _ => setDataResponse.resultException.foreach(e => throw e)
+    }
   }
 
   /**
@@ -620,16 +629,20 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean)
extends
   }
 
   /**
-   * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest,
which allows data watcher
-   * registrations on paths which might not even exist.
+   * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest,
which allows data
+   * watcher registrations on paths which might not even exist.
    *
    * @param zNodeChangeHandler
+   * @return `true` if the path exists or `false` if it does not
+   * @throws KeeperException if an error is returned by ZooKeeper
    */
-  def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler):
Unit = {
+  def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler):
Boolean = {
     zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
     val existsResponse = retryRequestUntilConnected(ExistsRequest(zNodeChangeHandler.path))
-    if (existsResponse.resultCode != Code.OK && existsResponse.resultCode != Code.NONODE)
{
-      throw existsResponse.resultException.get
+    existsResponse.resultCode match {
+      case Code.OK => true
+      case Code.NONODE => false
+      case _ => throw existsResponse.resultException.get
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f96d7f1/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 4698455..e46f438 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -204,7 +204,7 @@ object DeleteTopicsTopicZNode {
 
 object ReassignPartitionsZNode {
   def path = s"${AdminZNode.path}/reassign_partitions"
-  def encode(reassignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
+  def encode(reassignment: collection.Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
     val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition),
replicas) =>
       Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f96d7f1/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 9727e5c..11f75e2 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -470,6 +470,102 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     assertEquals(Seq(3), zkUtils.getReplicasForPartition("customers", 3))
   }
 
+  /**
+   * Verifies that the Controller sets a watcher for the reassignment znode after reassignment
completion.
+   * This includes the case where the znode is set immediately after it's deleted (i.e. before
the watch is set).
+   * This case relies on the scheduling of the operations, so it won't necessarily fail every
time, but it fails
+   * often enough to detect a regression.
+   */
+  @Test
+  def shouldPerformMultipleReassignmentOperationsOverVariousTopics() {
+    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf)))
+
+    createTopic(zkUtils, "orders", Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2)), servers)
+    createTopic(zkUtils, "payments", Map(0 -> List(0, 1), 1 -> List(0, 1)), servers)
+    createTopic(zkUtils, "deliveries", Map(0 -> List(0)), servers)
+    createTopic(zkUtils, "customers", Map(0 -> List(0), 1 -> List(1), 2 -> List(2),
3 -> List(3)), servers)
+
+    val firstMove = Map(
+      TopicAndPartition("orders", 0) -> Seq(0, 2, 3), //moves
+      TopicAndPartition("orders", 1) -> Seq(0, 1, 2), //stays
+      TopicAndPartition("payments", 1) -> Seq(1, 2), //only define one partition as moving
+      TopicAndPartition("deliveries", 0) -> Seq(1, 2) //increase replication factor
+    )
+
+    new ReassignPartitionsCommand(zkUtils, None, firstMove).reassignPartitions()
+    waitForReassignmentToComplete()
+
+    // Check moved replicas did move
+    assertEquals(Seq(0, 2, 3), zkUtils.getReplicasForPartition("orders", 0))
+    assertEquals(Seq(0, 1, 2), zkUtils.getReplicasForPartition("orders", 1))
+    assertEquals(Seq(1, 2), zkUtils.getReplicasForPartition("payments", 1))
+    assertEquals(Seq(1, 2), zkUtils.getReplicasForPartition("deliveries", 0))
+
+    // Check untouched replicas are still there
+    assertEquals(Seq(0, 1), zkUtils.getReplicasForPartition("payments", 0))
+    assertEquals(Seq(0), zkUtils.getReplicasForPartition("customers", 0))
+    assertEquals(Seq(1), zkUtils.getReplicasForPartition("customers", 1))
+    assertEquals(Seq(2), zkUtils.getReplicasForPartition("customers", 2))
+    assertEquals(Seq(3), zkUtils.getReplicasForPartition("customers", 3))
+
+    // Define a move for some of them
+    val secondMove = Map(
+      TopicAndPartition("orders", 0) -> Seq(0, 2, 3), // stays
+      TopicAndPartition("orders", 1) -> Seq(3, 1, 2), // moves
+      TopicAndPartition("payments", 1) -> Seq(2, 1), // changed preferred leader
+      TopicAndPartition("deliveries", 0) -> Seq(1, 2, 3) //increase replication factor
+    )
+
+    new ReassignPartitionsCommand(zkUtils, None, secondMove).reassignPartitions()
+    waitForReassignmentToComplete()
+
+    // Check moved replicas did move
+    assertEquals(Seq(0, 2, 3), zkUtils.getReplicasForPartition("orders", 0))
+    assertEquals(Seq(3, 1, 2), zkUtils.getReplicasForPartition("orders", 1))
+    assertEquals(Seq(2, 1), zkUtils.getReplicasForPartition("payments", 1))
+    assertEquals(Seq(1, 2, 3), zkUtils.getReplicasForPartition("deliveries", 0))
+
+    //Check untouched replicas are still there
+    assertEquals(Seq(0, 1), zkUtils.getReplicasForPartition("payments", 0))
+    assertEquals(Seq(0), zkUtils.getReplicasForPartition("customers", 0))
+    assertEquals(Seq(1), zkUtils.getReplicasForPartition("customers", 1))
+    assertEquals(Seq(2), zkUtils.getReplicasForPartition("customers", 2))
+    assertEquals(Seq(3), zkUtils.getReplicasForPartition("customers", 3))
+
+    // We set the znode and then continuously attempt to set it again to exercise the case
where the znode is set
+    // immediately after deletion (i.e. before we set the watcher again)
+
+    val thirdMove = Map(TopicAndPartition("orders", 0) -> Seq(1, 2, 3))
+
+    new ReassignPartitionsCommand(zkUtils, None, thirdMove).reassignPartitions()
+
+    val fourthMove = Map(TopicAndPartition("payments", 1) -> Seq(2, 3))
+
+    // Continuously attempt to set the reassignment znode with `fourthMove` until it succeeds.
It will only succeed
+    // after `thirdMove` completes.
+    Iterator.continually {
+      try new ReassignPartitionsCommand(zkUtils, None, fourthMove).reassignPartitions()
+      catch {
+        case _: AdminCommandFailedException => false
+      }
+    }.exists(identity)
+
+    waitForReassignmentToComplete()
+
+    // Check moved replicas for thirdMove and fourthMove
+    assertEquals(Seq(1, 2, 3), zkUtils.getReplicasForPartition("orders", 0))
+    assertEquals(Seq(2, 3), zkUtils.getReplicasForPartition("payments", 1))
+
+    //Check untouched replicas are still there
+    assertEquals(Seq(3, 1, 2), zkUtils.getReplicasForPartition("orders", 1))
+    assertEquals(Seq(1, 2, 3), zkUtils.getReplicasForPartition("deliveries", 0))
+    assertEquals(Seq(0, 1), zkUtils.getReplicasForPartition("payments", 0))
+    assertEquals(Seq(0), zkUtils.getReplicasForPartition("customers", 0))
+    assertEquals(Seq(1), zkUtils.getReplicasForPartition("customers", 1))
+    assertEquals(Seq(2), zkUtils.getReplicasForPartition("customers", 2))
+    assertEquals(Seq(3), zkUtils.getReplicasForPartition("customers", 3))
+  }
+
   def waitForReassignmentToComplete() {
     waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode ${ZkUtils.ReassignPartitionsPath}
wasn't deleted")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f96d7f1/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index f2d95c2..8d064f8 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -157,4 +157,28 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertFalse(statusAndVersion._1)
     assertEquals(-1, statusAndVersion._2)
   }
-}
\ No newline at end of file
+
+  @Test
+  def testSetGetAndDeletePartitionReassignment() {
+    zkClient.createRecursive(AdminZNode.path)
+
+    assertEquals(Map.empty, zkClient.getPartitionReassignment)
+
+    val reassignment = Map(
+      TopicAndPartition("topic_a", 0) -> Seq(0, 1, 3),
+      TopicAndPartition("topic_a", 1) -> Seq(2, 1, 3),
+      TopicAndPartition("topic_b", 0) -> Seq(4, 5),
+      TopicAndPartition("topic_c", 0) -> Seq(5, 3)
+    )
+    zkClient.setOrCreatePartitionReassignment(reassignment)
+    assertEquals(reassignment, zkClient.getPartitionReassignment)
+
+    val updatedReassingment = reassignment - TopicAndPartition("topic_b", 0)
+    zkClient.setOrCreatePartitionReassignment(updatedReassingment)
+    assertEquals(updatedReassingment, zkClient.getPartitionReassignment)
+
+    zkClient.deletePartitionReassignment()
+    assertEquals(Map.empty, zkClient.getPartitionReassignment)
+  }
+
+}


Mime
View raw message