kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [2/2] kafka git commit: KAFKA-1501 Let the OS choose the port in unit tests to avoid collisions. Patch by Ewen CP, reviewed by Guozhang and me.
Date Sat, 04 Apr 2015 21:43:02 GMT
KAFKA-1501 Let the OS choose the port in unit tests to avoid collisions. Patch by Ewen CP, reviewed by Guozhang and me.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6adaffd8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6adaffd8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6adaffd8

Branch: refs/heads/trunk
Commit: 6adaffd8ea4bc1f40bd5cf5268c30eb2df1868ab
Parents: 15b93a4
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Sat Apr 4 14:26:38 2015 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Sat Apr 4 14:26:38 2015 -0700

----------------------------------------------------------------------
 .../kafka/common/network/SelectorTest.java      |   9 +-
 .../java/org/apache/kafka/test/TestUtils.java   |  28 ----
 .../main/scala/kafka/network/SocketServer.scala |  12 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   5 +-
 .../kafka/api/ConsumerBounceTest.scala          | 154 +++++++++++++++++
 .../integration/kafka/api/ConsumerTest.scala    |  83 ----------
 .../kafka/api/FixedPortTestUtils.scala          |  55 +++++++
 .../kafka/api/IntegrationTestHarness.scala      |  13 +-
 .../kafka/api/ProducerBounceTest.scala          | 164 +++++++++++++++++++
 .../kafka/api/ProducerCompressionTest.scala     |  14 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |  89 ++--------
 .../kafka/api/ProducerSendTest.scala            |  12 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  27 +--
 .../test/scala/unit/kafka/admin/AdminTest.scala |  16 +-
 .../kafka/admin/DeleteConsumerGroupTest.scala   |   4 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |   6 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |  29 ++--
 .../ZookeeperConsumerConnectorTest.scala        |  47 +++---
 .../kafka/integration/AutoOffsetResetTest.scala |   4 +-
 .../unit/kafka/integration/FetcherTest.scala    |  28 ++--
 .../integration/KafkaServerTestHarness.scala    |  24 ++-
 .../kafka/integration/PrimitiveApiTest.scala    |  12 +-
 .../ProducerConsumerTestHarness.scala           |   5 +-
 .../kafka/integration/RollingBounceTest.scala   |  29 +---
 .../kafka/integration/TopicMetadataTest.scala   |   7 +-
 .../integration/UncleanLeaderElectionTest.scala |  25 +--
 .../ZookeeperConsumerConnectorTest.scala        |  11 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   2 +-
 .../kafka/log4j/KafkaLog4jAppenderTest.scala    |  11 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala  |   9 +-
 .../unit/kafka/network/SocketServerTest.scala   |   6 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |  24 +--
 .../unit/kafka/producer/ProducerTest.scala      |  55 ++++---
 .../unit/kafka/producer/SyncProducerTest.scala  |  19 +--
 .../unit/kafka/server/AdvertiseBrokerTest.scala |   2 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |   3 +-
 .../server/HighwatermarkPersistenceTest.scala   |   2 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   2 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |  36 ++--
 .../unit/kafka/server/LeaderElectionTest.scala  |  13 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   9 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |  36 ++--
 .../unit/kafka/server/OffsetCommitTest.scala    |   5 +-
 .../unit/kafka/server/ReplicaFetchTest.scala    |  11 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |   6 +-
 .../server/ServerGenerateBrokerIdTest.scala     |  19 ++-
 .../unit/kafka/server/ServerShutdownTest.scala  |  21 ++-
 .../unit/kafka/server/ServerStartupTest.scala   |   6 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   2 +-
 .../unit/kafka/utils/ReplicationUtilsTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  51 +++---
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala |   6 +-
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   |   2 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  11 +-
 54 files changed, 732 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 0d030bc..d5b306b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -131,9 +131,12 @@ public class SelectorTest {
     @Test
     public void testConnectionRefused() throws Exception {
         int node = 0;
-        selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE);
+        ServerSocket nonListeningSocket = new ServerSocket(0);
+        int nonListeningPort = nonListeningSocket.getLocalPort();
+        selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE);
         while (selector.disconnected().contains(node))
             selector.poll(1000L);
