kafka-commits mailing list archives

From jun...@apache.org
Subject [2/2] kafka git commit: KAFKA-5647; Use KafkaZkClient in ReassignPartitionsCommand and PreferredReplicaLeaderElectionCommand
Date Wed, 20 Dec 2017 20:19:42 GMT
KAFKA-5647; Use KafkaZkClient in ReassignPartitionsCommand and PreferredReplicaLeaderElectionCommand

*  Use KafkaZkClient in ReassignPartitionsCommand
*  Use KafkaZkClient in PreferredReplicaLeaderElectionCommand
*  Updated test classes to use new methods
*  All existing tests should pass
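
For context, both commands now build a KafkaZkClient on top of a ZooKeeperClient instead of going through ZkUtils. A minimal sketch of the new wiring, following the pattern in the diff below (the connect string is a placeholder):

    import kafka.zk.KafkaZkClient
    import kafka.zookeeper.ZooKeeperClient
    import org.apache.kafka.common.security.JaasUtils
    import org.apache.kafka.common.utils.Time

    val time = Time.SYSTEM
    // The 30s session/connection timeouts match the values hard-coded in both commands.
    val zooKeeperClient = new ZooKeeperClient("localhost:2181", 30000, 30000, Int.MaxValue, time)
    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
    try
      zkClient.getAllPartitions().foreach(println) // e.g. list every partition in the cluster
    finally zkClient.close()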

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #4260 from omkreddy/KAFKA-5647-ADMINCOMMANDS


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/488ea4b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/488ea4b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/488ea4b9

Branch: refs/heads/trunk
Commit: 488ea4b9fd797b5edd58c6f9685d70acd15e3e74
Parents: 35c1be7
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Wed Dec 20 12:19:36 2017 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Dec 20 12:19:36 2017 -0800

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   2 +-
 .../PreferredReplicaLeaderElectionCommand.scala |  56 ++--
 .../kafka/admin/ReassignPartitionsCommand.scala | 191 ++++++-------
 .../kafka/server/DynamicConfigManager.scala     |  17 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  15 +-
 .../src/main/scala/kafka/zk/KafkaZkClient.scala |  31 ++-
 core/src/main/scala/kafka/zk/ZkData.scala       |   2 +-
 .../ReassignPartitionsIntegrationTest.scala     |   2 +-
 .../other/kafka/ReplicationQuotasTestRig.scala  |   7 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  59 ++--
 .../unit/kafka/admin/DeleteTopicTest.scala      |  13 +-
 .../admin/ReassignPartitionsClusterTest.scala   | 124 ++++-----
 .../admin/ReassignPartitionsCommandTest.scala   | 274 ++++++++++---------
 .../kafka/admin/ReplicationQuotaUtils.scala     |  19 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala |  20 ++
 .../InternalTopicIntegrationTest.java           |  26 +-
 .../integration/utils/KafkaEmbedded.java        |  51 ++--
 17 files changed, 489 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a2b508e..18e76e7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -223,10 +223,10 @@
       <allow pkg="kafka.tools" />
       <allow pkg="kafka.utils" />
       <allow pkg="kafka.zk" />
+      <allow pkg="kafka.zookeeper" />
       <allow pkg="kafka.log" />
       <allow pkg="scala" />
       <allow pkg="scala.collection" />
-      <allow pkg="org.I0Itec.zkclient" />
     </subpackage>
 
     <subpackage name="test">

http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index c292fe6..e36b25b 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -18,12 +18,15 @@ package kafka.admin
 
 import joptsimple.OptionParser
 import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import kafka.common.{TopicAndPartition, AdminCommandFailedException}
+import kafka.common.AdminCommandFailedException
+import kafka.zk.KafkaZkClient
+import kafka.zookeeper.ZooKeeperClient
+
 import collection._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.zookeeper.KeeperException.NodeExistsException
 
 object PreferredReplicaLeaderElectionCommand extends Logging {
 
@@ -51,20 +54,19 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
 
     val zkConnect = options.valueOf(zkConnectOpt)
-    var zkClient: ZkClient = null
-    var zkUtils: ZkUtils = null
+    var zooKeeperClient: ZooKeeperClient = null
+    var zkClient: KafkaZkClient = null
     try {
-      zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
-      zkUtils = ZkUtils(zkConnect, 
-                        30000,
-                        30000,
-                        JaasUtils.isZkSecurityEnabled())
+      val time = Time.SYSTEM
+      zooKeeperClient = new ZooKeeperClient(zkConnect, 30000, 30000, Int.MaxValue, time)
+      zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+
       val partitionsForPreferredReplicaElection =
         if (!options.has(jsonFileOpt))
-          zkUtils.getAllPartitions()
+          zkClient.getAllPartitions()
         else
           parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
-      val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkUtils, partitionsForPreferredReplicaElection)
+      val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
 
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
     } catch {
@@ -77,7 +79,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     }
   }
 
-  def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicAndPartition] = {
+  def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicPartition] = {
     Json.parseFull(jsonString) match {
       case Some(js) =>
         js.asJsonObject.get("partitions") match {
@@ -86,7 +88,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
             val partitions = partitionsRaw.map { p =>
               val topic = p("topic").to[String]
               val partition = p("partition").to[Int]
-              TopicAndPartition(topic, partition)
+              new TopicPartition(topic, partition)
             }.toBuffer
             val duplicatePartitions = CoreUtils.duplicates(partitions)
             if (duplicatePartitions.nonEmpty)
@@ -98,34 +100,30 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     }
   }
 
-  def writePreferredReplicaElectionData(zkUtils: ZkUtils,
-                                        partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
-    val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-    val jsonData = ZkUtils.preferredReplicaLeaderElectionZkData(partitionsUndergoingPreferredReplicaElection)
+  def writePreferredReplicaElectionData(zkClient: KafkaZkClient,
+                                        partitionsUndergoingPreferredReplicaElection: Set[TopicPartition]) {
     try {
-      zkUtils.createPersistentPath(zkPath, jsonData)
-      println("Created preferred replica election path with %s".format(jsonData))
+      zkClient.createPreferredReplicaElection(partitionsUndergoingPreferredReplicaElection.toSet)
+      println("Created preferred replica election path with %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
     } catch {
-      case _: ZkNodeExistsException =>
-        val partitionsUndergoingPreferredReplicaElection =
-          PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(zkUtils.readData(zkPath)._1)
+      case _: NodeExistsException =>
         throw new AdminOperationException("Preferred replica leader election currently in progress for " +
-          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
+          "%s. Aborting operation".format(zkClient.getPreferredReplicaElection.mkString(",")))
       case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }
 }
 
-class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitionsFromUser: scala.collection.Set[TopicAndPartition]) {
+class PreferredReplicaLeaderElectionCommand(zkClient: KafkaZkClient, partitionsFromUser: scala.collection.Set[TopicPartition]) {
   def moveLeaderToPreferredReplica() = {
     try {
       val topics = partitionsFromUser.map(_.topic).toSet
-      val partitionsFromZk = zkUtils.getPartitionsForTopics(topics.toSeq).flatMap{ case (topic, partitions) =>
-        partitions.map(TopicAndPartition(topic, _))
+      val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>
+        partitions.map(new TopicPartition(topic, _))
       }.toSet
 
       val (validPartitions, invalidPartitions) = partitionsFromUser.partition(partitionsFromZk.contains)
-      PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, validPartitions)
+      PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
 
       println("Successfully started preferred replica election for partitions %s".format(validPartitions))
       invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p)))

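For reference, parsePreferredReplicaElectionData keeps the existing JSON layout; only its return type changed from TopicAndPartition to TopicPartition. A small usage sketch (topic name is a placeholder):

    import kafka.admin.PreferredReplicaLeaderElectionCommand

    val json = """{"partitions": [{"topic": "foo", "partition": 0}, {"topic": "foo", "partition": 1}]}"""
    val partitions = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(json)
    // partitions: immutable.Set[TopicPartition] containing foo-0 and foo-1
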
http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 811e56e..6bcbe91 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -19,21 +19,25 @@ package kafka.admin
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 
-import scala.collection._
-import scala.collection.JavaConverters._
+import joptsimple.OptionParser
+import kafka.common.AdminCommandFailedException
+import kafka.log.LogConfig
+import kafka.log.LogConfig._
 import kafka.server.{ConfigType, DynamicConfig}
 import kafka.utils._
-import kafka.common.{AdminCommandFailedException, TopicAndPartition}
-import kafka.log.LogConfig
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.security.JaasUtils
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
+import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
 import org.apache.kafka.common.TopicPartitionReplica
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
-import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
-import LogConfig._
-import joptsimple.OptionParser
-import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.TopicPartition
+
+import scala.collection.JavaConverters._
+import scala.collection._
 
 object ReassignPartitionsCommand extends Logging {
 
@@ -45,24 +49,24 @@ object ReassignPartitionsCommand extends Logging {
   def main(args: Array[String]): Unit = {
     val opts = validateAndParseArgs(args)
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
-    val zkUtils = ZkUtils(zkConnect,
-                          30000,
-                          30000,
-                          JaasUtils.isZkSecurityEnabled())
+    val time = Time.SYSTEM
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, 30000, 30000, Int.MaxValue, time)
+    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+
     val adminClientOpt = createAdminClient(opts)
 
     try {
       if(opts.options.has(opts.verifyOpt))
-        verifyAssignment(zkUtils, adminClientOpt, opts)
+        verifyAssignment(zkClient, adminClientOpt, opts)
       else if(opts.options.has(opts.generateOpt))
-        generateAssignment(zkUtils, opts)
+        generateAssignment(zkClient, opts)
       else if (opts.options.has(opts.executeOpt))
-        executeAssignment(zkUtils, adminClientOpt, opts)
+        executeAssignment(zkClient, adminClientOpt, opts)
     } catch {
       case e: Throwable =>
         println("Partitions reassignment failed due to " + e.getMessage)
         println(Utils.stackTrace(e))
-    } finally zkUtils.close()
+    } finally zkClient.close()
   }
 
   private def createAdminClient(opts: ReassignPartitionsCommandOptions): Option[JAdminClient] = {
@@ -76,16 +80,17 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def verifyAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
+  def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
-    verifyAssignment(zkUtils, adminClientOpt, jsonString)
+    verifyAssignment(zkClient, adminClientOpt, jsonString)
   }
 
-  def verifyAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], jsonString: String): Unit = {
+  def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], jsonString: String): Unit = {
     println("Status of partition reassignment: ")
+    val adminZkClient = new AdminZkClient(zkClient)
     val (partitionsToBeReassigned, replicaAssignment) = parsePartitionReassignmentData(jsonString)
-    val reassignedPartitionsStatus = checkIfPartitionReassignmentSucceeded(zkUtils, partitionsToBeReassigned.toMap)
+    val reassignedPartitionsStatus = checkIfPartitionReassignmentSucceeded(zkClient, partitionsToBeReassigned.toMap)
     val replicasReassignmentStatus = checkIfReplicaReassignmentSucceeded(adminClientOpt, replicaAssignment)
 
     reassignedPartitionsStatus.foreach { case (topicPartition, status) =>
@@ -109,28 +114,28 @@ object ReassignPartitionsCommand extends Logging {
           println("Reassignment of replica %s is still in progress".format(replica))
       }
     }
-
-    removeThrottle(zkUtils, reassignedPartitionsStatus, replicasReassignmentStatus)
+    removeThrottle(zkClient, reassignedPartitionsStatus, replicasReassignmentStatus, adminZkClient)
   }
 
-  private[admin] def removeThrottle(zkUtils: ZkUtils,
-                                    reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus],
+  private[admin] def removeThrottle(zkClient: KafkaZkClient,
+                                    reassignedPartitionsStatus: Map[TopicPartition, ReassignmentStatus],
                                     replicasReassignmentStatus: Map[TopicPartitionReplica, ReassignmentStatus],
-                                    admin: AdminUtilities = AdminUtils): Unit = {
+                                    adminZkClient: AdminZkClient): Unit = {
 
     //If both partition assignment and replica reassignment have completed remove both the inter-broker and replica-alter-dir throttle
     if (reassignedPartitionsStatus.forall { case (_, status) => status == ReassignmentCompleted } &&
         replicasReassignmentStatus.forall { case (_, status) => status == ReassignmentCompleted }) {
       var changed = false
+
       //Remove the throttle limit from all brokers in the cluster
       //(as we no longer know which specific brokers were involved in the move)
-      for (brokerId <- zkUtils.getAllBrokersInCluster().map(_.id)) {
-        val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Broker, brokerId.toString)
+      for (brokerId <- zkClient.getAllBrokersInCluster.map(_.id)) {
+        val configs = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
         // bitwise OR as we don't want to short-circuit
         if (configs.remove(DynamicConfig.Broker.LeaderReplicationThrottledRateProp) != null
           | configs.remove(DynamicConfig.Broker.FollowerReplicationThrottledRateProp) != null
           | configs.remove(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp) != null){
-          admin.changeBrokerConfig(zkUtils, Seq(brokerId), configs)
+          adminZkClient.changeBrokerConfig(Seq(brokerId), configs)
           changed = true
         }
       }
@@ -138,11 +143,11 @@ object ReassignPartitionsCommand extends Logging {
       //Remove the list of throttled replicas from all topics with partitions being moved
       val topics = (reassignedPartitionsStatus.keySet.map(tp => tp.topic) ++ replicasReassignmentStatus.keySet.map(replica => replica.topic)).toSeq.distinct
       for (topic <- topics) {
-        val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+        val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
         // bitwise OR as we don't want to short-circuit
         if (configs.remove(LogConfig.LeaderReplicationThrottledReplicasProp) != null
           | configs.remove(LogConfig.FollowerReplicationThrottledReplicasProp) != null) {
-          admin.changeTopicConfig(zkUtils, topic, configs)
+          adminZkClient.changeTopicConfig(topic, configs)
           changed = true
         }
       }
@@ -151,7 +156,7 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def generateAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
+  def generateAssignment(zkClient: KafkaZkClient, opts: ReassignPartitionsCommandOptions) {
     val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
     val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
     val duplicateReassignments = CoreUtils.duplicates(brokerListToReassign)
@@ -159,52 +164,54 @@ object ReassignPartitionsCommand extends Logging {
       throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
     val disableRackAware = opts.options.has(opts.disableRackAware)
-    val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
+    val (proposedAssignments, currentAssignments) = generateAssignment(zkClient, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
     println("Current partition replica assignment\n%s\n".format(formatAsReassignmentJson(currentAssignments, Map.empty)))
     println("Proposed partition reassignment configuration\n%s".format(formatAsReassignmentJson(proposedAssignments, Map.empty)))
   }
 
-  def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
+  def generateAssignment(zkClient: KafkaZkClient, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]]) = {
     val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
     val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
     if (duplicateTopicsToReassign.nonEmpty)
       throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
-    val currentAssignment = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
+    val currentAssignment = zkClient.getReplicaAssignmentForTopics(topicsToReassign.toSet)
 
     val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
     val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
-    val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, rackAwareMode, Some(brokerListToReassign))
+    val adminZkClient = new AdminZkClient(zkClient)
+    val brokerMetadatas = adminZkClient.getBrokerMetadatas(rackAwareMode, Some(brokerListToReassign))
 
-    val partitionsToBeReassigned = mutable.Map[TopicAndPartition, Seq[Int]]()
+    val partitionsToBeReassigned = mutable.Map[TopicPartition, Seq[Int]]()
     groupedByTopic.foreach { case (topic, assignment) =>
       val (_, replicas) = assignment.head
       val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
       partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
-        TopicAndPartition(topic, partition) -> replicas
+        new TopicPartition(topic, partition) -> replicas
       }
     }
     (partitionsToBeReassigned, currentAssignment)
   }
 
-  def executeAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
+  def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
     val interBrokerThrottle = opts.options.valueOf(opts.interBrokerThrottleOpt)
     val replicaAlterLogDirsThrottle = opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt)
     val timeoutMs = opts.options.valueOf(opts.timeoutOpt)
-    executeAssignment(zkUtils, adminClientOpt, reassignmentJsonString, Throttle(interBrokerThrottle, replicaAlterLogDirsThrottle), timeoutMs)
+    executeAssignment(zkClient, adminClientOpt, reassignmentJsonString, Throttle(interBrokerThrottle, replicaAlterLogDirsThrottle), timeoutMs)
   }
 
-  def executeAssignment(zkUtils: ZkUtils, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
-    val (partitionAssignment, replicaAssignment) = parseAndValidate(zkUtils, reassignmentJsonString)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, adminClientOpt, partitionAssignment.toMap, replicaAssignment)
+  def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
+    val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
+    val adminZkClient = new AdminZkClient(zkClient)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)
 
     // If there is an existing rebalance running, attempt to change its throttle
-    if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
+    if (zkClient.reassignPartitionsInProgress()) {
       println("There is an existing assignment running.")
       reassignPartitionsCommand.maybeLimit(throttle)
     } else {
-      printCurrentAssignment(zkUtils, partitionAssignment.map(_._1.topic))
+      printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))
       if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
         println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
       if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {
@@ -214,31 +221,31 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def printCurrentAssignment(zkUtils: ZkUtils, topics: Seq[String]): Unit = {
+  def printCurrentAssignment(zkClient: KafkaZkClient, topics: Seq[String]): Unit = {
     // before starting assignment, output the current replica assignment to facilitate rollback
-    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(topics)
+    val currentPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(topics.toSet)
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
       .format(formatAsReassignmentJson(currentPartitionReplicaAssignment, Map.empty)))
   }
 
-  def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
+  def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]],
                                replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = {
     Json.encodeAsString(Map(
       "version" -> 1,
-      "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) =>
+      "partitions" -> partitionsToBeReassigned.map { case (tp, replicas) =>
         Map(
-          "topic" -> topic,
-          "partition" -> partition,
+          "topic" -> tp.topic,
+          "partition" -> tp.partition,
           "replicas" -> replicas.asJava,
-          "log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(topic, partition, r), AnyLogDir)).asJava
+          "log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(tp.topic, tp.partition, r), AnyLogDir)).asJava
         ).asJava
       }.asJava
     ).asJava)
   }
 
   // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
-  def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicAndPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
-    val partitionAssignment = mutable.ListBuffer.empty[(TopicAndPartition, Seq[Int])]
+  def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
+    val partitionAssignment = mutable.ListBuffer.empty[(TopicPartition, Seq[Int])]
     val replicaAssignment = mutable.Map.empty[TopicPartitionReplica, String]
     for {
       js <- Json.parseFull(jsonData).toSeq
@@ -255,8 +262,8 @@ object ReassignPartitionsCommand extends Logging {
       }
       if (newReplicas.size != newLogDirs.size)
         throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
-          s"size of log dirs list $newLogDirs for partition ${TopicAndPartition(topic, partition)}")
-      partitionAssignment += (TopicAndPartition(topic, partition) -> newReplicas)
+          s"size of log dirs list $newLogDirs for partition ${new TopicPartition(topic, partition)}")
+      partitionAssignment += (new TopicPartition(topic, partition) -> newReplicas)
       replicaAssignment ++= newReplicas.zip(newLogDirs).map { case (replica, logDir) =>
         new TopicPartitionReplica(topic, partition, replica) -> logDir
       }.filter(_._2 != AnyLogDir)
@@ -264,7 +271,7 @@ object ReassignPartitionsCommand extends Logging {
     (partitionAssignment, replicaAssignment)
   }
 
-  def parseAndValidate(zkUtils: ZkUtils, reassignmentJsonString: String): (Seq[(TopicAndPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
+  def parseAndValidate(zkClient: KafkaZkClient, reassignmentJsonString: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
     val (partitionsToBeReassigned, replicaAssignment) = parsePartitionReassignmentData(reassignmentJsonString)
 
     if (partitionsToBeReassigned.isEmpty)
@@ -286,14 +293,14 @@ object ReassignPartitionsCommand extends Logging {
     }
     // check that all partitions in the proposed assignment exist in the cluster
     val proposedTopics = partitionsToBeReassigned.map { case (tp, _) => tp.topic }.distinct
-    val existingAssignment = zkUtils.getReplicaAssignmentForTopics(proposedTopics)
+    val existingAssignment = zkClient.getReplicaAssignmentForTopics(proposedTopics.toSet)
     val nonExistentPartitions = partitionsToBeReassigned.map { case (tp, _) => tp }.filterNot(existingAssignment.contains)
     if (nonExistentPartitions.nonEmpty)
       throw new AdminCommandFailedException("The proposed assignment contains non-existent partitions: " +
         nonExistentPartitions)
 
     // check that all brokers in the proposed assignment exist in the cluster
-    val existingBrokerIDs = zkUtils.getSortedBrokerList()
+    val existingBrokerIDs = zkClient.getSortedBrokerList
     val nonExistingBrokerIDs = partitionsToBeReassigned.toMap.values.flatten.filterNot(existingBrokerIDs.contains).toSet
     if (nonExistingBrokerIDs.nonEmpty)
       throw new AdminCommandFailedException("The proposed assignment contains non-existent brokerIDs: " + nonExistingBrokerIDs.mkString(","))
@@ -301,11 +308,12 @@ object ReassignPartitionsCommand extends Logging {
     (partitionsToBeReassigned, replicaAssignment)
   }
 
-  private def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
-  :Map[TopicAndPartition, ReassignmentStatus] = {
-    val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
+  private def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, partitionsToBeReassigned: Map[TopicPartition, Seq[Int]])
+  :Map[TopicPartition, ReassignmentStatus] = {
+    val partitionsBeingReassigned = zkClient.getPartitionReassignment
+
     partitionsToBeReassigned.keys.map { topicAndPartition =>
-      (topicAndPartition, checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, partitionsToBeReassigned,
+      (topicAndPartition, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, partitionsToBeReassigned,
         partitionsBeingReassigned))
     }.toMap
   }
@@ -351,15 +359,15 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
-                                            partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
-                                            partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
+  def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, topicAndPartition: TopicPartition,
+                                            partitionsToBeReassigned: Map[TopicPartition, Seq[Int]],
+                                            partitionsBeingReassigned: Map[TopicPartition, Seq[Int]]): ReassignmentStatus = {
     val newReplicas = partitionsToBeReassigned(topicAndPartition)
     partitionsBeingReassigned.get(topicAndPartition) match {
       case Some(_) => ReassignmentInProgress
       case None =>
         // check if the current replica assignment matches the expected one after reassignment
-        val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
+        val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition))
         if(assignedReplicas == newReplicas)
           ReassignmentCompleted
         else {
@@ -458,23 +466,23 @@ object ReassignPartitionsCommand extends Logging {
   }
 }
 
-class ReassignPartitionsCommand(zkUtils: ZkUtils,
+class ReassignPartitionsCommand(zkClient: KafkaZkClient,
                                 adminClientOpt: Option[JAdminClient],
-                                proposedPartitionAssignment: Map[TopicAndPartition, Seq[Int]],
+                                proposedPartitionAssignment: Map[TopicPartition, Seq[Int]],
                                 proposedReplicaAssignment: Map[TopicPartitionReplica, String] = Map.empty,
-                                admin: AdminUtilities = AdminUtils)
+                                adminZkClient: AdminZkClient)
   extends Logging {
 
   import ReassignPartitionsCommand._
 
-  def existingAssignment(): Map[TopicAndPartition, Seq[Int]] = {
+  def existingAssignment(): Map[TopicPartition, Seq[Int]] = {
     val proposedTopics = proposedPartitionAssignment.keySet.map(_.topic).toSeq
-    zkUtils.getReplicaAssignmentForTopics(proposedTopics)
+    zkClient.getReplicaAssignmentForTopics(proposedTopics.toSet)
   }
 
   private def maybeThrottle(throttle: Throttle): Unit = {
     if (throttle.interBrokerLimit >= 0)
-      assignThrottledReplicas(existingAssignment(), proposedPartitionAssignment)
+      assignThrottledReplicas(existingAssignment(), proposedPartitionAssignment, adminZkClient)
     maybeLimit(throttle)
     if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
       throttle.postUpdateAction()
@@ -495,7 +503,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
       val brokers = (existingBrokers ++ proposedBrokers).distinct
 
       for (id <- brokers) {
-        val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
+        val configs = adminZkClient.fetchEntityConfig(ConfigType.Broker, id.toString)
         if (throttle.interBrokerLimit >= 0) {
           configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.interBrokerLimit.toString)
           configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.interBrokerLimit.toString)
@@ -503,15 +511,15 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
         if (throttle.replicaAlterLogDirsLimit >= 0)
           configs.put(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, throttle.replicaAlterLogDirsLimit.toString)
 
-        admin.changeBrokerConfig(zkUtils, Seq(id), configs)
+        adminZkClient.changeBrokerConfig(Seq(id), configs)
       }
     }
   }
 
   /** Set throttles to replicas that are moving. Note: this method should only be used when the assignment is initiated. */
-  private[admin] def assignThrottledReplicas(existingPartitionAssignment: Map[TopicAndPartition, Seq[Int]],
-                                             proposedPartitionAssignment: Map[TopicAndPartition, Seq[Int]],
-                                             admin: AdminUtilities = AdminUtils): Unit = {
+  private[admin] def assignThrottledReplicas(existingPartitionAssignment: Map[TopicPartition, Seq[Int]],
+                                             proposedPartitionAssignment: Map[TopicPartition, Seq[Int]],
+                                             adminZkClient: AdminZkClient): Unit = {
     for (topic <- proposedPartitionAssignment.keySet.map(_.topic).toSeq) {
       val existingPartitionAssignmentForTopic = existingPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
       val proposedPartitionAssignmentForTopic = proposedPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
@@ -522,24 +530,24 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
       //Apply follower throttle to all "move destinations".
       val follower = format(postRebalanceReplicasThatMoved(existingPartitionAssignmentForTopic, proposedPartitionAssignmentForTopic))
 
-      val configs = admin.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+      val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
       configs.put(LeaderReplicationThrottledReplicasProp, leader)
       configs.put(FollowerReplicationThrottledReplicasProp, follower)
-      admin.changeTopicConfig(zkUtils, topic, configs)
+      adminZkClient.changeTopicConfig(topic, configs)
 
       debug(s"Updated leader-throttled replicas for topic $topic with: $leader")
       debug(s"Updated follower-throttled replicas for topic $topic with: $follower")
     }
   }
 
-  private def postRebalanceReplicasThatMoved(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = {
+  private def postRebalanceReplicasThatMoved(existing: Map[TopicPartition, Seq[Int]], proposed: Map[TopicPartition, Seq[Int]]): Map[TopicPartition, Seq[Int]] = {
     //For each partition in the proposed list, filter out any replicas that exist now, and hence aren't being moved.
     proposed.map { case (tp, proposedReplicas) =>
       tp -> (proposedReplicas.toSet -- existing(tp)).toSeq
     }
   }
 
-  private def preRebalanceReplicaForMovingPartitions(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = {
+  private def preRebalanceReplicaForMovingPartitions(existing: Map[TopicPartition, Seq[Int]], proposed: Map[TopicPartition, Seq[Int]]): Map[TopicPartition, Seq[Int]] = {
     def moving(before: Seq[Int], after: Seq[Int]) = (after.toSet -- before.toSet).nonEmpty
     //For any moving partition, throttle all the original (pre move) replicas (as any one might be a leader)
     existing.filter { case (tp, preMoveReplicas) =>
@@ -547,7 +555,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
     }
   }
 
-  def format(moves: Map[TopicAndPartition, Seq[Int]]): String =
+  def format(moves: Map[TopicPartition, Seq[Int]]): String =
     moves.flatMap { case (tp, moves) =>
       moves.map(replicaId => s"${tp.partition}:${replicaId}")
     }.mkString(",")
@@ -574,7 +582,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
   def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
     maybeThrottle(throttle)
     try {
-      val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
+      val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkClient, p.topic, p.partition) }
       if (validPartitions.isEmpty) false
       else {
         if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
@@ -586,8 +594,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
           alterReplicaLogDirsIgnoreReplicaNotAvailable(proposedReplicaAssignment, adminClientOpt.get, timeoutMs)
 
         // Create reassignment znode so that controller will send LeaderAndIsrRequest to create replica in the broker
-        val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
-        zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
+        zkClient.createPartitionReassignment(validPartitions.map({case (key, value) => (new TopicPartition(key.topic, key.partition), value)}).toMap)
 
         // Send AlterReplicaLogDirsRequest again to make sure broker will start to move replica to the specified log directory.
         // It may take some time for controller to create replica in the broker. Retry if the replica has not been created.
@@ -602,16 +609,16 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils,
         replicasAssignedToFutureDir.size == proposedReplicaAssignment.size
       }
     } catch {
-      case _: ZkNodeExistsException =>
-        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
+      case _: NodeExistsException =>
+        val partitionsBeingReassigned = zkClient.getPartitionReassignment
         throw new AdminCommandFailedException("Partition reassignment currently in " +
           "progress for %s. Aborting operation".format(partitionsBeingReassigned))
     }
   }
 
-  def validatePartition(zkUtils: ZkUtils, topic: String, partition: Int): Boolean = {
+  def validatePartition(zkClient: KafkaZkClient, topic: String, partition: Int): Boolean = {
     // check if partition exists
-    val partitionsOpt = zkUtils.getPartitionsForTopics(List(topic)).get(topic)
+    val partitionsOpt = zkClient.getPartitionsForTopics(immutable.Set(topic)).get(topic)
     partitionsOpt match {
       case Some(partitions) =>
         if(partitions.contains(partition)) {

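For reference, the reassignment JSON accepted by parsePartitionReassignmentData above is unchanged; only the parsed key type is now TopicPartition. A sketch (topic, broker ids and log dirs are placeholders):

    import kafka.admin.ReassignPartitionsCommand

    val reassignmentJson =
      """{"version": 1, "partitions":
        |  [{"topic": "foo", "partition": 0, "replicas": [1, 2], "log_dirs": ["any", "any"]}]
        |}""".stripMargin
    val (partitionAssignment, replicaAssignment) =
      ReassignPartitionsCommand.parsePartitionReassignmentData(reassignmentJson)
    // partitionAssignment contains foo-0 -> Seq(1, 2); "any" log dirs are filtered out,
    // so replicaAssignment is empty here.
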
http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 84bcfe3..728f88c 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -20,19 +20,16 @@ package kafka.server
 import java.nio.charset.StandardCharsets
 
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import kafka.utils.Json
-import kafka.utils.Logging
-import kafka.utils.ZkUtils
-
-import scala.collection._
-import scala.collection.JavaConverters._
-import kafka.admin.AdminUtils
+import kafka.utils.{Json, Logging}
 import kafka.utils.json.JsonObject
-import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zk.{KafkaZkClient, AdminZkClient, ConfigEntityChangeNotificationZNode, ConfigEntityChangeNotificationSequenceZNode}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.security.scram.ScramMechanism
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConverters._
+import scala.collection._
+
 /**
  * Represents all the entities that can be configured via ZK
  */
@@ -155,8 +152,8 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
     }
   }
 
-  private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ZkUtils.ConfigChangesPath,
-    AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
+  private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ConfigEntityChangeNotificationZNode.path,
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler)
 
   /**
    * Begin watching for config changes

http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 2c079e5..1f665e6 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -42,6 +42,7 @@ import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
 
 import scala.collection._
 import scala.collection.JavaConverters._
+import org.apache.kafka.common.TopicPartition
 
 object ZkUtils {
 
@@ -219,6 +220,18 @@ object ZkUtils {
     ))
   }
 
+  def getReassignmentJson(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]]): String = {
+    Json.encodeAsString(Map(
+      "version" -> 1,
+      "partitions" -> partitionsToBeReassigned.map { case (tp, replicas) =>
+        Map(
+          "topic" -> tp.topic,
+          "partition" -> tp.partition,
+          "replicas" -> replicas
+        )
+      }
+    ))
+  }
 }
 
 class ZooKeeperClientWrapper(val zkClient: ZkClient) {
@@ -874,7 +887,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
     // read the partitions and their new replica list
     val jsonPartitionListOpt = readDataMaybeNull(PreferredReplicaLeaderElectionPath)._1
     jsonPartitionListOpt match {
-      case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList)
+      case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList).map(tp => new TopicAndPartition(tp))
       case None => Set.empty[TopicAndPartition]
     }
   }

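The new ZkUtils.getReassignmentJson mirrors the command's formatter minus the log_dirs field, so callers still on ZkUtils (such as ReplicationQuotasTestRig below) can build the znode payload from a TopicPartition map. A hedged sketch of its use:

    import kafka.utils.ZkUtils
    import org.apache.kafka.common.TopicPartition

    val json = ZkUtils.getReassignmentJson(Map(new TopicPartition("foo", 0) -> Seq(1, 2)))
    // version-1 payload, e.g. {"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[1,2]}]}
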
http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index d2bf881..13fd024 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -492,6 +492,21 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
   }
 
   /**
+   * Gets all partitions in the cluster
+   * @return all partitions in the cluster
+   */
+  def getAllPartitions(): Set[TopicPartition] = {
+    val topics = getChildren(TopicsZNode.path)
+    if (topics == null) Set.empty
+    else {
+      topics.flatMap { topic =>
+        // The partitions path may not exist if the topic is in the process of being deleted
+        getChildren(TopicPartitionsZNode.path(topic)).map(_.toInt).map(new TopicPartition(topic, _))
+      }.toSet
+    }
+  }
+
+  /**
    * Gets the data and version at the given zk path
    * @param path zk node path
    * @return A tuple of 2 elements, where first element is zk node data as an array of bytes
@@ -667,13 +682,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
   /**
    * Creates the partition reassignment znode with the given reassignment.
    * @param reassignment the reassignment to set on the reassignment znode.
-   * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws KeeperException if there is an error while creating the znode
    */
   def createPartitionReassignment(reassignment: Map[TopicPartition, Seq[Int]])  = {
-    val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
-      acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
-    val createResponse = retryRequestUntilConnected(createRequest)
-    createResponse.maybeThrow
+    createRecursive(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment))
   }
 
   /**
@@ -794,6 +806,15 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
   }
 
   /**
+   * Creates preferred replica election znode with partitions undergoing election
+   * @param partitions
+   * @throws KeeperException if there is an error while creating the znode
+   */
+  def createPreferredReplicaElection(partitions: Set[TopicPartition]): Unit  = {
+    createRecursive(PreferredReplicaElectionZNode.path, PreferredReplicaElectionZNode.encode(partitions))
+  }
+
+  /**
    * Gets the partitions marked for preferred replica election.
    * @return sequence of partitions.
    */

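Together with reassignPartitionsInProgress, createPartitionReassignment replaces the command's old createPersistentPath call on the reassignment path. A minimal sketch of starting a reassignment through the new client (partition and replicas are placeholders):

    import kafka.zk.KafkaZkClient
    import org.apache.kafka.common.TopicPartition
    import org.apache.zookeeper.KeeperException.NodeExistsException

    def startReassignment(zkClient: KafkaZkClient): Unit = {
      val reassignment = Map(new TopicPartition("foo", 0) -> Seq(1, 2))
      if (zkClient.reassignPartitionsInProgress())
        println(s"Existing reassignment in progress: ${zkClient.getPartitionReassignment}")
      else
        try zkClient.createPartitionReassignment(reassignment)
        catch {
          case _: NodeExistsException =>
            println("Another reassignment was created concurrently; aborting")
        }
    }
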
http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 8688104..a03263c 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -165,7 +165,7 @@ object ConfigEntityZNode {
   def decode(bytes: Array[Byte]): Properties = {
     val props = new Properties()
     if (bytes != null) {
-      Json.parseBytes(bytes).map { js =>
+      Json.parseBytes(bytes).foreach { js =>
         val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
         configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 3f51528..3c0c537 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -35,7 +35,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAw
     kafka.admin.TopicCommand.createTopic(zkClient, createOpts)
 
     val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
-    val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
+    val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkClient,
       rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
 
     val assignment = proposedAssignment map { case (topicPartition, replicas) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 389cb8f..d411b2f 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -22,7 +22,6 @@ import javax.imageio.ImageIO
 
 import kafka.admin.ReassignPartitionsCommand
 import kafka.admin.ReassignPartitionsCommand.Throttle
-import kafka.common.TopicAndPartition
 import org.apache.kafka.common.TopicPartition
 import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
 import kafka.utils.TestUtils._
@@ -136,10 +135,10 @@ object ReplicationQuotasTestRig {
       }
 
       println("Starting Reassignment")
-      val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
+      val newAssignment = ReassignPartitionsCommand.generateAssignment(zkClient, brokers, json(topicName), true)._1
 
       val start = System.currentTimeMillis()
-      ReassignPartitionsCommand.executeAssignment(zkUtils, None, ZkUtils.formatAsReassignmentJson(newAssignment), Throttle(config.throttle))
+      ReassignPartitionsCommand.executeAssignment(zkClient, None, ZkUtils.getReassignmentJson(newAssignment), Throttle(config.throttle))
 
       //Await completion
       waitForReassignmentToComplete()
@@ -167,7 +166,7 @@ object ReplicationQuotasTestRig {
     }
   }
 
-    def logOutput(config: ExperimentDef, replicas: Map[Int, Seq[Int]], newAssignment: Map[TopicAndPartition, Seq[Int]]): Unit = {
+    def logOutput(config: ExperimentDef, replicas: Map[Int, Seq[Int]], newAssignment: Map[TopicPartition, Seq[Int]]): Unit = {
       val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
 
       //Long stats

http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index d1e758d..7a88237 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -29,7 +29,6 @@ import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{Logging, TestUtils, ZkUtils}
-import kafka.common.TopicAndPartition
 import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
 import java.io.File
 import java.util
@@ -198,21 +197,21 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // create brokers
     servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // reassign partition 0
     val newReplicas = Seq(0, 2, 3)
     val partitionToBeReassigned = 0
-    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
     assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
+        val partitionsBeingReassigned = zkClient.getPartitionReassignment
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
-    val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
+    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
     // in sync replicas should not have any replica that is not in the new assigned replicas
     checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
@@ -228,21 +227,21 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // create brokers
     servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // reassign partition 0
     val newReplicas = Seq(1, 2, 3)
     val partitionToBeReassigned = 0
-    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
+        val partitionsBeingReassigned = zkClient.getPartitionReassignment
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
           Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
-    val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
+    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
     ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
@@ -257,21 +256,21 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // create brokers
     servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
-    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition,
+        val partitionsBeingReassigned = zkClient.getPartitionReassignment
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
           Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
-    val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
+    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
     ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
@@ -287,10 +286,10 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
-    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
     assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
-    val reassignedPartitions = zkUtils.getPartitionsBeingReassigned()
+    val reassignedPartitions = zkClient.getPartitionReassignment
     assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
   }
 
@@ -299,13 +298,13 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // put the partition in the reassigned path as well
     // reassign partition 0
     val newReplicas = Seq(0, 1)
     val partitionToBeReassigned = 0
-    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(topicAndPartition -> newReplicas))
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
     reassignPartitionsCommand.reassignPartitions()
     // create brokers
     servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
@@ -313,7 +312,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // wait until reassignment completes
     TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkUtils),
                             "Partition reassignment should complete")
-    val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
+    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
     // ensure that there are no under replicated partitions
@@ -325,8 +324,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
   @Test
   def testPreferredReplicaJsonData() {
     // write preferred replica json data to zk path
-    val partitionsForPreferredReplicaElection = Set(TopicAndPartition("test", 1), TopicAndPartition("test2", 1))
-    PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, partitionsForPreferredReplicaElection)
+    val partitionsForPreferredReplicaElection = Set(new TopicPartition("test", 1), new TopicPartition("test2", 1))
+    PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
     // try to read it back and compare with what was written
     val preferredReplicaElectionZkData = zkUtils.readData(ZkUtils.PreferredReplicaLeaderElectionPath)._1
     val partitionsUndergoingPreferredReplicaElection =
@@ -345,12 +344,12 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // broker 2 should be the leader since it was started first
     val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None)
     // trigger preferred replica election
-    val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkUtils, Set(TopicAndPartition(topic, partition)))
+    val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(new TopicPartition(topic, partition)))
     preferredReplicaElection.moveLeaderToPreferredReplica()
     val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader))
     assertEquals("Preferred replica election failed", preferredReplica, newLeader)

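The AdminTest hunks above apply one recurring migration: kafka.common.TopicAndPartition and ZkUtils calls give way to org.apache.kafka.common.TopicPartition plus the new KafkaZkClient/AdminZkClient pair, and ReassignPartitionsCommand now receives the adminZkClient explicitly. A minimal before/after sketch of that pattern (the topic name and assignment are illustrative, not code from this commit):

    // Before: ZkUtils-based API with kafka.common.TopicAndPartition
    val tap = TopicAndPartition("test", 0)
    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0 -> Seq(0, 1)))
    val before = zkUtils.getReplicasForPartition("test", 0)
    new ReassignPartitionsCommand(zkUtils, None, Map(tap -> Seq(1, 2))).reassignPartitions()

    // After: KafkaZkClient-based API; topic creation moves to AdminZkClient and
    // partitions are identified by org.apache.kafka.common.TopicPartition
    val tp = new TopicPartition("test", 0)
    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK("test", Map(0 -> Seq(0, 1)))
    val after = zkClient.getReplicasForPartition(tp)
    new ReassignPartitionsCommand(zkClient, None, Map(tp -> Seq(1, 2)), adminZkClient = adminZkClient).reassignPartitions()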
http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 1922aaf..0d62afc 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -25,7 +25,7 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import java.util.Properties
 
-import kafka.common.{TopicAndPartition, TopicAlreadyMarkedForDeletionException}
+import kafka.common.TopicAlreadyMarkedForDeletionException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 
@@ -127,14 +127,14 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // reassign partition 0
     val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
     val newReplicas = Seq(1, 2, 3)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None,
-      Map(new TopicAndPartition(topicPartition) -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None,
+      Map(topicPartition -> newReplicas), adminZkClient = adminZkClient)
     assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
-      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, new TopicAndPartition(topicPartition),
-        Map(new TopicAndPartition(topicPartition) -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed
+      val partitionsBeingReassigned = zkClient.getPartitionReassignment
+      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicPartition,
+        Map(topicPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed
     }, "Partition reassignment shouldn't complete.")
     val controllerId = zkUtils.getController()
     val controller = servers.filter(s => s.config.brokerId == controllerId).head
@@ -176,6 +176,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   @Test
   def testAddPartitionDuringDeleteTopic() {
+    zkUtils.setupCommonPaths()
     val topic = "test"
     servers = createTestTopicAndCluster(topic)
     val brokers = adminZkClient.getBrokerMetadatas()

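The DeleteTopicTest change above also shows the new polling idiom for reassignment status: the in-flight set is read with zkClient.getPartitionReassignment and handed to ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded, whose result is compared against a terminal status such as ReassignmentFailed or ReassignmentCompleted. A condensed sketch, assuming topicPartition and newReplicas from the enclosing test:

    // Poll until the reassignment reaches the expected terminal status.
    TestUtils.waitUntilTrue(() => {
      val inFlight = zkClient.getPartitionReassignment
      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient,
        topicPartition, Map(topicPartition -> newReplicas), inFlight) == ReassignmentFailed
    }, "Partition reassignment shouldn't complete.")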
http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 865ab87..4968b57 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -16,7 +16,7 @@ import java.util.Collections
 import java.util.Properties
 
 import kafka.admin.ReassignPartitionsCommand._
-import kafka.common.{AdminCommandFailedException, TopicAndPartition}
+import kafka.common.AdminCommandFailedException
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.ZkUtils._
@@ -89,7 +89,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When we move the replica on 100 to broker 101
     val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101],"log_dirs":["$expectedLogDir"]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
     waitForReassignmentToComplete()
 
     //Then the replica should be on 101
@@ -109,7 +109,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     // When we execute an assignment that moves an existing replica to another log directory on the same broker
     val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[100],"log_dirs":["$expectedLogDir"]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
     val replica = new TopicPartitionReplica(topicName, 0, 100)
     TestUtils.waitUntilTrue(() => {
       expectedLogDir == adminClient.describeReplicaLogDirs(Collections.singleton(replica)).all().get.get(replica).getCurrentReplicaLogDir
@@ -128,7 +128,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     ), servers = servers)
 
     //When rebalancing
-    val newAssignment = generateAssignment(zkUtils, brokers, json(topicName), true)._1
+    val newAssignment = generateAssignment(zkClient, brokers, json(topicName), true)._1
     // Find a partition in the new assignment on broker 102 and a random log directory on broker 102,
     // which currently does not have any partition for this topic
     val partition1 = newAssignment.find { case (_, brokerIds) => brokerIds.contains(102) }.get._1.partition
@@ -142,7 +142,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     // Generate a replica assignment to reassign replicas on broker 100 and 102 respectively to a random log directory on the same broker.
     // Before this reassignment, the replica already exists on broker 100 but does not exist on broker 102
     val newReplicaAssignment = Map(replica1 -> expectedLogDir1, replica2 -> expectedLogDir2)
-    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient),
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient),
       ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, newReplicaAssignment), NoThrottle)
     waitForReassignmentToComplete()
 
@@ -170,8 +170,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     ), servers = servers)
 
     //When rebalancing
-    val newAssignment = generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
-    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+    val newAssignment = generateAssignment(zkClient, Array(100, 101), json(topicName), true)._1
+    ReassignPartitionsCommand.executeAssignment(zkClient, None,
       ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), NoThrottle)
     waitForReassignmentToComplete()
 
@@ -197,11 +197,11 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
       2 -> Seq(102, 100)
     ), servers = servers)
 
-    val proposed: Map[TopicAndPartition, Seq[Int]] = Map(
-      TopicAndPartition("topic1", 0) -> Seq(100, 102),
-      TopicAndPartition("topic1", 2) -> Seq(100, 102),
-      TopicAndPartition("topic2", 1) -> Seq(101, 100),
-      TopicAndPartition("topic2", 2) -> Seq(100, 102)
+    val proposed: Map[TopicPartition, Seq[Int]] = Map(
+      new TopicPartition("topic1", 0) -> Seq(100, 102),
+      new TopicPartition("topic1", 2) -> Seq(100, 102),
+      new TopicPartition("topic2", 1) -> Seq(101, 100),
+      new TopicPartition("topic2", 2) -> Seq(100, 102)
     )
 
     val replica1 = new TopicPartitionReplica("topic1", 0, 102)
@@ -212,7 +212,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     )
 
     //When rebalancing
-    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient),
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient),
       ReassignPartitionsCommand.formatAsReassignmentJson(proposed, proposedReplicaAssignment), NoThrottle)
     waitForReassignmentToComplete()
 
@@ -250,14 +250,14 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle.interBrokerLimit)
 
     //Start rebalance which will move replica on 100 -> replica on 102
-    val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
+    val newAssignment = generateAssignment(zkClient, Array(101, 102), json(topicName), true)._1
 
     val start = System.currentTimeMillis()
-    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+    ReassignPartitionsCommand.executeAssignment(zkClient, None,
       ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), initialThrottle)
 
     //Check throttle config. Should be throttling replica 0 on 100 and 102 only.
-    checkThrottleConfigAddedToZK(initialThrottle.interBrokerLimit, servers, topicName, "0:100,0:101", "0:102")
+    checkThrottleConfigAddedToZK(adminZkClient, initialThrottle.interBrokerLimit, servers, topicName, Set("0:100","0:101"), Set("0:102"))
 
     //Await completion
     waitForReassignmentToComplete()
@@ -299,24 +299,24 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //Start rebalance
     val newAssignment = Map(
-      TopicAndPartition("topic1", 0) -> Seq(100, 102),//moved 101=>102
-      TopicAndPartition("topic1", 1) -> Seq(100, 102),//moved 101=>102
-      TopicAndPartition("topic2", 0) -> Seq(103, 105),//moved 104=>103
-      TopicAndPartition("topic2", 1) -> Seq(103, 105),//moved 104=>103
-      TopicAndPartition("topic1", 2) -> Seq(103, 104), //didn't move
-      TopicAndPartition("topic2", 2) -> Seq(103, 104)  //didn't move
+      new TopicPartition("topic1", 0) -> Seq(100, 102),//moved 101=>102
+      new TopicPartition("topic1", 1) -> Seq(100, 102),//moved 101=>102
+      new TopicPartition("topic2", 0) -> Seq(103, 105),//moved 104=>103
+      new TopicPartition("topic2", 1) -> Seq(103, 105),//moved 104=>103
+      new TopicPartition("topic1", 2) -> Seq(103, 104), //didn't move
+      new TopicPartition("topic2", 2) -> Seq(103, 104)  //didn't move
     )
-    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+    ReassignPartitionsCommand.executeAssignment(zkClient, None,
       ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), Throttle(throttle))
 
     //Check throttle config. Should be throttling specific replicas for each topic.
-    checkThrottleConfigAddedToZK(throttle, servers, "topic1",
-      "1:100,1:101,0:100,0:101", //All replicas for moving partitions should be leader-throttled
-      "1:102,0:102" //Move destinations should be follower throttled.
+    checkThrottleConfigAddedToZK(adminZkClient, throttle, servers, "topic1",
+      Set("1:100","1:101","0:100","0:101"), //All replicas for moving partitions should be leader-throttled
+      Set("1:102","0:102") //Move destinations should be follower throttled.
     )
-    checkThrottleConfigAddedToZK(throttle, servers, "topic2",
-      "1:104,1:105,0:104,0:105", //All replicas for moving partitions should be leader-throttled
-      "1:103,0:103" //Move destinations should be follower throttled.
+    checkThrottleConfigAddedToZK(adminZkClient, throttle, servers, "topic2",
+      Set("1:104","1:105","0:104","0:105"), //All replicas for moving partitions should be leader-throttled
+      Set("1:103","0:103") //Move destinations should be follower throttled.
     )
   }
 
@@ -334,37 +334,37 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     produceMessages(servers, topicName, numMessages = 200, acks = 0, valueBytes = 100 * 1000)
 
     //Start rebalance
-    val newAssignment = generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
+    val newAssignment = generateAssignment(zkClient, Array(101, 102), json(topicName), true)._1
 
-    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+    ReassignPartitionsCommand.executeAssignment(zkClient, None,
       ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), Throttle(initialThrottle))
 
     //Check throttle config
-    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
+    checkThrottleConfigAddedToZK(adminZkClient, initialThrottle, servers, topicName, Set("0:100","0:101"), Set("0:102"))
 
     //Ensure that running Verify, whilst the command is executing, has no effect
-    verifyAssignment(zkUtils, None, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty))
+    verifyAssignment(zkClient, None, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty))
 
     //Check throttle config again
-    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:101", "0:102")
+    checkThrottleConfigAddedToZK(adminZkClient, initialThrottle, servers, topicName, Set("0:100","0:101"), Set("0:102"))
 
     //Now re-run the same assignment with a larger throttle, which should only act to increase the throttle and make progress
     val newThrottle = initialThrottle * 1000
 
-    ReassignPartitionsCommand.executeAssignment(zkUtils, None,
+    ReassignPartitionsCommand.executeAssignment(zkClient, None,
       ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty), Throttle(newThrottle))
 
     //Check throttle was changed
-    checkThrottleConfigAddedToZK(newThrottle, servers, topicName, "0:100,0:101", "0:102")
+    checkThrottleConfigAddedToZK(adminZkClient, newThrottle, servers, topicName, Set("0:100","0:101"), Set("0:102"))
 
     //Await completion
     waitForReassignmentToComplete()
 
     //Verify should remove the throttle
-    verifyAssignment(zkUtils, None, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty))
+    verifyAssignment(zkClient, None, ReassignPartitionsCommand.formatAsReassignmentJson(newAssignment, Map.empty))
 
     //Check removed
-    checkThrottleConfigRemovedFromZK(topicName, servers)
+    checkThrottleConfigRemovedFromZK(adminZkClient, topicName, servers)
 
     //Check move occurred
     val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
@@ -379,7 +379,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When we execute an assignment that includes an invalid partition (1:101 in this case)
     val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, None, topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkClient, None, topicJson, NoThrottle)
   }
 
   @Test(expected = classOf[AdminCommandFailedException])
@@ -390,7 +390,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When we execute an assignment that specifies an empty replica list (0: empty list in this case)
     val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, None, topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkClient, None, topicJson, NoThrottle)
   }
 
   @Test(expected = classOf[AdminCommandFailedException])
@@ -401,7 +401,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When we execute an assignment that specifies an invalid brokerID (102: invalid broker ID in this case)
     val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101, 102]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, None, topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkClient, None, topicJson, NoThrottle)
   }
 
   @Test(expected = classOf[AdminCommandFailedException])
@@ -413,7 +413,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     // When we execute an assignment that specifies an invalid log directory
     val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101],"log_dirs":["invalidDir"]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
   }
 
   @Test(expected = classOf[AdminCommandFailedException])
@@ -426,7 +426,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     // When we execute an assignment whose length of log_dirs doesn't match that of replicas
     val topicJson: String = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101],"log_dirs":["$logDir", "$logDir"]}]}"""
-    ReassignPartitionsCommand.executeAssignment(zkUtils, Some(adminClient), topicJson, NoThrottle)
+    ReassignPartitionsCommand.executeAssignment(zkClient, Some(adminClient), topicJson, NoThrottle)
   }
 
   @Test
@@ -443,14 +443,14 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //Define a move for some of them
     val move = Map(
-      TopicAndPartition("orders", 0) -> Seq(0, 2, 3),//moves
-      TopicAndPartition("orders", 1) -> Seq(0, 1, 2),//stays
-      TopicAndPartition("payments", 1) -> Seq(1, 2), //only define one partition as moving
-      TopicAndPartition("deliveries", 0) -> Seq(1, 2) //increase replication factor
+      new TopicPartition("orders", 0) -> Seq(0, 2, 3),//moves
+      new TopicPartition("orders", 1) -> Seq(0, 1, 2),//stays
+      new TopicPartition("payments", 1) -> Seq(1, 2), //only define one partition as moving
+      new TopicPartition("deliveries", 0) -> Seq(1, 2) //increase replication factor
     )
 
     //When we run a throttled reassignment
-    new ReassignPartitionsCommand(zkUtils, None, move).reassignPartitions(throttle)
+    new ReassignPartitionsCommand(zkClient, None, move, adminZkClient = adminZkClient).reassignPartitions(throttle)
 
     waitForReassignmentToComplete()
 
@@ -484,13 +484,13 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     createTopic(zkUtils, "customers", Map(0 -> List(0), 1 -> List(1), 2 -> List(2), 3 -> List(3)), servers)
 
     val firstMove = Map(
-      TopicAndPartition("orders", 0) -> Seq(0, 2, 3), //moves
-      TopicAndPartition("orders", 1) -> Seq(0, 1, 2), //stays
-      TopicAndPartition("payments", 1) -> Seq(1, 2), //only define one partition as moving
-      TopicAndPartition("deliveries", 0) -> Seq(1, 2) //increase replication factor
+      new TopicPartition("orders", 0) -> Seq(0, 2, 3), //moves
+      new TopicPartition("orders", 1) -> Seq(0, 1, 2), //stays
+      new TopicPartition("payments", 1) -> Seq(1, 2), //only define one partition as moving
+      new TopicPartition("deliveries", 0) -> Seq(1, 2) //increase replication factor
     )
 
-    new ReassignPartitionsCommand(zkUtils, None, firstMove).reassignPartitions()
+    new ReassignPartitionsCommand(zkClient, None, firstMove, adminZkClient = adminZkClient).reassignPartitions()
     // Low pause to detect deletion of the reassign_partitions znode before the reassignment is complete
     waitForReassignmentToComplete(pause = 1L)
 
@@ -509,13 +509,13 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     // Define a move for some of them
     val secondMove = Map(
-      TopicAndPartition("orders", 0) -> Seq(0, 2, 3), // stays
-      TopicAndPartition("orders", 1) -> Seq(3, 1, 2), // moves
-      TopicAndPartition("payments", 1) -> Seq(2, 1), // changed preferred leader
-      TopicAndPartition("deliveries", 0) -> Seq(1, 2, 3) //increase replication factor
+      new TopicPartition("orders", 0) -> Seq(0, 2, 3), // stays
+      new TopicPartition("orders", 1) -> Seq(3, 1, 2), // moves
+      new TopicPartition("payments", 1) -> Seq(2, 1), // changed preferred leader
+      new TopicPartition("deliveries", 0) -> Seq(1, 2, 3) //increase replication factor
     )
 
-    new ReassignPartitionsCommand(zkUtils, None, secondMove).reassignPartitions()
+    new ReassignPartitionsCommand(zkClient, None, secondMove, adminZkClient = adminZkClient).reassignPartitions()
     // Low pause to detect deletion of the reassign_partitions znode before the reassignment is complete
     waitForReassignmentToComplete(pause = 1L)
 
@@ -535,16 +535,16 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     // We set the znode and then continuously attempt to set it again to exercise the case where the znode is set
     // immediately after deletion (i.e. before we set the watcher again)
 
-    val thirdMove = Map(TopicAndPartition("orders", 0) -> Seq(1, 2, 3))
+    val thirdMove = Map(new TopicPartition("orders", 0) -> Seq(1, 2, 3))
 
-    new ReassignPartitionsCommand(zkUtils, None, thirdMove).reassignPartitions()
+    new ReassignPartitionsCommand(zkClient, None, thirdMove, adminZkClient = adminZkClient).reassignPartitions()
 
-    val fourthMove = Map(TopicAndPartition("payments", 1) -> Seq(2, 3))
+    val fourthMove = Map(new TopicPartition("payments", 1) -> Seq(2, 3))
 
     // Continuously attempt to set the reassignment znode with `fourthMove` until it succeeds. It will only succeed
     // after `thirdMove` completes.
     Iterator.continually {
-      try new ReassignPartitionsCommand(zkUtils, None, fourthMove).reassignPartitions()
+      try new ReassignPartitionsCommand(zkClient, None, fourthMove, adminZkClient = adminZkClient).reassignPartitions()
       catch {
         case _: AdminCommandFailedException => false
       }


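The final hunk relies on ReassignPartitionsCommand refusing to start while a previous reassignment's znode still exists, surfacing the refusal as AdminCommandFailedException. A condensed sketch of the retry idiom the test uses (the trailing exists(identity) is an assumption about how the loop terminates; the hunk ends before showing it):

    // Re-submit until the reassign_partitions znode is free; a refused attempt
    // is caught and treated as false so the iterator keeps trying.
    Iterator.continually {
      try new ReassignPartitionsCommand(zkClient, None, fourthMove, adminZkClient = adminZkClient).reassignPartitions()
      catch {
        case _: AdminCommandFailedException => false
      }
    }.exists(identity)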