kafka-commits mailing list archives

From: jun...@apache.org
Subject: git commit: kafka-1389; transient unit test failure in ProducerFailureHandlingTest; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede
Date: Thu, 24 Apr 2014 00:26:53 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 93af67cd4 -> 0efdfa6f7


kafka-1389; transient unit test failure in ProducerFailureHandlingTest; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0efdfa6f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0efdfa6f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0efdfa6f

Branch: refs/heads/trunk
Commit: 0efdfa6f7b6be688d5c531e801c9d37882e55f7f
Parents: 93af67c
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Apr 23 17:26:46 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Apr 23 17:26:46 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   |  2 +-
 .../controller/ControllerChannelManager.scala   |  4 +-
 .../kafka/tools/ReplicaVerificationTool.scala   |  2 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    | 52 ++++++--------------
 .../test/scala/unit/kafka/admin/AdminTest.scala |  3 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |  5 +-
 .../ZookeeperConsumerConnectorTest.scala        | 21 ++++----
 .../kafka/integration/AutoOffsetResetTest.scala |  2 +-
 .../unit/kafka/integration/FetcherTest.scala    |  5 +-
 .../kafka/integration/PrimitiveApiTest.scala    |  5 +-
 .../kafka/integration/RollingBounceTest.scala   | 36 ++------------
 .../kafka/integration/TopicMetadataTest.scala   | 15 ++----
 .../integration/UncleanLeaderElectionTest.scala |  6 +--
 .../unit/kafka/producer/AsyncProducerTest.scala |  2 +-
 .../unit/kafka/producer/ProducerTest.scala      | 25 +++-------
 .../unit/kafka/producer/SyncProducerTest.scala  |  3 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  9 +---
 .../scala/unit/kafka/server/LogOffsetTest.scala |  6 +--
 .../unit/kafka/server/LogRecoveryTest.scala     | 33 ++-----------
 .../unit/kafka/server/OffsetCommitTest.scala    | 11 ++---
 .../unit/kafka/server/ReplicaFetchTest.scala    |  4 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |  6 +--
 .../test/scala/unit/kafka/utils/TestUtils.scala | 30 ++++++++---
 23 files changed, 95 insertions(+), 192 deletions(-)
----------------------------------------------------------------------

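For context on the diffstat above: most of the test changes below replace direct AdminUtils topic creation followed by hand-rolled waits with short hard-coded timeouts (a common source of the transient failures this ticket targets) with the new TestUtils.createTopic helpers introduced at the end of this patch. The helpers create the topic, block until metadata has propagated to every broker and a leader is elected, and return the elected leader per partition. A minimal sketch of the pattern, assuming zkClient, topic and servers come from the usual ZooKeeperTestHarness/KafkaServerTestHarness wiring used in these tests:

    import kafka.utils.TestUtils

    // Before: create the topic in ZooKeeper, then wait separately for metadata
    // propagation (with a short fixed timeout) and for leader election.
    //   AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(0, 1)))
    //   TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
    //   TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)

    // After: one call that performs the creation and both waits (the metadata wait
    // now defaults to 5000 ms) and hands back the elected leader of each partition.
    val leaders: Map[Int, Option[Int]] =
      TestUtils.createTopic(zkClient, topic,
                            partitionReplicaAssignment = Map(0 -> Seq(0, 1)),
                            servers = servers)
    val leaderOfPartition0: Option[Int] = leaders(0)
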

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 686a0df..bdc72ea 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -57,7 +57,7 @@ object TopicCommand {
       else if(opts.options.has(opts.describeOpt))
         describeTopic(zkClient, opts)
     } catch {
-      case e =>
+      case e: Throwable =>
         println("Error while executing topic command " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index c95c650..919aeb2 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -132,7 +132,7 @@ class RequestSendThread(val controllerId: Int,
             channel.send(request)
             isSendSuccessful = true
           } catch {
-            case e => // if the send was not successful, reconnect to broker and resend the message
+            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
               error(("Controller %d epoch %d failed to send %s request with correlation id %s to broker %s. " +
                 "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                 RequestKeys.nameForKey(request.requestId.get), request.correlationId, toBroker.toString()), e)
@@ -173,7 +173,7 @@ class RequestSendThread(val controllerId: Int,
       channel.connect()
       info("Controller %d connected to %s for sending state change requests".format(controllerId, broker.toString()))
     } catch {
-      case e => {
+      case e: Throwable => {
         channel.disconnect()
         error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, broker.toString()), e)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index a649461..91f0728 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -302,7 +302,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
             } else
               isMessageInAllReplicas = false
           } catch {
-            case t =>
+            case t: Throwable =>
               throw new RuntimeException("Error in processing replica %d in partition %s at offset %d."
               .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 440aed8..fcd5eee 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -63,33 +63,11 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     servers ++= List(server1, server2, server3, server4)
     brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
 
-    // create topics with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, Map(0->Seq(0,1)))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic2, Map(0->Seq(1,2)))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic3, Map(0->Seq(2,3,0,1)))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic4, Map(0->Seq(0,3)))
-
-
-    // wait until leader is elected
-    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId)
-    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId)
-    var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId)
-    var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId)
-
-    debug("Leader for " + topic1  + " is elected to be: %s".format(leader1.getOrElse(-1)))
-    debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1)))
-    debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1)))
-    debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1)))
-
-    assertTrue("Leader should get elected", leader1.isDefined)
-    assertTrue("Leader should get elected", leader2.isDefined)
-    assertTrue("Leader should get elected", leader3.isDefined)
-    assertTrue("Leader should get elected", leader4.isDefined)
-
-    assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
-    assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2))
-    assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3))
-    assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3))
+    // create topics first
+    createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+    createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
+    createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3,0,1)), servers = servers)
+    createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
   }
 
   override def tearDown() {
@@ -129,8 +107,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(leader2.get, leader2FromZk)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1, 5000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2, 5000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
     val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions",
       2000,0).topicsMetadata
     val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
@@ -154,8 +132,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(leader2.get, leader2FromZk)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1, 5000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2, 5000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
     val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas",
       2000,0).topicsMetadata
     val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2))
@@ -173,12 +151,12 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     AdminUtils.addPartitions(zkClient, topic3, 7)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1, 5000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2, 5000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3, 5000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4, 5000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5, 5000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6, 5000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
 
     val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement",
       2000,0).topicsMetadata

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 8991050..3a6c5ff 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -310,8 +310,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000)
+    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
 
     val controllerId = ZkUtils.getController(zkClient)
     val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 965099a..151ba7c 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -20,7 +20,6 @@ package kafka.consumer
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.util.Properties
 import scala.collection._
 import junit.framework.Assert._
 
@@ -28,7 +27,6 @@ import kafka.message._
 import kafka.server._
 import kafka.utils.TestUtils._
 import kafka.utils._
-import kafka.admin.AdminUtils
 import org.junit.Test
 import kafka.serializer._
 import kafka.cluster.{Broker, Cluster}
@@ -61,8 +59,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
 
   override def setUp() {
     super.setUp
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e93305a..40a25a2 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -25,7 +25,6 @@ import scala.collection._
 import org.scalatest.junit.JUnit3Suite
 import kafka.message._
 import kafka.serializer._
-import kafka.admin.AdminUtils
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
@@ -97,8 +96,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
 
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -176,8 +175,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
 
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
@@ -249,8 +248,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++
                        sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
 
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
 
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -275,8 +274,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++
                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
 
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
 
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
 
@@ -310,13 +309,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
 
     // create topic topic1 with 1 partition on broker 0
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
 
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 1415773..7125ec9 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -109,7 +109,7 @@ kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow FAILED
     for(i <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
 
     // update offset in zookeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 9e1a3b7..4075068 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -30,7 +30,6 @@ import kafka.serializer._
 import kafka.producer.{KeyedMessage, Producer}
 import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
-import kafka.admin.AdminUtils
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -55,8 +54,8 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
   override def setUp() {
     super.setUp
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)))
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
+
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
     fetcher.stopConnections()
     fetcher.startConnections(topicInfos, cluster)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 60a466f..6d489ad 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -209,9 +209,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
-    AdminUtils.createTopic(zkClient, newTopic, 1, 1)
-    TestUtils.waitUntilMetadataIsPropagated(servers, newTopic, 0, 1000)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0)
+    TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
+
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index 3346156..5eee08a 100644
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -21,12 +21,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{ZkUtils, Utils, TestUtils}
-import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager}
-import kafka.cluster.Broker
-import kafka.common.ErrorMapping
-import kafka.api._
-import kafka.admin.AdminUtils
+import kafka.utils.{Utils, TestUtils}
 import kafka.server.{KafkaConfig, KafkaServer}
 
 class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -80,31 +75,10 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topic4 = "new-topic4"
 
     // create topics with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, Map(0->Seq(0,1)))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic2, Map(0->Seq(1,2)))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic3, Map(0->Seq(2,3)))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic4, Map(0->Seq(0,3)))
-
-    // wait until leader is elected
-    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId)
-    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId)
-    var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId)
-    var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId)
-
-    debug("Leader for " + topic1  + " is elected to be: %s".format(leader1.getOrElse(-1)))
-    debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1)))
-    debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1)))
-    debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1)))
-
-    assertTrue("Leader should get elected", leader1.isDefined)
-    assertTrue("Leader should get elected", leader2.isDefined)
-    assertTrue("Leader should get elected", leader3.isDefined)
-    assertTrue("Leader should get elected", leader4.isDefined)
-
-    assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
-    assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2))
-    assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3))
-    assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3))
+    createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+    createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
+    createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3)), servers = servers)
+    createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
 
     // Do a rolling bounce and check if leader transitions happen correctly
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 761f759..35dc071 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -65,9 +65,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testBasicTopicMetadata {
     // create topic
     val topic = "test"
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+
     var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
       2000,0).topicsMetadata
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
@@ -84,12 +83,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create topic
     val topic1 = "testGetAllTopicMetadata1"
     val topic2 = "testGetAllTopicMetadata2"
-    AdminUtils.createTopic(zkClient, topic1, 1, 1)
-    AdminUtils.createTopic(zkClient, topic2, 1, 1)
-
-    // wait for leader to be elected for both topics
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0, 1000)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic2, 0, 1000)
+    createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+    createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
 
     // issue metadata request with empty list of topics
     var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata",
@@ -120,7 +115,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // wait for leader to be elected
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
 
     // retry the metadata for the auto created topic
     topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 1bf9462..1b11eb6 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -173,7 +173,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
     produceMessage(topic, "first")
-    waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000)
+    waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
@@ -208,7 +208,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
     produceMessage(topic, "first")
-    waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000)
+    waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
@@ -235,7 +235,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId))
 
     produceMessage(topic, "third")
-    waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000)
+    waitUntilMetadataIsPropagated(servers, topic, partitionId)
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
 
     // verify clean leader transition to ISR follower

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index bdc6f01..6c3feac 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -169,7 +169,7 @@ class AsyncProducerTest extends JUnit3Suite {
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val broker1 = new Broker(0, "localhost", 9092)
     val broker2 = new Broker(1, "localhost", 9093)
-    broker1
+
     // form expected partitions metadata
     val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
     val partition2Metadata = new PartitionMetadata(1, Some(broker2), List(broker1, broker2))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 439e33e..c1219a8 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -86,10 +86,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   @Test
   def testUpdateBrokerPartitionInfo() {
     val topic = "new-topic"
-    AdminUtils.createTopic(zkClient, topic, 1, 2)
-    // wait until the update metadata request for new topic reaches all servers
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
 
     val props1 = new util.Properties()
     props1.put("metadata.broker.list", "localhost:80,localhost:81")
@@ -152,9 +149,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     val topic = "new-topic"
     // create topic with 1 partition and await leadership
-    AdminUtils.createTopic(zkClient, topic, 1, 2)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
 
     val producer1 = new Producer[String, String](producerConfig1)
     val producer2 = new Producer[String, String](producerConfig2)
@@ -183,7 +178,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       fail("Should have timed out for 3 acks.")
     }
     catch {
-      case se: FailedToSendMessageException => true
+      case se: FailedToSendMessageException =>
+        // this is expected
       case e: Throwable => fail("Not expected", e)
     }
     finally {
@@ -203,13 +199,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     val topic = "new-topic"
     // create topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)))
-    // waiting for 1 partition is enough
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 2)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 3)
+    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)),
+                          servers = servers)
 
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
@@ -266,9 +257,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     val topic = "new-topic"
     // create topics in ZK
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1)))
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
 
     // do a simple test to make sure plumbing is okay
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 4840824..0dec9ec 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -92,8 +92,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    AdminUtils.createTopic(zkClient, "test", 1, 1)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0)
+    TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
 
     val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 5136fbe..b278bb6 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.AdminUtils
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
@@ -61,10 +60,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val partitionId = 0
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(0, 1)))
+    val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
 
-    // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
     val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
     debug("leader Epoc: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
@@ -108,10 +105,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val partitionId = 0
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0 -> Seq(0, 1)))
+    val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
 
-    // wait until leader is elected
-    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
     val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
     debug("leader Epoc: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 76ae659..3fb08e6 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -22,7 +22,6 @@ import kafka.utils._
 import junit.framework.Assert._
 import java.util.{Random, Properties}
 import kafka.consumer.SimpleConsumer
-import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
@@ -30,8 +29,6 @@ import kafka.admin.AdminUtils
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
-import kafka.utils.nonthreadsafe
-import kafka.utils.threadsafe
 import org.junit.After
 import org.junit.Before
 import org.junit.Test
@@ -123,8 +120,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
 
     // setup brokers in zookeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
 
     var offsetChanged = false
     for(i <- 1 to 14) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index ddb2402..7a0ef6f 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
 import java.io.File
-import kafka.admin.AdminUtils
 import kafka.utils.TestUtils._
 import kafka.utils.IntEncoder
 import kafka.utils.{Utils, TestUtils}
@@ -72,13 +71,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1)))
-
-    // wait until leader is elected
-    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
-    assertTrue("Leader should get elected", leader.isDefined)
-    // NOTE: this is to avoid transient test failures
-    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
 
     val numMessages = 2L
     sendMessages(numMessages.toInt)
@@ -105,14 +98,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1)))
+    var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)(0)
 
-    // wait until leader is elected
-    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
-    assertTrue("Leader should get elected", leader.isDefined)
-    // NOTE: this is to avoid transient test failures
-    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
-    
     assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     sendMessages(1)
@@ -169,13 +156,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1)))
+    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
 
-    // wait until leader is elected
-    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
-    assertTrue("Leader should get elected", leader.isDefined)
-    // NOTE: this is to avoid transient test failures
-    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
     sendMessages(20)
     var hw = 20L
     // give some time for follower 1 to record leader HW of 600
@@ -202,13 +184,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(server1.config.brokerId, server2.config.brokerId)))
-
-    // wait until leader is elected
-    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
-    assertTrue("Leader should get elected", leader.isDefined)
-    // NOTE: this is to avoid transient test failures
-    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+    var leader = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(server1.config.brokerId, server2.config.brokerId)),
+                servers = servers)(0)
 
     sendMessages(2)
     var hw = 2L

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 90c21c6..19a8635 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -30,7 +30,6 @@ import kafka.utils.TestUtils._
 import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
 import scala.util.Random
 import scala.collection._
-import kafka.admin.AdminUtils
 
 class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   val random: Random = new Random()
@@ -78,9 +77,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topicAndPartition = TopicAndPartition(topic, 0)
     val expectedReplicaAssignment = Map(0  -> List(1))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
-    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
+    createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
+
     val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
@@ -168,9 +166,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testLargeMetadataPayload() {
     val topicAndPartition = TopicAndPartition("large-metadata", 0)
     val expectedReplicaAssignment = Map(0  -> List(1))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topicAndPartition.topic, expectedReplicaAssignment)
-    var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicAndPartition.topic, 0)
-    assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
+    createTopic(zkClient, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment,
+                servers = Seq(server))
 
     val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 5305167..481a400 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -22,7 +22,6 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringEncoder
-import kafka.admin.AdminUtils
 import kafka.utils.TestUtils
 import junit.framework.Assert._
 import kafka.common._
@@ -51,8 +50,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
 
     // create a topic and partition and await leadership
     for (topic <- List(topic1,topic2)) {
-      AdminUtils.createTopic(zkClient, topic, 1, 2)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+      createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = brokers)
     }
 
     // send test messages to leader

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 1651822..addd11a 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -26,7 +26,6 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.producer._
 import kafka.utils.IntEncoder
 import kafka.utils.TestUtils._
-import kafka.admin.AdminUtils
 import kafka.api.FetchRequestBuilder
 import kafka.utils.{TestUtils, Utils}
 
@@ -49,8 +48,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
 
     // create topic
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
 
     // send some messages
     producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
@@ -69,7 +67,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     server.startup()
 
     // wait for the broker to receive the update metadata request after startup
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
 
     producer = new Producer[Int, String](new ProducerConfig(producerConfig))
     val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0efdfa6f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4bd5964..130b6be 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,8 +23,6 @@ import java.nio._
 import java.nio.channels._
 import java.util.Random
 import java.util.Properties
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.TimeUnit
 
 import collection.mutable.Map
 import collection.mutable.ListBuffer
@@ -150,15 +148,33 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Create a topic in zookeeper
+   * Create a topic in zookeeper.
+   * Wait until the leader is elected and the metadata is propagated to all brokers.
+   * Return the leader for each partition.
    */
   def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
-                  servers: List[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = {
+                  servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = {
     // create topic
     AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor)
     // wait until the update metadata request for new topic reaches all servers
     (0 until numPartitions).map { case i =>
-      TestUtils.waitUntilMetadataIsPropagated(servers, topic, i, 500)
+      TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
+      i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i)
+    }.toMap
+  }
+
+  /**
+   * Create a topic in zookeeper using a customized replica assignment.
+   * Wait until the leader is elected and the metadata is propagated to all brokers.
+   * Return the leader for each partition.
+   */
+  def createTopic(zkClient: ZkClient, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
+                  servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = {
+    // create topic
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaAssignment)
+    // wait until the update metadata request for new topic reaches all servers
+    partitionReplicaAssignment.keySet.map { case i =>
+      TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
       i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i)
     }.toMap
   }
@@ -553,8 +569,8 @@ object TestUtils extends Logging {
     byteBuffer
   }
 
-  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
-    assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
+  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = {
+    assertTrue("Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),
       TestUtils.waitUntilTrue(() =>
         servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout))
   }