+        nonListeningSocket.close();
     }
 
     /**
@@ -271,8 +274,8 @@ public class SelectorTest {
         private final List<Socket> sockets;
 
         public EchoServer() throws Exception {
-            this.port = TestUtils.choosePort();
-            this.serverSocket = new ServerSocket(port);
+            this.serverSocket = new ServerSocket(0);
+            this.port = this.serverSocket.getLocalPort();
             this.threads = Collections.synchronizedList(new ArrayList<Thread>());
             this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 20dba7b..ccf3a5f 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -19,8 +19,6 @@ package org.apache.kafka.test;
 import static java.util.Arrays.asList;
 
 import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -60,32 +58,6 @@ public class TestUtils {
     }
 
     /**
-     * Choose a number of random available ports
-     */
-    public static int[] choosePorts(int count) {
-        try {
-            ServerSocket[] sockets = new ServerSocket[count];
-            int[] ports = new int[count];
-            for (int i = 0; i < count; i++) {
-                sockets[i] = new ServerSocket(0);
-                ports[i] = sockets[i].getLocalPort();
-            }
-            for (int i = 0; i < count; i++)
-                sockets[i].close();
-            return ports;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Choose an available port
-     */
-    public static int choosePort() {
-        return choosePorts(1)[0];
-    }
-
-    /**
      * Generate an array of random bytes
      * 
      * @param size The size of the array

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 76ce41a..07ce58e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -39,7 +39,7 @@ import com.yammer.metrics.core.{Gauge, Meter}
  */
 class SocketServer(val brokerId: Int,
                    val host: String,
-                   val port: Int,
+                   private val port: Int,
                    val numProcessorThreads: Int,
                    val maxQueuedRequests: Int,
                    val sendBufferSize: Int,
@@ -72,7 +72,7 @@ class SocketServer(val brokerId: Int,
                                     requestChannel,
                                     quotas,
                                     connectionsMaxIdleMs)
-      Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
+      Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start()
     }
 
     newGauge("ResponsesBeingSent", new Gauge[Int] {
@@ -100,6 +100,12 @@ class SocketServer(val brokerId: Int,
       processor.shutdown()
     info("Shutdown completed")
   }
+
+  def boundPort(): Int = {
+    if (acceptor == null)
+      throw new KafkaException("Tried to check server's port before server was started")
+    acceptor.serverChannel.socket().getLocalPort
+  }
 }
 
 /**
@@ -197,7 +203,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
  * Thread that accepts and configures new connections. There is only need for one of these
  */
 private[kafka] class Acceptor(val host: String, 
-                              val port: Int, 
+                              private val port: Int,
                               private val processors: Array[Processor],
                               val sendBufferSize: Int, 
                               val recvBufferSize: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4db3384..10ea77a 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -157,7 +157,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         topicConfigManager.startup()
 
         /* tell everyone we are alive */
-        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+        val advertisedPort = if (config.advertisedPort != 0) config.advertisedPort else socketServer.boundPort()
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, advertisedPort, config.zkSessionTimeoutMs, zkClient)
         kafkaHealthcheck.startup()
 
         /* register broker metrics */
@@ -357,6 +358,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   def getLogManager(): LogManager = logManager
 
+  def boundPort(): Int = socketServer.boundPort()
+
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
     val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
                                      segmentMs = config.logRollTimeMillis,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
new file mode 100644
index 0000000..35f4f46
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -0,0 +1,154 @@
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package kafka.api
+
+import kafka.server.KafkaConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.CommitType
+import org.apache.kafka.common.TopicPartition
+
+import kafka.utils.{ShutdownableThread, TestUtils, Logging}
+
+import org.junit.Assert._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Integration tests for the new consumer that cover basic usage as well as server failures
+ */
+class ConsumerBounceTest extends IntegrationTestHarness with Logging {
+
+  val producerCount = 1
+  val consumerCount = 2
+  val serverCount = 3
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+
+  // configure the servers and clients
+  this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
+  this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
+  this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+  this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+  override def generateConfigs() = {
+    FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect,enableControlledShutdown = false)
+      .map(KafkaConfig.fromProps(_, serverConfig))
+  }
+
+  override def setUp() {
+    super.setUp()
+
+    // create the test topic with all the brokers as replicas
+    TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
+  }
+
+  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5)
+
+  /*
+   * 1. Produce a bunch of messages
+   * 2. Then consume the messages while killing and restarting brokers at random
+   */
+  def consumeWithBrokerFailures(numIters: Int) {
+    val numRecords = 1000
+    sendRecords(numRecords)
+    this.producers.map(_.close)
+
+    var consumed = 0
+    val consumer = this.consumers(0)
+    consumer.subscribe(topic)
+
+    val scheduler = new BounceBrokerScheduler(numIters)
+    scheduler.start()
+
+    while (scheduler.isRunning.get()) {
+      for (record <- consumer.poll(100)) {
+        assertEquals(consumed.toLong, record.offset())
+        consumed += 1
+      }
+      consumer.commit(CommitType.SYNC)
+
+      if (consumed == numRecords) {
+        consumer.seekToBeginning()
+        consumed = 0
+      }
+    }
+    scheduler.shutdown()
+  }
+
+  def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5)
+
+  def seekAndCommitWithBrokerFailures(numIters: Int) {
+    val numRecords = 1000
+    sendRecords(numRecords)
+    this.producers.map(_.close)
+
+    val consumer = this.consumers(0)
+    consumer.subscribe(tp)
+    consumer.seek(tp, 0)
+
+    val scheduler = new BounceBrokerScheduler(numIters)
+    scheduler.start()
+
+    while(scheduler.isRunning.get()) {
+      val coin = TestUtils.random.nextInt(3)
+      if (coin == 0) {
+        info("Seeking to end of log")
+        consumer.seekToEnd()
+        assertEquals(numRecords.toLong, consumer.position(tp))
+      } else if (coin == 1) {
+        val pos = TestUtils.random.nextInt(numRecords).toLong
+        info("Seeking to " + pos)
+        consumer.seek(tp, pos)
+        assertEquals(pos, consumer.position(tp))
+      } else if (coin == 2) {
+        info("Committing offset.")
+        consumer.commit(CommitType.SYNC)
+        assertEquals(consumer.position(tp), consumer.committed(tp))
+      }
+    }
+  }
+
+  private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false)
+  {
+    var iter: Int = 0
+
+    override def doWork(): Unit = {
+      killRandomBroker()
+      Thread.sleep(500)
+      restartDeadBrokers()
+
+      iter += 1
+      if (iter == numIters)
+        initiateShutdown()
+      else
+        Thread.sleep(500)
+    }
+  }
+
+  private def sendRecords(numRecords: Int) {
+    val futures = (0 until numRecords).map { i =>
+      this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+    }
+    futures.map(_.get)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index c82bdaa..ffbdf5d 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -146,72 +146,6 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
   }
 
