kafka-commits mailing list archives

From lind...@apache.org
Subject [kafka] branch trunk updated: MINOR: Consolidate Topic create calls in Test classes
Date Thu, 19 Jul 2018 04:45:39 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7c9a735  MINOR: Consolidate Topic create calls in Test classes
7c9a735 is described below

commit 7c9a7359dcf1a325c6eac8b5199048df05ad39e4
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
AuthorDate: Wed Jul 18 21:45:17 2018 -0700

    MINOR: Consolidate Topic create calls in Test classes
    
    - Replace adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK calls with TestUtils.createTopic wherever applicable
    - Replace adminZkClient.createTopic calls with TestUtils.createTopic wherever applicable
    - Move non-deprecated tests to other test classes and deprecate AdminTest.scala
    - Remove duplicate tests between AdminTest and AdminZkClientTest
    
    Author: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Dong Lin <lindong28@gmail.com>
    
    Closes #5303 from omkreddy/topiccreate
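
    Editor's note on the consolidated pattern: as the DeleteTopicTest hunk
    below shows, TestUtils.createTopic both writes the partition assignment
    to ZooKeeper and waits until the topic is usable on the given brokers,
    so callers no longer need a separate waitUntilLeaderIsElectedOrChanged
    step. A minimal before/after sketch, assuming the zkClient, adminZkClient
    and servers fixtures available in ZooKeeperTestHarness-based tests:

        // Before: write the partition assignment path directly to ZK; the
        // caller must separately wait for leader election before using the topic.
        adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK("test", Map(0 -> List(0, 1, 2)))
        TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 1000)

        // After: one call creates the topic and blocks until the partitions
        // are ready on the running servers.
        TestUtils.createTopic(zkClient, "test", Map(0 -> List(0, 1, 2)), servers)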
---
 .../unit/kafka/admin/AdminRackAwareTest.scala      |  33 ++
 .../test/scala/unit/kafka/admin/AdminTest.scala    | 383 +--------------------
 .../kafka/admin/ConsumerGroupCommandTest.scala     |   2 +-
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |  12 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala    |  10 +-
 .../PreferredReplicaElectionCommandTest.scala      |  68 ++++
 .../admin/ReassignPartitionsCommandTest.scala      | 154 ++++++++-
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala |  12 +-
 .../kafka/controller/ControllerFailoverTest.scala  |   2 +-
 .../controller/ControllerIntegrationTest.scala     |  50 ++-
 .../MetricsDuringTopicCreationDeletionTest.scala   |   2 +-
 .../integration/UncleanLeaderElectionTest.scala    |  12 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala     |   4 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |   2 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   8 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |   9 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  18 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  15 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  19 +-
 19 files changed, 372 insertions(+), 443 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
index ef85c6d..bece037 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
@@ -17,8 +17,10 @@
 package kafka.admin
 
 import kafka.utils.Logging
+import org.apache.kafka.common.errors.InvalidReplicationFactorException
 import org.junit.Assert._
 import org.junit.Test
+import org.scalatest.Assertions._
 
 import scala.collection.Map
 
@@ -192,4 +194,35 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
     checkReplicaDistribution(assignment, rackInfo, 5, 6, 4,
       verifyRackAware = false, verifyLeaderDistribution = false, verifyReplicasDistribution = false)
   }
+
+  @Test
+  def testReplicaAssignment() {
+    val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
+
+    // test 0 replication factor
+    intercept[InvalidReplicationFactorException] {
+      AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 0)
+    }
+
+    // test wrong replication factor
+    intercept[InvalidReplicationFactorException] {
+      AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 6)
+    }
+
+    // correct assignment
+    val expectedAssignment = Map(
+        0 -> List(0, 1, 2),
+        1 -> List(1, 2, 3),
+        2 -> List(2, 3, 4),
+        3 -> List(3, 4, 0),
+        4 -> List(4, 0, 1),
+        5 -> List(0, 2, 3),
+        6 -> List(1, 3, 4),
+        7 -> List(2, 4, 0),
+        8 -> List(3, 0, 1),
+        9 -> List(4, 1, 2))
+
+    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0)
+    assertEquals(expectedAssignment, actualAssignment)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index e0e26a8..a1c317e 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -16,9 +16,7 @@
  */
 package kafka.admin
 
-import kafka.server.DynamicConfig.Broker._
-import kafka.server.KafkaConfig._
-import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException}
+import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.metrics.Quota
 import org.easymock.EasyMock
 import org.junit.Assert._
@@ -26,24 +24,16 @@ import org.junit.{After, Before, Test}
 import java.util.Properties
 
 import kafka.utils._
-import kafka.log._
-import kafka.zk.{ConfigEntityZNode, PreferredReplicaElectionZNode, ZooKeeperTestHarness}
+import kafka.zk.{ConfigEntityZNode, ZooKeeperTestHarness}
 import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
-import java.io.File
-import java.util
-import java.util.concurrent.LinkedBlockingQueue
 
-import kafka.utils.TestUtils._
-
-import scala.collection.{Map, Set, immutable}
-import kafka.utils.CoreUtils._
-import org.apache.kafka.common.TopicPartition
+import scala.collection.{Map, immutable}
 import org.apache.kafka.common.security.JaasUtils
 
 import scala.collection.JavaConverters._
-import scala.util.Try
 
+@deprecated("This test has been deprecated and will be removed in a future release.", "1.1.0")
 class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
   var servers: Seq[KafkaServer] = Seq()
@@ -64,37 +54,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
   }
 
   @Test
-  def testReplicaAssignment() {
-    val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
-
-    // test 0 replication factor
-    intercept[InvalidReplicationFactorException] {
-      AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 0)
-    }
-
-    // test wrong replication factor
-    intercept[InvalidReplicationFactorException] {
-      AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 6)
-    }
-
-    // correct assignment
-    val expectedAssignment = Map(
-        0 -> List(0, 1, 2),
-        1 -> List(1, 2, 3),
-        2 -> List(2, 3, 4),
-        3 -> List(3, 4, 0),
-        4 -> List(4, 0, 1),
-        5 -> List(0, 2, 3),
-        6 -> List(1, 3, 4),
-        7 -> List(2, 4, 0),
-        8 -> List(3, 0, 1),
-        9 -> List(4, 1, 2))
-
-    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0)
-    assertEquals(expectedAssignment, actualAssignment)
-  }
-
-  @Test
   def testManualReplicaAssignment() {
     val brokers = List(0, 1, 2, 3, 4)
     TestUtils.createBrokersInZk(zkClient, brokers)
@@ -193,340 +152,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     }
   }
 
-  private def getBrokersWithPartitionDir(servers: Iterable[KafkaServer], topic: String, partitionId: Int): Set[Int] = {
-    servers.filter(server => new File(server.config.logDirs.head, topic + "-" + partitionId).exists)
-           .map(_.config.brokerId)
-           .toSet
-  }
-
-  @Test
-  def testPartitionReassignmentWithLeaderInNewReplicas() {
-    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
-    val topic = "test"
-    // create brokers
-    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
-    // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
-    // reassign partition 0
-    val newReplicas = Seq(0, 2, 3)
-    val partitionToBeReassigned = 0
-    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
-    assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
-    // wait until reassignment is completed
-    TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
-        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
-      },
-      "Partition reassignment should complete")
-    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
-    // in sync replicas should not have any replica that is not in the new assigned replicas
-    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
-    assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
-    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
-                            "New replicas should exist on brokers")
-  }
-
-  @Test
-  def testPartitionReassignmentWithLeaderNotInNewReplicas() {
-    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
-    val topic = "test"
-    // create brokers
-    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
-    // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
-    // reassign partition 0
-    val newReplicas = Seq(1, 2, 3)
-    val partitionToBeReassigned = 0
-    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
-    assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
-    // wait until reassignment is completed
-    TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
-      },
-      "Partition reassignment should complete")
-    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
-    assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
-    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
-                            "New replicas should exist on brokers")
-  }
-
-  @Test
-  def testPartitionReassignmentNonOverlappingReplicas() {
-    val expectedReplicaAssignment = Map(0  -> List(0, 1))
-    val topic = "test"
-    // create brokers
-    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
-    // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
-    // reassign partition 0
-    val newReplicas = Seq(2, 3)
-    val partitionToBeReassigned = 0
-    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas),  adminZkClient = adminZkClient)
-    assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
-    // wait until reassignment is completed
-    TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
-      },
-      "Partition reassignment should complete")
-    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
-    assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
-    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
-                            "New replicas should exist on brokers")
-  }
-
-  @Test
-  def testReassigningNonExistingPartition() {
-    val topic = "test"
-    // create brokers
-    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
-    // reassign partition 0
-    val newReplicas = Seq(2, 3)
-    val partitionToBeReassigned = 0
-    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
-    assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
-    val reassignedPartitions = zkClient.getPartitionReassignment
-    assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
-  }
-
-  @Test
-  def testResumePartitionReassignmentThatWasCompleted() {
-    val expectedReplicaAssignment = Map(0  -> List(0, 1))
-    val topic = "test"
-    // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
-    // put the partition in the reassigned path as well
-    // reassign partition 0
-    val newReplicas = Seq(0, 1)
-    val partitionToBeReassigned = 0
-    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
-    reassignPartitionsCommand.reassignPartitions()
-    // create brokers
-    servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
-
-    // wait until reassignment completes
-    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(),
-                            "Partition reassignment should complete")
-    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
-    assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
-    // ensure that there are no under replicated partitions
-    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
-    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
-                            "New replicas should exist on brokers")
-  }
-
-  @Test
-  def testPreferredReplicaJsonData() {
-    // write preferred replica json data to zk path
-    val partitionsForPreferredReplicaElection = Set(new TopicPartition("test", 1), new TopicPartition("test2", 1))
-    PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
-    // try to read it back and compare with what was written
-    val preferredReplicaElectionZkData = zkUtils.readData(PreferredReplicaElectionZNode.path)._1
-    val partitionsUndergoingPreferredReplicaElection =
-      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
-    assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
-      partitionsUndergoingPreferredReplicaElection)
-  }
-
-  @Test
-  def testBasicPreferredReplicaElection() {
-    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
-    val topic = "test"
-    val partition = 1
-    val preferredReplica = 0
-    // create brokers
-    val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
-    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
-    // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
-    servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
-    // broker 2 should be the leader since it was started first
-    val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None)
-    // trigger preferred replica election
-    val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(new TopicPartition(topic, partition)))
-    preferredReplicaElection.moveLeaderToPreferredReplica()
-    val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader))
-    assertEquals("Preferred replica election failed", preferredReplica, newLeader)
-  }
-
-  @Test
-  def testControlledShutdown() {
-    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
-    val topic = "test"
-    val partition = 1
-    // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
-    servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
-    // create the topic
-    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
-
-    val controllerId = zkUtils.getController()
-    val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
-    val resultQueue = new LinkedBlockingQueue[Try[Set[TopicPartition]]]()
-    val controlledShutdownCallback = (controlledShutdownResult: Try[Set[TopicPartition]]) => resultQueue.put(controlledShutdownResult)
-    controller.controlledShutdown(2, controlledShutdownCallback)
-    var partitionsRemaining = resultQueue.take().get
-    var activeServers = servers.filter(s => s.config.brokerId != 2)
-    // wait for the update metadata request to trickle to the brokers
-    TestUtils.waitUntilTrue(() =>
-      activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
-      "Topic test not created after timeout")
-    assertEquals(0, partitionsRemaining.size)
-    var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
-    var leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
-    assertEquals(0, leaderAfterShutdown)
-    assertEquals(2, partitionStateInfo.basePartitionState.isr.size)
-    assertEquals(List(0,1), partitionStateInfo.basePartitionState.isr.asScala)
-
-    controller.controlledShutdown(1, controlledShutdownCallback)
-    partitionsRemaining = resultQueue.take().get
-    assertEquals(0, partitionsRemaining.size)
-    activeServers = servers.filter(s => s.config.brokerId == 0)
-    partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
-    leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
-    assertEquals(0, leaderAfterShutdown)
-
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
-    controller.controlledShutdown(0, controlledShutdownCallback)
-    partitionsRemaining = resultQueue.take().get
-    assertEquals(1, partitionsRemaining.size)
-    // leader doesn't change since all the replicas are shut down
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
-  }
-
-  /**
-   * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
-   * then changes the config and checks that the new values take effect.
-   */
-  @Test
-  def testTopicConfigChange() {
-    val partitions = 3
-    val topic = "my-topic"
-    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
-    servers = Seq(server)
-
-    def makeConfig(messageSize: Int, retentionMs: Long, throttledLeaders: String, throttledFollowers: String) = {
-      val props = new Properties()
-      props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
-      props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
-      props.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, throttledLeaders)
-      props.setProperty(LogConfig.FollowerReplicationThrottledReplicasProp, throttledFollowers)
-      props
-    }
-
-    def checkConfig(messageSize: Int, retentionMs: Long, throttledLeaders: String, throttledFollowers: String, quotaManagerIsThrottled: Boolean) {
-      def checkList(actual: util.List[String], expected: String): Unit = {
-        assertNotNull(actual)
-        if (expected == "")
-          assertTrue(actual.isEmpty)
-        else
-          assertEquals(expected.split(",").toSeq, actual.asScala)
-      }
-      TestUtils.retry(10000) {
-        for (part <- 0 until partitions) {
-          val tp = new TopicPartition(topic, part)
-          val log = server.logManager.getLog(tp)
-          assertTrue(log.isDefined)
-          assertEquals(retentionMs, log.get.config.retentionMs)
-          assertEquals(messageSize, log.get.config.maxMessageSize)
-          checkList(log.get.config.LeaderReplicationThrottledReplicas, throttledLeaders)
-          checkList(log.get.config.FollowerReplicationThrottledReplicas, throttledFollowers)
-          assertEquals(quotaManagerIsThrottled, server.quotaManagers.leader.isThrottled(tp))
-        }
-      }
-    }
-
-    // create a topic with a few config overrides and check that they are applied
-    val maxMessageSize = 1024
-    val retentionMs = 1000 * 1000
-    adminZkClient.createTopic(topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
-
-    //Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated.
-    checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false)
-
-    //Update dynamically and all properties should be applied
-    adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
-
-    checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true)
-
-    // now double the config values for the topic and check that it is applied
-    val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
-    adminZkClient.changeTopicConfig(topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*"))
-    checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true)
-
-    // Verify that the same config can be read from ZK
-    val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
-    assertEquals(newConfig, configInZk)
-
-    //Now delete the config
-    adminZkClient.changeTopicConfig(topic, new Properties)
-    checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
-
-    //Add config back
-    adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
-    checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true)
-
-    //Now ensure updating to "" removes the throttled replica list also
-    adminZkClient.changeTopicConfig(topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
-    checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "",  quotaManagerIsThrottled = false)
-  }
-
-  @Test
-  def shouldPropagateDynamicBrokerConfigs() {
-    val brokerIds = Seq(0, 1, 2)
-    servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_))
-
-    def checkConfig(limit: Long) {
-      retry(10000) {
-        for (server <- servers) {
-          assertEquals("Leader Quota Manager was not updated", limit, server.quotaManagers.leader.upperBound)
-          assertEquals("Follower Quota Manager was not updated", limit, server.quotaManagers.follower.upperBound)
-        }
-      }
-    }
-
-    val limit: Long = 1000000
-
-    // Set the limit & check it is applied to the log
-    adminZkClient.changeBrokerConfig(brokerIds, propsWith(
-      (LeaderReplicationThrottledRateProp, limit.toString),
-      (FollowerReplicationThrottledRateProp, limit.toString)))
-    checkConfig(limit)
-
-    // Now double the config values for the topic and check that it is applied
-    val newLimit = 2 * limit
-    adminZkClient.changeBrokerConfig(brokerIds,  propsWith(
-      (LeaderReplicationThrottledRateProp, newLimit.toString),
-      (FollowerReplicationThrottledRateProp, newLimit.toString)))
-    checkConfig(newLimit)
-
-    // Verify that the same config can be read from ZK
-    for (brokerId <- brokerIds) {
-      val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
-      assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
-      assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
-    }
-
-    //Now delete the config
-    adminZkClient.changeBrokerConfig(brokerIds, new Properties)
-    checkConfig(DefaultReplicationThrottledRate)
-  }
-
   /**
    * This test simulates a client config change in ZK whose notification has been purged.
    * Basically, it asserts that notifications are bootstrapped from ZK
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index cf00e93..072f29a 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -53,7 +53,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    adminZkClient.createTopic(topic, 1, 1)
+    createTopic(topic, 1, 1)
   }
 
   @After
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 057814b..4d089b3 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -112,7 +112,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     this.servers = allServers
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created.")
@@ -170,7 +170,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testIncreasePartitiovnCountDuringDeleteTopic() {
+  def testIncreasePartitionCountDuringDeleteTopic() {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
     val topicPartition = new TopicPartition(topic, 0)
@@ -181,7 +181,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     this.servers = allServers
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created.")
@@ -282,9 +282,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     // re-create topic on same replicas
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
-    // wait until leader is elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
     // check if all replica logs are created
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created.")
@@ -374,7 +372,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, replicaAssignment)
+    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created")
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index fac34a7..dce4cf9 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -410,7 +410,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
   def testDescribeWithMultiPartitionTopicAndMultipleConsumers() {
     TestUtils.createOffsetsTopic(zkClient, servers)
     val topic2 = "foo2"
-    adminZkClient.createTopic(topic2, 2, 1)
+    createTopic(topic2, 2, 1)
 
     for (describeType <- describeTypes) {
       val group = this.group + describeType.mkString("")
@@ -431,7 +431,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
   def testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers() {
     TestUtils.createOffsetsTopic(zkClient, servers)
     val topic2 = "foo2"
-    adminZkClient.createTopic(topic2, 2, 1)
+    createTopic(topic2, 2, 1)
 
     // run two consumers in the group consuming from a two-partition topic
     addConsumerGroupExecutor(numConsumers = 2, topic2)
@@ -453,7 +453,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
   def testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers() {
     TestUtils.createOffsetsTopic(zkClient, servers)
     val topic2 = "foo2"
-    adminZkClient.createTopic(topic2, 2, 1)
+    createTopic(topic2, 2, 1)
 
     // run two consumers in the group consuming from a two-partition topic
     addConsumerGroupExecutor(numConsumers = 2, topic2)
@@ -479,7 +479,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
   def testDescribeStateWithMultiPartitionTopicAndMultipleConsumers() {
     TestUtils.createOffsetsTopic(zkClient, servers)
     val topic2 = "foo2"
-    adminZkClient.createTopic(topic2, 2, 1)
+    createTopic(topic2, 2, 1)
 
     // run two consumers in the group consuming from a two-partition topic
     addConsumerGroupExecutor(numConsumers = 2, topic2)
@@ -499,7 +499,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
 
     TestUtils.createOffsetsTopic(zkClient, servers)
     val topic2 = "foo2"
-    adminZkClient.createTopic(topic2, 2, 1)
+    createTopic(topic2, 2, 1)
     addSimpleGroupExecutor(Seq(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)))
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
new file mode 100644
index 0000000..913b372
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.TopicPartition
+import org.junit.Assert.assertEquals
+import org.junit.{After, Test}
+
+import scala.collection.{Map, Set}
+
+class PreferredReplicaElectionCommandTest extends ZooKeeperTestHarness with Logging {
+  var servers: Seq[KafkaServer] = Seq()
+
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
+  @Test
+  def testPreferredReplicaJsonData() {
+    // write preferred replica json data to zk path
+    val partitionsForPreferredReplicaElection = Set(new TopicPartition("test", 1), new TopicPartition("test2", 1))
+    PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
+    // try to read it back and compare with what was written
+    val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
+    assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
+      partitionsUndergoingPreferredReplicaElection)
+  }
+
+  @Test
+  def testBasicPreferredReplicaElection() {
+    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
+    val topic = "test"
+    val partition = 1
+    val preferredReplica = 0
+    // create brokers
+    val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
+    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
+    // create the topic
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
+    // broker 2 should be the leader since it was started first
+    val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None)
+    // trigger preferred replica election
+    val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(new TopicPartition(topic, partition)))
+    preferredReplicaElection.moveLeaderToPreferredReplica()
+    val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader))
+    assertEquals("Preferred replica election failed", preferredReplica, newLeader)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 48a30ad..6978f8d 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -16,28 +16,37 @@
  */
 package kafka.admin
 
+import java.io.File
 import java.util.Properties
 
 import kafka.admin.ReassignPartitionsCommand.Throttle
 import kafka.log.LogConfig
 import kafka.log.LogConfig._
-import kafka.server.{ConfigType, DynamicConfig}
+import kafka.server.{ConfigType, DynamicConfig, KafkaConfig, KafkaServer}
 import kafka.utils.CoreUtils._
 import kafka.utils.TestUtils._
 import kafka.utils.{CoreUtils, Logging, TestUtils}
 import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, EasyMock}
-import org.junit.{Before, Test}
-import org.junit.Assert.{assertEquals, assertNull}
+import org.junit.{After, Before, Test}
+import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
 
-import scala.collection.{Seq, mutable}
 import scala.collection.JavaConverters._
 import org.apache.kafka.common.TopicPartition
 
+import scala.collection.mutable
+
 class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
+  var servers: Seq[KafkaServer] = Seq()
   var calls = 0
 
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
   @Test
   def shouldFindMovingReplicas() {
     val control = new TopicPartition("topic1", 1) -> Seq(100, 102)
@@ -414,6 +423,143 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
     assertEquals(2, propsCapture.getValues.size) //2 topics
   }
 
+  @Test
+  def testPartitionReassignmentWithLeaderInNewReplicas() {
+    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
+    val topic = "test"
+    // create brokers
+    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    // create the topic
+    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
+    // reassign partition 0
+    val newReplicas = Seq(0, 2, 3)
+    val partitionToBeReassigned = 0
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
+    assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
+    // wait until reassignment is completed
+    TestUtils.waitUntilTrue(() => {
+        val partitionsBeingReassigned = zkClient.getPartitionReassignment
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
+        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
+      },
+      "Partition reassignment should complete")
+    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
+    // in sync replicas should not have any replica that is not in the new assigned replicas
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
+                            "New replicas should exist on brokers")
+  }
+
+  @Test
+  def testPartitionReassignmentWithLeaderNotInNewReplicas() {
+    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
+    val topic = "test"
+    // create brokers
+    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    // create the topic
+    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
+    // reassign partition 0
+    val newReplicas = Seq(1, 2, 3)
+    val partitionToBeReassigned = 0
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
+    assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
+    // wait until reassignment is completed
+    TestUtils.waitUntilTrue(() => {
+        val partitionsBeingReassigned = zkClient.getPartitionReassignment
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
+          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
+      },
+      "Partition reassignment should complete")
+    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
+    assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
+                            "New replicas should exist on brokers")
+  }
+
+  @Test
+  def testPartitionReassignmentNonOverlappingReplicas() {
+    val expectedReplicaAssignment = Map(0  -> List(0, 1))
+    val topic = "test"
+    // create brokers
+    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    // create the topic
+    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
+    // reassign partition 0
+    val newReplicas = Seq(2, 3)
+    val partitionToBeReassigned = 0
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas),  adminZkClient = adminZkClient)
+    assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
+    // wait until reassignment is completed
+    TestUtils.waitUntilTrue(() => {
+        val partitionsBeingReassigned = zkClient.getPartitionReassignment
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
+          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted
+      },
+      "Partition reassignment should complete")
+    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
+    assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
+                            "New replicas should exist on brokers")
+  }
+
+  @Test
+  def testReassigningNonExistingPartition() {
+    val topic = "test"
+    // create brokers
+    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    // reassign partition 0
+    val newReplicas = Seq(2, 3)
+    val partitionToBeReassigned = 0
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
+    assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
+    val reassignedPartitions = zkClient.getPartitionReassignment
+    assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
+  }
+
+  @Test
+  def testResumePartitionReassignmentThatWasCompleted() {
+    val expectedReplicaAssignment = Map(0  -> List(0, 1))
+    val topic = "test"
+    // create the topic
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    // put the partition in the reassigned path as well
+    // reassign partition 0
+    val newReplicas = Seq(0, 1)
+    val partitionToBeReassigned = 0
+    val topicAndPartition = new TopicPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, None, Map(topicAndPartition -> newReplicas), adminZkClient = adminZkClient)
+    reassignPartitionsCommand.reassignPartitions()
+    // create brokers
+    servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+
+    // wait until reassignment completes
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(),
+                            "Partition reassignment should complete")
+    val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
+    assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    // ensure that there are no under replicated partitions
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+    TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
+                            "New replicas should exist on brokers")
+  }
+
+  private def getBrokersWithPartitionDir(servers: Iterable[KafkaServer], topic: String, partitionId: Int): Set[Int] = {
+    servers.filter(server => new File(server.config.logDirs.head, topic + "-" + partitionId).exists)
+           .map(_.config.brokerId)
+           .toSet
+  }
+
   //Override eq as is for brevity
   def is[T](v: T): T = EasyMock.eq(v)
 
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 3644cd7..eaa0c85 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -232,7 +232,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   @Test
   def testResetOffsetsToEarliestOnOneTopicAndPartition() {
     val topic = "bar"
-    adminZkClient.createTopic(topic, 2, 1)
+    createTopic(topic, 2, 1)
 
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic",
       s"$topic:1", "--to-earliest", "--execute")
@@ -253,8 +253,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   def testResetOffsetsToEarliestOnTopics() {
     val topic1 = "topic1"
     val topic2 = "topic2"
-    adminZkClient.createTopic(topic1, 1, 1)
-    adminZkClient.createTopic(topic2, 1, 1)
+    createTopic(topic1, 1, 1)
+    createTopic(topic2, 1, 1)
 
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1,
       "--topic", topic2, "--to-earliest", "--execute")
@@ -280,8 +280,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     val topic1 = "topic1"
     val topic2 = "topic2"
 
-    adminZkClient.createTopic(topic1, 2, 1)
-    adminZkClient.createTopic(topic2, 2, 1)
+    createTopic(topic1, 2, 1)
+    createTopic(topic2, 2, 1)
 
     val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic",
       s"$topic1:1", "--topic", s"$topic2:1", "--to-earliest", "--execute")
@@ -310,7 +310,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     val topic = "bar"
     val tp0 = new TopicPartition(topic, 0)
     val tp1 = new TopicPartition(topic, 1)
-    adminZkClient.createTopic(topic, 2, 1)
+    createTopic(topic, 2, 1)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
       "--to-offset", "2", "--export")
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 32e23cc..6cfa72c 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -59,7 +59,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
     }
     val initialEpoch = initialController.epoch
     // Create topic with one partition
-    adminZkClient.createTopic(topic, 1, 1)
+    createTopic(topic, 1, 1)
     val topicPartition = new TopicPartition("topic1", 0)
     TestUtils.waitUntilTrue(() =>
       initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 88fe82b..5e5d84f 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.controller
 
+import java.util.concurrent.LinkedBlockingQueue
+
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Timer
 import kafka.api.LeaderAndIsr
@@ -24,10 +26,11 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils
 import kafka.zk.{PreferredReplicaElectionZNode, ZooKeeperTestHarness}
 import org.junit.{After, Before, Test}
-import org.junit.Assert.assertTrue
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 class ControllerIntegrationTest extends ZooKeeperTestHarness {
   var servers = Seq.empty[KafkaServer]
@@ -293,6 +296,51 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     }, "failed to get expected partition state after entire isr went offline")
   }
 
+  @Test
+  def testControlledShutdown() {
+    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
+    val topic = "test"
+    val partition = 1
+    // create brokers
+    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
+    servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
+    // create the topic
+    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
+
+    val controllerId = zkClient.getControllerId.get
+    val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+    val resultQueue = new LinkedBlockingQueue[Try[collection.Set[TopicPartition]]]()
+    val controlledShutdownCallback = (controlledShutdownResult: Try[collection.Set[TopicPartition]]) => resultQueue.put(controlledShutdownResult)
+    controller.controlledShutdown(2, controlledShutdownCallback)
+    var partitionsRemaining = resultQueue.take().get
+    var activeServers = servers.filter(s => s.config.brokerId != 2)
+    // wait for the update metadata request to trickle to the brokers
+    TestUtils.waitUntilTrue(() =>
+      activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
+      "Topic test not created after timeout")
+    assertEquals(0, partitionsRemaining.size)
+    var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    var leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
+    assertEquals(0, leaderAfterShutdown)
+    assertEquals(2, partitionStateInfo.basePartitionState.isr.size)
+    assertEquals(List(0,1), partitionStateInfo.basePartitionState.isr.asScala)
+
+    controller.controlledShutdown(1, controlledShutdownCallback)
+    partitionsRemaining = resultQueue.take().get
+    assertEquals(0, partitionsRemaining.size)
+    activeServers = servers.filter(s => s.config.brokerId == 0)
+    partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
+    assertEquals(0, leaderAfterShutdown)
+
+    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+    controller.controlledShutdown(0, controlledShutdownCallback)
+    partitionsRemaining = resultQueue.take().get
+    assertEquals(1, partitionsRemaining.size)
+    // leader doesn't change since all the replicas are shut down
+    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+  }
+
   private def preferredReplicaLeaderElection(controllerId: Int, otherBroker: KafkaServer, tp: TopicPartition,
                                              replicas: Set[Int], leaderEpoch: Int): Unit = {
     otherBroker.shutdown()
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index daa4276..2fcc724 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -135,7 +135,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
       // Create topics
       for (t <- topics if running) {
         try {
-          adminZkClient.createTopic(t, partitionNum, replicationFactor)
+          createTopic(t, partitionNum, replicationFactor)
         } catch {
           case e: Exception => e.printStackTrace
         }
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 040bee3..ebec1d3 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -106,7 +106,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+    TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers)
 
     verifyUncleanLeaderElectionEnabled
   }
@@ -118,7 +118,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+    TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers)
 
     verifyUncleanLeaderElectionDisabled
   }
@@ -133,8 +133,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
     val topicProps = new Properties()
     topicProps.put("unclean.leader.election.enable", "true")
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
-      topicProps)
+    TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps)
 
     verifyUncleanLeaderElectionEnabled
   }
@@ -150,8 +149,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
     val topicProps = new Properties()
     topicProps.put("unclean.leader.election.enable", "false")
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
-      topicProps)
+    TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps)
 
     verifyUncleanLeaderElectionDisabled
   }
@@ -165,7 +163,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     topicProps.put("unclean.leader.election.enable", "invalid")
 
     intercept[ConfigException] {
-      adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1)), topicProps)
+      TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1)), servers, topicProps)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 2423e4c..e67fad1 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -49,7 +49,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testMetricsReporterAfterDeletingTopic() {
     val topic = "test-topic-metric"
-    adminZkClient.createTopic(topic, 1, 1)
+    createTopic(topic, 1, 1)
     adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
@@ -58,7 +58,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
     val topic = "test-broker-topic-metric"
-    adminZkClient.createTopic(topic, 2, 1)
+    createTopic(topic, 2, 1)
     // Produce a few messages to create the metrics
     // Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
     TestUtils.generateAndProduceMessages(servers, topic, nMessages)
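
The bare createTopic(topic, partitions, replicationFactor) calls above come from the test harness rather than TestUtils. Presumably the harness method just fills in its own zkClient and servers; a sketch under that assumption (the actual KafkaServerTestHarness signature may differ):

    import java.util.Properties
    import kafka.utils.TestUtils

    // Hypothetical delegation inside KafkaServerTestHarness: reuse the
    // harness's ZooKeeper client and broker list, defaulting the topic config.
    def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
                    topicConfig: Properties = new Properties): Map[Int, Int] =
      TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicConfig)
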
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 7ad44cb..f5c5c9b 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -46,7 +46,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     val tp = new TopicPartition("test", 0)
     val logProps = new Properties()
     logProps.put(FlushMessagesProp, oldVal.toString)
-    adminZkClient.createTopic(tp.topic, 1, 1, logProps)
+    createTopic(tp.topic, 1, 1, logProps)
     TestUtils.retry(10000) {
       val logOpt = this.servers.head.logManager.getLog(tp)
       assertTrue(logOpt.isDefined)
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index dd4f7e3..e371f7f 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -65,7 +65,7 @@ class LogOffsetTest extends BaseRequestTest {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
 
-    adminZkClient.createTopic(topic, 1, 1)
+    createTopic(topic, 1, 1)
 
     val logManager = server.getLogManager
     TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
@@ -97,7 +97,7 @@ class LogOffsetTest extends BaseRequestTest {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
 
-    adminZkClient.createTopic(topic, 1, 1)
+    createTopic(topic, 1, 1)
 
     val logManager = server.getLogManager
     TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
@@ -157,7 +157,7 @@ class LogOffsetTest extends BaseRequestTest {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, random.nextInt(3))
 
-    adminZkClient.createTopic(topic, 3, 1)
+    createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topicPartition, logManager.initialDefaultConfig)
@@ -186,7 +186,7 @@ class LogOffsetTest extends BaseRequestTest {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, random.nextInt(3))
 
-    adminZkClient.createTopic(topic, 3, 1)
+    createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topicPartition, logManager.initialDefaultConfig)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 3fac95c..0bbe637 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -24,11 +24,13 @@ import kafka.server.KafkaConfig.fromProps
 import kafka.server.QuotaType._
 import kafka.utils.TestUtils._
 import kafka.utils.CoreUtils._
+import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{After, Test}
+
 import scala.collection.JavaConverters._
 
 /**
@@ -78,7 +80,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet)
     //And two extra partitions 6,7, which we don't intend on throttling.
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
+    val assignment = Map(
       0 -> Seq(100, 106), //Throttled
       1 -> Seq(101, 106), //Throttled
       2 -> Seq(102, 106), //Throttled
@@ -87,7 +89,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
       5 -> Seq(105, 107), //Throttled
       6 -> Seq(100, 106), //Not Throttled
       7 -> Seq(101, 107) //Not Throttled
-    ))
+    )
+    TestUtils.createTopic(zkClient, topic, assignment, brokers)
 
     val msg = msg100KB
     val msgCount = 100
@@ -176,7 +179,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val config: Properties = createBrokerConfig(100, zkConnect)
     config.put("log.segment.bytes", (1024 * 1024).toString)
     brokers = Seq(createServer(fromProps(config)))
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
+    TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers)
 
     //Write 20MBs and throttle at 5MB/s
     val msg = msg100KB
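
Since TestUtils.createTopic returns the elected leader for each partition, a test with an explicit assignment like the one above could also assert initial placement directly. A hypothetical example, not part of this commit:

    val leaders = TestUtils.createTopic(zkClient, topic, assignment, brokers)
    // With assignment 0 -> Seq(100, 106), broker 100 should come up as the
    // initial (preferred) leader of partition 0.
    assertEquals(100, leaders(0))
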
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 4c11e6f..3dcf4ff 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -78,7 +78,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     brokers = (100 to 101).map(createBroker(_))
 
     //A single partition topic with 2 replicas
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
+    TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers)
     producer = createProducer
     val tp = new TopicPartition(topic, 0)
 
@@ -139,9 +139,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     brokers = (100 to 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
 
     //A single partition topic with 2 replicas
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
-      0 -> Seq(100, 101)
-    ))
+    TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers)
     producer = createProducer
 
     //Write 10 messages
@@ -189,9 +187,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     brokers = (100 to 101).map(createBroker(_))
 
     //A single partition topic with 2 replicas
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
-      0 -> Seq(100, 101)
-    ))
+    TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers)
     producer = createBufferingProducer
 
     //Write 100 messages
@@ -266,7 +262,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     brokers = (100 to 101).map(createBroker(_))
 
     //A single partition topic with 2 replicas
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
+    TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers)
     producer = createProducer
 
     //Kick off with a single record
@@ -306,9 +302,9 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     brokers = (100 to 101).map(createBroker(_, enableUncleanLeaderElection = true))
 
     // A single partition topic with 2 replicas, min.isr = 1
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(
-      topic, Map(0 -> Seq(100, 101)), config = CoreUtils.propsWith((KafkaConfig.MinInSyncReplicasProp, "1"))
-    )
+    TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers,
+      CoreUtils.propsWith((KafkaConfig.MinInSyncReplicasProp, "1")))
+
     producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
 
     // Write one message while both brokers are up
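
CoreUtils.propsWith, used above, is just a small Properties builder. In case the helper is unfamiliar, a sketch of the idea (the real CoreUtils version may differ in detail):

    import java.util.Properties

    // Build a java.util.Properties from key/value pairs in one expression.
    def propsWith(props: (String, String)*): Properties = {
      val properties = new Properties()
      props.foreach { case (k, v) => properties.put(k, v) }
      properties
    }
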
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 36c6ab5..4b5a092 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -96,13 +96,12 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
     //3 brokers, put partition on 100/101 and then pretend to be 102
     brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic1, Map(
-      0 -> Seq(100),
-      1 -> Seq(101)
-    ))
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic2, Map(
-      0 -> Seq(100)
-    ))
+
+    val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
+    TestUtils.createTopic(zkClient, topic1, assignment1, brokers)
+
+    val assignment2 = Map(0 -> Seq(100))
+    TestUtils.createTopic(zkClient, topic2, assignment2, brokers)
 
     //Send messages equally to the two partitions, then half as many to a third
     producer = createProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = -1)
@@ -144,7 +143,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     //Setup: we are only interested in the single partition on broker 101
     brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
     def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(tp.topic, Map(tp.partition -> Seq(101)))
+    TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)), brokers)
     producer = createProducer(getBrokerListStrFromServers(brokers), retries = 10, acks = -1)
 
     //1. Given a single message
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 21205ed..f89abb9 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -305,11 +305,26 @@ object TestUtils extends Logging {
    * Wait until the leader is elected and the metadata is propagated to all brokers.
    * Return the leader for each partition.
    */
-  def createTopic(zkClient: KafkaZkClient, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
+  def createTopic(zkClient: KafkaZkClient,
+                  topic: String,
+                  partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
                   servers: Seq[KafkaServer]): scala.collection.immutable.Map[Int, Int] = {
+    createTopic(zkClient, topic, partitionReplicaAssignment, servers, new Properties())
+  }
+
+  /**
+   * Create a topic in ZooKeeper using a customized replica assignment.
+   * Wait until the leader is elected and the metadata is propagated to all brokers.
+   * Return the leader for each partition.
+   */
+  def createTopic(zkClient: KafkaZkClient,
+                  topic: String,
+                  partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
+                  servers: Seq[KafkaServer],
+                  topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, topicConfig)
     // wait until the update metadata request for new topic reaches all servers
     partitionReplicaAssignment.keySet.map { case i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
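
Taken together, the new overload keeps existing call sites source-compatible while letting tests route topic-level configs through the same consolidated path. A hedged usage sketch, assuming zkClient and servers come from the surrounding test harness:

    import java.util.Properties
    import kafka.utils.TestUtils

    // Create a topic with an explicit replica assignment plus a topic config,
    // then block until every broker's metadata cache has the new partition.
    val topicProps = new Properties()
    topicProps.put("min.insync.replicas", "1")
    val leaders: Map[Int, Int] =
      TestUtils.createTopic(zkClient, "my-test-topic", Map(0 -> Seq(100, 101)), servers, topicProps)
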