-  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5)
-
-  /*
-   * 1. Produce a bunch of messages
-   * 2. Then consume the messages while killing and restarting brokers at random
-   */
-  def consumeWithBrokerFailures(numIters: Int) {
-    val numRecords = 1000
-    sendRecords(numRecords)
-    this.producers.map(_.close)
-
-    var consumed = 0
-    val consumer = this.consumers(0)
-    consumer.subscribe(topic)
-
-    val scheduler = new BounceBrokerScheduler(numIters)
-    scheduler.start()
-
-    while (scheduler.isRunning.get()) {
-      for (record <- consumer.poll(100)) {
-        assertEquals(consumed.toLong, record.offset())
-        consumed += 1
-      }
-      consumer.commit(CommitType.SYNC)
-
-      if (consumed == numRecords) {
-        consumer.seekToBeginning()
-        consumed = 0
-      }
-    }
-    scheduler.shutdown()
-  }
-
-  def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5)
-
-  def seekAndCommitWithBrokerFailures(numIters: Int) {
-    val numRecords = 1000
-    sendRecords(numRecords)
-    this.producers.map(_.close)
-
-    val consumer = this.consumers(0)
-    consumer.subscribe(tp)
-    consumer.seek(tp, 0)
-
-    val scheduler = new BounceBrokerScheduler(numIters)
-    scheduler.start()
-
-    while(scheduler.isRunning.get()) {
-      val coin = TestUtils.random.nextInt(3)
-      if (coin == 0) {
-        info("Seeking to end of log")
-        consumer.seekToEnd()
-        assertEquals(numRecords.toLong, consumer.position(tp))
-      } else if (coin == 1) {
-        val pos = TestUtils.random.nextInt(numRecords).toLong
-        info("Seeking to " + pos)
-        consumer.seek(tp, pos)
-        assertEquals(pos, consumer.position(tp))
-      } else if (coin == 2) {
-        info("Committing offset.")
-        consumer.commit(CommitType.SYNC)
-        assertEquals(consumer.position(tp), consumer.committed(tp))
-      }
-    }
-  }
-  
   def testPartitionReassignmentCallback() {
     val callback = new TestConsumerReassignmentCallback()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
@@ -255,23 +189,6 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     } 
   }
 
-  private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false)
-  {
-    var iter: Int = 0
-
-    override def doWork(): Unit = {
-      killRandomBroker()
-      Thread.sleep(500)
-      restartDeadBrokers()
-
-      iter += 1
-      if (iter == numIters)
-        initiateShutdown()
-      else
-        Thread.sleep(500)
-    }
-  }
-
   private def sendRecords(numRecords: Int) {
     val futures = (0 until numRecords).map { i =>
       this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
new file mode 100644
index 0000000..1d31a43
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.io.IOException
+import java.net.ServerSocket
+import java.util.Properties
+
+import kafka.utils.TestUtils
+
+/**
+ * DO NOT USE THESE UTILITIES UNLESS YOU ABSOLUTELY MUST
+ *
+ * These are utilities for selecting fixed (preselected), ephemeral ports to use with tests. This is not a reliable way
+ * of testing on most machines because you can easily run into port conflicts. If you're using this class, you're almost
+ * certainly doing something wrong unless you can prove that your test **cannot function** properly without it.
+ */
+object FixedPortTestUtils {
+  def choosePorts(count: Int): Seq[Int] = {
+    try {
+      val sockets = (0 until count).map(i => new ServerSocket(0))
+      val ports = sockets.map(_.getLocalPort())
+      sockets.foreach(_.close())
+      ports
+    } catch {
+      case e: IOException => {
+        throw new RuntimeException(e)
+      }
+    }
+  }
+
+  def createBrokerConfigs(numConfigs: Int,
+    zkConnect: String,
+    enableControlledShutdown: Boolean = true,
+    enableDeleteTopic: Boolean = false): Seq[Properties] = {
+    val ports = FixedPortTestUtils.choosePorts(numConfigs)
+    (0 until numConfigs)
+      .map(node => TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, ports(node)))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 5b7e366..02d2627 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -38,15 +38,16 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
   lazy val producerConfig = new Properties
   lazy val consumerConfig = new Properties
   lazy val serverConfig = new Properties
-  override lazy val configs = {
-    val cfgs = TestUtils.createBrokerConfigs(serverCount)
+
+  var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+  override def generateConfigs() = {
+    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect)
     cfgs.map(_.putAll(serverConfig))
     cfgs.map(KafkaConfig.fromProps)
   }
-  
-  var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-  var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
-  
+
   override def setUp() {
     super.setUp()
     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
new file mode 100644
index 0000000..c9d16bb
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -0,0 +1,164 @@
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.consumer.SimpleConsumer
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{ShutdownableThread, TestUtils}
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.junit.Assert._
+import org.junit.Test
+
+class ProducerBounceTest extends KafkaServerTestHarness {
+  private val producerBufferSize = 30000
+  private val serverMessageMaxBytes =  producerBufferSize/2
+
+  val numServers = 2
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
+  overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
+  // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
+  // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
+  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
+
+  // This is one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
+  // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
+  // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
+  // where metadata is not refreshed quickly enough, and by the time it's actually trying to, all the servers have
+  // been bounced and have new addresses. None of the bootstrap nodes or current metadata can get them connected to a
+  // running server.
+  //
+  // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
+  // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
+  override def generateConfigs() = {
+    FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = false)
+      .map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  private var consumer1: SimpleConsumer = null
+  private var consumer2: SimpleConsumer = null
+
+  private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null
+
+  private val topic1 = "topic-1"
+  private val topic2 = "topic-2"
+
+  override def setUp() {
+    super.setUp()
+
+    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
+    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+  }
+
+  override def tearDown() {
+    if (producer1 != null) producer1.close
+    if (producer2 != null) producer2.close
+    if (producer3 != null) producer3.close
+    if (producer4 != null) producer4.close
+
+    super.tearDown()
+  }
+
+  /**
+   * With replication, the producer should be able to find the new leader after it detects a broker failure
+   */
+  @Test
+  def testBrokerFailure() {
+    val numPartitions = 3
+    val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers)
+    assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
+
+    val scheduler = new ProducerScheduler()
+    scheduler.start
+
+    // rolling bounce brokers
+    for (i <- 0 until numServers) {
+      for (server <- servers) {
+        server.shutdown()
+        server.awaitShutdown()
+        server.startup()
+        Thread.sleep(2000)
+      }
+
+      // Make sure the producer does not see any exception
+      // in returned metadata due to broker failures
+      assertTrue(scheduler.failed == false)
+
+      // Make sure the leader still exists after bouncing brokers
+      (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition))
+    }
+
+    scheduler.shutdown
+
+    // Make sure the producer does not see any exception
+    // when draining the remaining messages on shutdown
+    assertTrue(scheduler.failed == false)
+
+    // double check that the leader info has been propagated after consecutive bounces
+    val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
+    val fetchResponses = newLeaders.zipWithIndex.map { case (leader, partition) =>
+      // Consumers must be instantiated after all the restarts since they use random ports each time they start up
+      val consumer = new SimpleConsumer("localhost", servers(leader).boundPort(), 100, 1024 * 1024, "")
+      val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
+      consumer.close
+      response
+    }
+    val messages = fetchResponses.flatMap(r => r.iterator.toList.map(_.message))
+    val uniqueMessages = messages.toSet
+    val uniqueMessageSize = uniqueMessages.size
+
+    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
+  }
+
+  private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
+  {
+    val numRecords = 1000
+    var sent = 0
+    var failed = false
+
+    val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10)
+
+    override def doWork(): Unit = {
+      val responses =
+        for (i <- sent+1 to sent+numRecords)
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes),
+                            new ErrorLoggingCallback(topic1, null, null, true))
+      val futures = responses.toList
+
+      try {
+        futures.map(_.get)
+        sent += numRecords
+      } catch {
+        case e : Exception => failed = true
+      }
+    }
+
+    override def shutdown(){
+      super.shutdown()
+      producer.close
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index cae72f4..3a7ae8b 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -34,24 +34,22 @@ import kafka.message.Message
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{Utils, TestUtils}
 
-import scala.Array
-
 
 @RunWith(value = classOf[Parameterized])
 class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId = 0
-  private val port = TestUtils.choosePort
   private var server: KafkaServer = null
 
-  private val props = TestUtils.createBrokerConfig(brokerId, port)
-  private val config = KafkaConfig.fromProps(props)
-
   private val topic = "topic"
   private val numRecords = 2000
 
   @Before
   override def setUp() {
     super.setUp()
+
+    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
+    val config = KafkaConfig.fromProps(props)
+
     server = TestUtils.createServer(config)
   }
 
@@ -71,14 +69,14 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
   def testCompression() {
 
     val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(Seq(server)))
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
     props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
     props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
-    val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
+    val consumer = new SimpleConsumer("localhost", server.boundPort(), 100, 1024*1024, "")
 
     try {
       // create topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 7eb6d05..ee94011 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -20,7 +20,6 @@ package kafka.api
 import org.junit.Test
 import org.junit.Assert._
 
-import java.lang.Integer
 import java.util.{Properties, Random}
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
@@ -28,7 +27,7 @@ import kafka.common.Topic
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
+import kafka.utils.{ShutdownableThread, TestUtils}
 
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException}
@@ -42,16 +41,14 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   val numServers = 2
 
   val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
   overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
   overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
   // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
   // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
 
-  val configs =
-    for (props <- TestUtils.createBrokerConfigs(numServers, false))
-    yield KafkaConfig.fromProps(props, overridingProps)
+  def generateConfigs() =
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
 
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
@@ -67,19 +64,12 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   override def setUp() {
     super.setUp()
 
-    // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
-    consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
-
     producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
     producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
     producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
   }
 
   override def tearDown() {
-    consumer1.close
-    consumer2.close
-
     if (producer1 != null) producer1.close
     if (producer2 != null) producer2.close
     if (producer3 != null) producer3.close
@@ -94,7 +84,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testTooLargeRecordWithAckZero() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
 
     // send a too-large record
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -107,7 +97,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testTooLargeRecordWithAckOne() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
 
     // send a too-large record
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -141,7 +131,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testWrongBrokerList() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
 
     // producer with incorrect broker list
     producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
@@ -161,7 +151,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testNoResponse() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
 
     // first send a message to make sure the metadata is refreshed
     val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
@@ -202,7 +192,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testInvalidPartition() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
 
     // create a record with incorrect partition id, send should fail
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes)
@@ -223,7 +213,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testSendAfterClosed() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
 
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
 
@@ -248,59 +238,6 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     // re-close producer is fine
   }
 
-  /**
-   * With replication, producer should able able to find new leader after it detects broker failure
-   */
-  @Test
-  def testBrokerFailure() {
-    // create topic
-    val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
-    val partition = 0
-    assertTrue("Leader of partition 0 of the topic should exist", leaders(partition).isDefined)
-
-    val scheduler = new ProducerScheduler()
-    scheduler.start
-
-    // rolling bounce brokers
-    for (i <- 0 until 2) {
-      for (server <- servers) {
-        server.shutdown()
-        server.awaitShutdown()
-        server.startup()
-
-        Thread.sleep(2000)
-      }
-
-      // Make sure the producer do not see any exception
-      // in returned metadata due to broker failures
-      assertTrue(scheduler.failed == false)
-
-      // Make sure the leader still exists after bouncing brokers
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition)
-    }
-
-    scheduler.shutdown
-
-    // Make sure the producer do not see any exception
-    // when draining the left messages on shutdown
-    assertTrue(scheduler.failed == false)
-
-    // double check that the leader info has been propagated after consecutive bounces
-    val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition)
-
-    val fetchResponse = if(leader == configs(0).brokerId) {
-      consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
-    } else {
-      consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
-    }
-
-    val messages = fetchResponse.iterator.toList.map(_.message)
-    val uniqueMessages = messages.toSet
-    val uniqueMessageSize = uniqueMessages.size
-
-    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
-  }
-
   @Test
   def testCannotSendToInternalTopic() {
     val thrown = intercept[ExecutionException] {
@@ -313,9 +250,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   def testNotEnoughReplicas() {
     val topicName = "minisrtest"
     val topicProps = new Properties()
-    topicProps.put("min.insync.replicas","3")
+    topicProps.put("min.insync.replicas",(numServers+1).toString)
 
-    TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
+    TestUtils.createTopic(zkClient, topicName, 1, numServers, servers, topicProps)
 
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
     try {
@@ -333,9 +270,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   def testNotEnoughReplicasAfterBrokerShutdown() {
     val topicName = "minisrtest2"
     val topicProps = new Properties()
-    topicProps.put("min.insync.replicas","2")
+    topicProps.put("min.insync.replicas",numServers.toString)
 
-    TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
+    TestUtils.createTopic(zkClient, topicName, 1, numServers, servers,topicProps)
 
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
     // this should work with all brokers up and running

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 3df4507..aba256d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -25,7 +25,7 @@ import org.junit.Test
 import org.junit.Assert._
 
 import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.integration.KafkaServerTestHarness
@@ -39,12 +39,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
   val numServers = 2
 
   val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
   overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
 
-  val configs =
-    for (props <- TestUtils.createBrokerConfigs(numServers, false))
-    yield KafkaConfig.fromProps(props, overridingProps)
+  def generateConfigs() =
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
 
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
@@ -56,8 +54,8 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     super.setUp()
 
     // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
-    consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
+    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
   }
 
   override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 8bc1785..99ac923 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -27,21 +27,7 @@ import kafka.client.ClientUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 
 class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val brokerId1 = 0
-  val brokerId2 = 1
-  val brokerId3 = 2
-  val brokerId4 = 3
-
-  val port1 = TestUtils.choosePort()
-  val port2 = TestUtils.choosePort()
-  val port3 = TestUtils.choosePort()
-  val port4 = TestUtils.choosePort()
-
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
-  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, false)
-  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, false)
-
+  var configs: Seq[KafkaConfig] = null
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
   var brokers: Seq[Broker] = Seq.empty[Broker]
 
@@ -54,14 +40,11 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   override def setUp() {
     super.setUp()
-    // start all the servers
-    val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
-    val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
-    val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3))
-    val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4))
 
-    servers ++= List(server1, server2, server3, server4)
-    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
+    configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false)))
+    // start all the servers
+    servers = configs.map(c => TestUtils.createServer(c))
+    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort))
 
     // create topics first
     createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index ee0b21e..0305f70 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -145,7 +145,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -176,7 +176,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -207,7 +207,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -236,7 +236,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   def testReassigningNonExistingPartition() {
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
@@ -262,7 +262,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
     reassignPartitionsCommand.reassignPartitions
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
 
     // wait until reassignment completes
     TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient),
@@ -298,7 +298,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val partition = 1
     val preferredReplica = 0
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps)
+    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
@@ -318,7 +318,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val topic = "test"
     val partition = 1
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps)
+    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // create the topic
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
@@ -365,7 +365,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   def testTopicConfigChange() {
     val partitions = 3
     val topic = "my-topic"
-    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
+    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
     def makeConfig(messageSize: Int, retentionMs: Long) = {
       var props = new Properties()

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index 1baff0e..1913ad6 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -17,7 +17,7 @@
 package kafka.admin
 
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils, TestUtils}
+import kafka.utils._
 import kafka.server.KafkaConfig
 import org.junit.Test
 import kafka.consumer._
@@ -26,7 +26,7 @@ import kafka.integration.KafkaServerTestHarness
 
 
 class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
-  val configs = TestUtils.createBrokerConfigs(3, false, true).map(KafkaConfig.fromProps)
+  def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
 
   @Test
   def testGroupWideDeleteInZK() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 6258983..61cc602 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -96,7 +96,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4, false)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
@@ -224,7 +224,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topicAndPartition = TopicAndPartition(topicName, 0)
     val topic = topicAndPartition.topic
 
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
     brokerConfigs(0).setProperty("delete.topic.enable", "true")
     brokerConfigs(0).setProperty("log.cleaner.enable","true")
     brokerConfigs(0).setProperty("log.cleanup.policy","compact")
@@ -253,7 +253,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
 
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")
     )
     createTestTopicAndCluster(topic,brokerConfigs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 995397b..bb25467 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -30,7 +30,6 @@ import kafka.utils.TestUtils._
 import kafka.utils._
 import org.junit.Test
 import kafka.serializer._
-import kafka.cluster.{Broker, Cluster}
 import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness
 
@@ -38,31 +37,27 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
 
   val numNodes = 1
 
-  val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
-
-  val configs =
-    for(props <- TestUtils.createBrokerConfigs(numNodes))
-    yield KafkaConfig.fromProps(props, overridingProps)
+  def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
 
   val messages = new mutable.HashMap[Int, Seq[Message]]
   val topic = "topic"
   val group = "group1"
   val consumer0 = "consumer0"
   val consumedOffset = 5
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
-  val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                           0,
-                                                           queue,
-                                                           new AtomicLong(consumedOffset),
-                                                           new AtomicLong(0),
-                                                           new AtomicInteger(0),
-                                                           ""))
-  val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
+  var topicInfos: Seq[PartitionTopicInfo] = null
+
+  def consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
 
   override def setUp() {
-    super.setUp
+    super.setUp()
+    topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+      0,
+      queue,
+      new AtomicLong(consumedOffset),
+      new AtomicLong(0),
+      new AtomicInteger(0),
+      ""))
     createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 155fd04..f3e76db 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -38,17 +38,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   val RebalanceBackoffMs = 5000
   var dirs : ZKGroupTopicDirs = null
-  val zookeeperConnect = TestZKUtils.zookeeperConnect
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
   val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
 
-  val configs =
-    for (props <- TestUtils.createBrokerConfigs(numNodes))
-    yield KafkaConfig.fromProps(props, overridingProps)
+  override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
+    .map(KafkaConfig.fromProps(_, overridingProps))
 
   val group = "group1"
   val consumer0 = "consumer0"
@@ -93,8 +90,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     zkConsumerConnector0.shutdown
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
-                        sendMessagesToPartition(configs, topic, 1, nMessages)
+    val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
+                        sendMessagesToPartition(servers, topic, 1, nMessages)
 
     // wait to make sure the topic and partition have a leader for the successful case
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
@@ -127,8 +124,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
-                        sendMessagesToPartition(configs, topic, 1, nMessages)
+    val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
+                        sendMessagesToPartition(servers, topic, 1, nMessages)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -148,8 +145,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
-    val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
-                        sendMessagesToPartition(configs, topic, 1, nMessages)
+    val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
+                        sendMessagesToPartition(servers, topic, 1, nMessages)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -182,8 +179,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
-                        sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++
+                        sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -215,8 +212,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
-                        sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++
+                        sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -236,8 +233,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
-                        sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++
+                        sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -258,8 +255,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def testCompressionSetConsumption() {
     // send some messages to each broker
-    val sentMessages = sendMessagesToPartition(configs, topic, 0, 200, DefaultCompressionCodec) ++
-                       sendMessagesToPartition(configs, topic, 1, 200, DefaultCompressionCodec)
+    val sentMessages = sendMessagesToPartition(servers, topic, 0, 200, DefaultCompressionCodec) ++
+                       sendMessagesToPartition(servers, topic, 1, 200, DefaultCompressionCodec)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -284,8 +281,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages = sendMessagesToPartition(configs, topic, 0, nMessages, NoCompressionCodec) ++
-                       sendMessagesToPartition(configs, topic, 1, nMessages, NoCompressionCodec)
+    val sentMessages = sendMessagesToPartition(servers, topic, 0, nMessages, NoCompressionCodec) ++
+                       sendMessagesToPartition(servers, topic, 1, nMessages, NoCompressionCodec)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -319,13 +316,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   }
 
   def testLeaderSelectionForPartition() {
-    val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
+    val zkClient = new ZkClient(zkConnect, 6000, 30000, ZKStringSerializer)
 
     // create topic topic1 with 1 partition on broker 0
     createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessages(configs, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1)
+    val sentMessages1 = sendMessages(servers, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -351,8 +348,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def testConsumerRebalanceListener() {
     // Send messages to create topic
-    sendMessagesToPartition(configs, topic, 0, nMessages)
-    sendMessagesToPartition(configs, topic, 1, nMessages)
+    sendMessagesToPartition(servers, topic, 0, nMessages)
+    sendMessagesToPartition(servers, topic, 1, nMessages)
 
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index ffa6c30..139dc9a 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -31,7 +31,7 @@ import junit.framework.Assert._
 
 class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
-  val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
+  def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
   val topic = "test_topic"
   val group = "default_group"
@@ -78,7 +78,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
     TestUtils.createTopic(zkClient, topic, 1, 1, servers)
 
     val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
-      TestUtils.getBrokerListStrFromConfigs(configs),
+      TestUtils.getBrokerListStrFromServers(servers),
       keyEncoder = classOf[StringEncoder].getName)
 
     for(i <- 0 until numMessages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 3093e45..ecb5a33 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -32,23 +32,13 @@ import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
-
   val numNodes = 1
-  val configs =
-    for(props <- TestUtils.createBrokerConfigs(numNodes))
-    yield KafkaConfig.fromProps(props)
+  def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
+
   val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
-  val shutdown = ZookeeperConsumerConnector.shutdownCommand
+
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
-  val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                           0,
-                                                           queue,
-                                                           new AtomicLong(0),
-                                                           new AtomicLong(0),
-                                                           new AtomicInteger(0),
-                                                           ""))
 
   var fetcher: ConsumerFetcherManager = null
 
@@ -56,8 +46,18 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     super.setUp
     createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
 
+    val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort())))
+
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
     fetcher.stopConnections()
+    val topicInfos = configs.map(c =>
+      new PartitionTopicInfo(topic,
+        0,
+        queue,
+        new AtomicLong(0),
+        new AtomicLong(0),
+        new AtomicInteger(0),
+        ""))
     fetcher.startConnections(topicInfos, cluster)
   }
 
@@ -83,7 +83,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     var count = 0
     for(conf <- configs) {
       val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
-        TestUtils.getBrokerListStrFromConfigs(configs),
+        TestUtils.getBrokerListStrFromServers(servers),
         keyEncoder = classOf[StringEncoder].getName)
       val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 062790f..28e3122 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -24,28 +24,38 @@ import kafka.utils.{Utils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.KafkaException
-import kafka.utils.TestUtils
 
 /**
  * A test harness that brings up some number of broker nodes
  */
 trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
-
-  val configs: List[KafkaConfig]
+  var instanceConfigs: Seq[KafkaConfig] = null
   var servers: Buffer[KafkaServer] = null
   var brokerList: String = null
   var alive: Array[Boolean] = null
-  
+
+  /**
+   * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
+   * test and should not reuse previous configurations unless they select their ports randomly when servers are started.
+   */
+  def generateConfigs(): Seq[KafkaConfig]
+
+  def configs: Seq[KafkaConfig] = {
+    if (instanceConfigs == null)
+      instanceConfigs = generateConfigs()
+    instanceConfigs
+  }
+
   def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
   
-  def bootstrapUrl = configs.map(c => c.hostName + ":" + c.port).mkString(",")
+  def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")
   
   override def setUp() {
     super.setUp
     if(configs.size <= 0)
-      throw new KafkaException("Must suply at least one server config.")
-    brokerList = TestUtils.getBrokerListStrFromConfigs(configs)
+      throw new KafkaException("Must supply at least one server config.")
     servers = configs.map(TestUtils.createServer(_)).toBuffer
+    brokerList = TestUtils.getBrokerListStrFromServers(servers)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 30deaf4..f601d31 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -23,16 +23,13 @@ import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
-import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.admin.AdminUtils
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.utils.{StaticPartitioner, TestUtils, Utils}
 import kafka.serializer.StringEncoder
 import java.util.Properties
-import TestUtils._
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -40,10 +37,7 @@ import TestUtils._
 class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
-  val port = TestUtils.choosePort()
-  val props = TestUtils.createBrokerConfig(0, port)
-  val config = KafkaConfig.fromProps(props)
-  val configs = List(config)
+  def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
@@ -97,7 +91,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     props.put("compression.codec", "gzip")
 
     val stringProducer1 = TestUtils.createProducer[String, String](
-      TestUtils.getBrokerListStrFromConfigs(configs),
+      TestUtils.getBrokerListStrFromServers(servers),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName,
@@ -222,7 +216,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     props.put("request.required.acks", "0")
     val pipelinedProducer: Producer[String, String] =
       TestUtils.createProducer[String, String](
-        TestUtils.getBrokerListStrFromConfigs(configs),
+        TestUtils.getBrokerListStrFromServers(servers),
         encoder = classOf[StringEncoder].getName,
         keyEncoder = classOf[StringEncoder].getName,
         partitioner = classOf[StaticPartitioner].getName,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 108c2e7..4614a92 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -24,18 +24,17 @@ import kafka.utils.{StaticPartitioner, TestUtils}
 import kafka.serializer.StringEncoder
 
 trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
-    val port: Int
     val host = "localhost"
     var producer: Producer[String, String] = null
     var consumer: SimpleConsumer = null
 
   override def setUp() {
       super.setUp
-      producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
+      producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
         encoder = classOf[StringEncoder].getName,
         keyEncoder = classOf[StringEncoder].getName,
         partitioner = classOf[StaticPartitioner].getName)
-      consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
+      consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64*1024, "")
     }
 
    override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index f74e716..130b205 100644
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -25,35 +25,18 @@ import kafka.utils.{Utils, TestUtils}
 import kafka.server.{KafkaConfig, KafkaServer}
 
 class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val brokerId1 = 0
-  val brokerId2 = 1
-  val brokerId3 = 2
-  val brokerId4 = 3
-
-  val port1 = TestUtils.choosePort()
-  val port2 = TestUtils.choosePort()
-  val port3 = TestUtils.choosePort()
-  val port4 = TestUtils.choosePort()
-
-  // controlled.shutdown.enable is true by default
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
-
-  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   val partitionId = 0
+  var servers: Seq[KafkaServer] = null
 
   override def setUp() {
     super.setUp()
+    // controlled.shutdown.enable is true by default
+    val configs = (0 until 4).map(i => TestUtils.createBrokerConfig(i, zkConnect))
+    configs(3).put("controlled.shutdown.retry.backoff.ms", "100")
+ 
     // start all the servers
-    val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
-    val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
-    val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3))
-    val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4))
-
-    servers ++= List(server1, server2, server3, server4)
+    servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c)))
   }
 
   override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index a671af4..56b1b8c 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -31,14 +31,15 @@ import kafka.common.ErrorMapping
 import kafka.client.ClientUtils
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val props = createBrokerConfigs(1)
-  val configs = props.map(p => KafkaConfig.fromProps(p))
   private var server1: KafkaServer = null
-  val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
+  var brokers: Seq[Broker] = null
 
   override def setUp() {
     super.setUp()
+    val props = createBrokerConfigs(1, zkConnect)
+    val configs = props.map(KafkaConfig.fromProps)
     server1 = TestUtils.createServer(configs.head)
+    brokers = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()))
   }
 
   override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 8342cae..7c87b81 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -29,7 +29,7 @@ import kafka.admin.AdminUtils
 import kafka.common.FailedToSendMessageException
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
 import kafka.producer.{KeyedMessage, Producer}
-import kafka.serializer.{DefaultEncoder, StringEncoder}
+import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.Utils
 import kafka.utils.TestUtils._
@@ -39,20 +39,12 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
 
-  val port1 = choosePort()
-  val port2 = choosePort()
-
   // controlled shutdown is needed for these tests, but we can trim the retry count and backoff interval to
   // reduce test execution time
   val enableControlledShutdown = true
-  val configProps1 = createBrokerConfig(brokerId1, port1)
-  val configProps2 = createBrokerConfig(brokerId2, port2)
 
-  for (configProps <- List(configProps1, configProps2)) {
-    configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown))
-    configProps.put("controlled.shutdown.max.retries", String.valueOf(1))
-    configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000))
-  }
+  var configProps1: Properties = null
+  var configProps2: Properties = null
 
   var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig]
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
@@ -69,6 +61,15 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
 
+    configProps1 = createBrokerConfig(brokerId1, zkConnect)
+    configProps2 = createBrokerConfig(brokerId2, zkConnect)
+
+    for (configProps <- List(configProps1, configProps2)) {
+      configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown))
+      configProps.put("controlled.shutdown.max.retries", String.valueOf(1))
+      configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000))
+    }
+
     // temporarily set loggers to a higher level so that tests run quietly
     kafkaApisLogger.setLevel(Level.FATAL)
     networkProcessorLogger.setLevel(Level.FATAL)
@@ -254,7 +255,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   private def produceMessage(topic: String, message: String) = {
     val producer: Producer[String, Array[Byte]] = createProducer(
-      getBrokerListStrFromConfigs(configs),
+      getBrokerListStrFromServers(servers),
       keyEncoder = classOf[StringEncoder].getName)
     producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
     producer.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 3d0fc9d..ad66bb2 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -39,19 +39,14 @@ import junit.framework.Assert._
 
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
-
-  val zookeeperConnect = zkConnect
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
 
   val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
 
-  val configs =
-    for (props <- TestUtils.createBrokerConfigs(numNodes))
-    yield KafkaConfig.fromProps(props, overridingProps)
+  def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
 
   val group = "group1"
   val consumer1 = "consumer1"
@@ -68,7 +63,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val sentMessages1 = sendMessages(nMessages, "batch1")
 
     // create a consumer
-    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
 
@@ -93,7 +88,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                    compressed: CompressionCodec): List[String] = {
     var messages: List[String] = Nil
     val producer: kafka.producer.Producer[Int, String] =
-      TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+      TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
         encoder = classOf[StringEncoder].getName,
         keyEncoder = classOf[IntEncoder].getName)
     val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6adaffd8/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3c0599c..ac4c5b9 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -37,7 +37,7 @@ class LogTest extends JUnitSuite {
   @Before
   def setUp() {
     logDir = TestUtils.tempDir()
-    val props = TestUtils.createBrokerConfig(0, -1)
+    val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
     config = KafkaConfig.fromProps(props)
   }
 


Mime
View raw message