kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5713; Shutdown brokers in tests
Date Fri, 12 May 2017 01:29:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b6effcbba -> 12aa70d55


KAFKA-5713; Shutdown brokers in tests

Add broker shutdown for `LeaderEpochIntegrationTest`.
Move broker shutdown in other tests to `tearDown` to
ensure brokers are shutdown even if tests fail.
Also added assertion to `ZooKeeperTestHarness` to
verify that controller event thread is not running
since this thread may load JAAS configuration if ZK
ports are reused.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3026 from rajinisivaram/KAFKA-5173


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/12aa70d5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/12aa70d5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/12aa70d5

Branch: refs/heads/trunk
Commit: 12aa70d55bc422226255ab18e69e4bc6f24be2d9
Parents: b6effcb
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Fri May 12 00:43:35 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri May 12 02:29:16 2017 +0100

----------------------------------------------------------------------
 .../kafka/api/ProducerCompressionTest.scala     |   5 +-
 ...tenersWithSameSecurityProtocolBaseTest.scala |   7 +-
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   2 +-
 .../other/kafka/ReplicationQuotasTestRig.scala  |   5 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |   5 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala | 209 +++++++++----------
 .../unit/kafka/admin/DeleteTopicTest.scala      |  43 ++--
 .../admin/ReassignPartitionsClusterTest.scala   |   5 +-
 .../controller/ControllerIntegrationTest.scala  |   5 +-
 .../integration/KafkaServerTestHarness.scala    |   3 +-
 .../kafka/integration/TopicMetadataTest.scala   |  20 +-
 .../unit/kafka/producer/ProducerTest.scala      |   5 +-
 .../unit/kafka/server/AdvertiseBrokerTest.scala |   7 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   5 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   3 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |   6 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |   3 +-
 .../unit/kafka/server/ReplicaFetchTest.scala    |   2 +-
 .../kafka/server/ReplicationQuotasTest.scala    |   9 +-
 .../server/ServerGenerateBrokerIdTest.scala     |  78 +++----
 .../server/ServerGenerateClusterIdTest.scala    |  44 ++--
 .../unit/kafka/server/ServerStartupTest.scala   |  39 ++--
 ...rivenReplicationProtocolAcceptanceTest.scala |   2 +-
 .../epoch/LeaderEpochIntegrationTest.scala      |  15 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  17 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  20 ++
 26 files changed, 265 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index b165918..2001095 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -30,7 +30,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 
 
 @RunWith(value = classOf[Parameterized])
@@ -53,8 +53,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
 
   @After
   override def tearDown() {
-    server.shutdown
-    CoreUtils.delete(server.config.logDirs)
+    TestUtils.shutdownServers(Seq(server))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 88b314f..7db9d7c 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -26,7 +26,7 @@ import kafka.api.SaslSetup
 import kafka.common.Topic
 import kafka.coordinator.group.OffsetConfig
 import kafka.utils.JaasTestUtils.JaasSection
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -133,10 +133,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
   override def tearDown() {
     producers.values.foreach(_.close())
     consumers.values.foreach(_.close())
-    servers.foreach { s =>
-      s.shutdown()
-      CoreUtils.delete(s.config.logDirs)
-    }
+    TestUtils.shutdownServers(servers)
     super.tearDown()
     closeSasl()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index ae76eb6..9a04e67 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -45,7 +45,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     Exit.resetExitProcedure()
-    brokers.foreach(_.shutdown())
+    TestUtils.shutdownServers(brokers)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 38d07ba..d8bc65e 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
 import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
 import kafka.utils.TestUtils._
 import kafka.utils.ZkUtils._
-import kafka.utils.{CoreUtils, Exit, Logging, TestUtils, ZkUtils}
+import kafka.utils.{Exit, Logging, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.jfree.chart.plot.PlotOrientation
@@ -108,8 +108,7 @@ object ReplicationQuotasTestRig {
     }
 
     override def tearDown() {
-      servers.par.foreach(_.shutdown())
-      servers.par.foreach(server => CoreUtils.delete(server.config.logDirs))
+      TestUtils.shutdownServers(servers)
       super.tearDown()
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index d95d90d..e9c5ac5 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -22,7 +22,7 @@ import org.junit.Assert._
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
 import kafka.server.{KafkaConfig, KafkaServer}
@@ -59,8 +59,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @After
   override def tearDown() {
-    servers.foreach(_.shutdown())
-    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    TestUtils.shutdownServers(servers)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index ec54608..f8c65eb 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, Invali
 import org.apache.kafka.common.metrics.Quota
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Test}
 import java.util.Properties
 
 import kafka.utils._
@@ -47,6 +47,14 @@ import scala.util.Try
 
 class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
+  var servers: Seq[KafkaServer] = Seq()
+
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
   @Test
   def testReplicaAssignment() {
     val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
@@ -188,7 +196,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -211,7 +219,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
-    servers.foreach(_.shutdown())
   }
 
   @Test
@@ -219,7 +226,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -241,8 +248,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
-
-    servers.foreach(_.shutdown())
   }
 
   @Test
@@ -250,7 +255,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -272,14 +277,13 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
-    servers.foreach(_.shutdown())
   }
 
   @Test
   def testReassigningNonExistingPartition() {
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
@@ -288,7 +292,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     val reassignedPartitions = zkUtils.getPartitionsBeingReassigned()
     assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
-    servers.foreach(_.shutdown())
   }
 
   @Test
@@ -305,7 +308,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
     reassignPartitionsCommand.reassignPartitions()
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
 
     // wait until reassignment completes
     TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkUtils),
@@ -317,7 +320,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
-    servers.foreach(_.shutdown())
   }
 
   @Test
@@ -344,7 +346,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
-    val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
+    servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // broker 2 should be the leader since it was started first
     val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None).get
     // trigger preferred replica election
@@ -352,7 +354,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     preferredReplicaElection.moveLeaderToPreferredReplica()
     val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)).get
     assertEquals("Preferred replica election failed", preferredReplica, newLeader)
-    servers.foreach(_.shutdown())
   }
 
   @Test
@@ -362,7 +363,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val partition = 1
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
-    val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
+    servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // create the topic
     TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
 
@@ -373,36 +374,31 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     controller.shutdownBroker(2, controlledShutdownCallback)
     var partitionsRemaining = resultQueue.take().get
     var activeServers = servers.filter(s => s.config.brokerId != 2)
-    try {
-      // wait for the update metadata request to trickle to the brokers
-      TestUtils.waitUntilTrue(() =>
-        activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
-        "Topic test not created after timeout")
-      assertEquals(0, partitionsRemaining.size)
-      var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
-      var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
-      assertEquals(0, leaderAfterShutdown)
-      assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
-      assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
-
-      controller.shutdownBroker(1, controlledShutdownCallback)
-      partitionsRemaining = resultQueue.take().get
-      assertEquals(0, partitionsRemaining.size)
-      activeServers = servers.filter(s => s.config.brokerId == 0)
-      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
-      leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
-      assertEquals(0, leaderAfterShutdown)
-
-      assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
-      controller.shutdownBroker(0, controlledShutdownCallback)
-      partitionsRemaining = resultQueue.take().get
-      assertEquals(1, partitionsRemaining.size)
-      // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
-    }
-    finally {
-      servers.foreach(_.shutdown())
-    }
+    // wait for the update metadata request to trickle to the brokers
+    TestUtils.waitUntilTrue(() =>
+      activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+      "Topic test not created after timeout")
+    assertEquals(0, partitionsRemaining.size)
+    var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+    assertEquals(0, leaderAfterShutdown)
+    assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
+    assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
+
+    controller.shutdownBroker(1, controlledShutdownCallback)
+    partitionsRemaining = resultQueue.take().get
+    assertEquals(0, partitionsRemaining.size)
+    activeServers = servers.filter(s => s.config.brokerId == 0)
+    partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+    assertEquals(0, leaderAfterShutdown)
+
+    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+    controller.shutdownBroker(0, controlledShutdownCallback)
+    partitionsRemaining = resultQueue.take().get
+    assertEquals(1, partitionsRemaining.size)
+    // leader doesn't change since all the replicas are shut down
+    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
   }
 
   /**
@@ -414,6 +410,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val partitions = 3
     val topic = "my-topic"
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+    servers = Seq(server)
 
     def makeConfig(messageSize: Int, retentionMs: Long, throttledLeaders: String, throttledFollowers: String) = {
       val props = new Properties()
@@ -446,51 +443,45 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
       }
     }
 
-    try {
-      // create a topic with a few config overrides and check that they are applied
-      val maxMessageSize = 1024
-      val retentionMs = 1000 * 1000
-      AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
-
-      //Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated.
-      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false)
+    // create a topic with a few config overrides and check that they are applied
+    val maxMessageSize = 1024
+    val retentionMs = 1000 * 1000
+    AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
 
-      //Update dynamically and all properties should be applied
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+    //Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated.
+    checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false)
 
-      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true)
+    //Update dynamically and all properties should be applied
+    AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
 
-      // now double the config values for the topic and check that it is applied
-      val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*"))
-      checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true)
+    checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true)
 
-      // Verify that the same config can be read from ZK
-      val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic)
-      assertEquals(newConfig, configInZk)
+    // now double the config values for the topic and check that it is applied
+    val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
+    AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*"))
+    checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true)
 
-      //Now delete the config
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties)
-      checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
+    // Verify that the same config can be read from ZK
+    val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic)
+    assertEquals(newConfig, configInZk)
 
-      //Add config back
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
-      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true)
+    //Now delete the config
+    AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties)
+    checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
 
-      //Now ensure updating to "" removes the throttled replica list also
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
-      checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "",  quotaManagerIsThrottled = false)
+    //Add config back
+    AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+    checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true)
 
-    } finally {
-      server.shutdown()
-      CoreUtils.delete(server.config.logDirs)
-    }
+    //Now ensure updating to "" removes the throttled replica list also
+    AdminUtils.changeTopicConfig(server.zkUtils, topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
+    checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "",  quotaManagerIsThrottled = false)
   }
 
   @Test
   def shouldPropagateDynamicBrokerConfigs() {
     val brokerIds = Seq(0, 1, 2)
-    val servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_))
+    servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_))
 
     def checkConfig(limit: Long) {
       retry(10000) {
@@ -501,37 +492,31 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
       }
     }
 
-    try {
-      val limit: Long = 1000000
-
-      // Set the limit & check it is applied to the log
-      changeBrokerConfig(zkUtils, brokerIds, propsWith(
-        (LeaderReplicationThrottledRateProp, limit.toString),
-        (FollowerReplicationThrottledRateProp, limit.toString)))
-      checkConfig(limit)
-
-      // Now double the config values for the topic and check that it is applied
-      val newLimit = 2 * limit
-      changeBrokerConfig(zkUtils, brokerIds,  propsWith(
-        (LeaderReplicationThrottledRateProp, newLimit.toString),
-        (FollowerReplicationThrottledRateProp, newLimit.toString)))
-      checkConfig(newLimit)
-
-      // Verify that the same config can be read from ZK
-      for (brokerId <- brokerIds) {
-        val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker, brokerId.toString)
-        assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
-        assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
-      }
-
-      //Now delete the config
-      changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
-      checkConfig(DefaultReplicationThrottledRate)
-
-    } finally {
-      servers.foreach(_.shutdown())
-      servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    val limit: Long = 1000000
+
+    // Set the limit & check it is applied to the log
+    changeBrokerConfig(zkUtils, brokerIds, propsWith(
+      (LeaderReplicationThrottledRateProp, limit.toString),
+      (FollowerReplicationThrottledRateProp, limit.toString)))
+    checkConfig(limit)
+
+    // Now double the config values for the topic and check that it is applied
+    val newLimit = 2 * limit
+    changeBrokerConfig(zkUtils, brokerIds,  propsWith(
+      (LeaderReplicationThrottledRateProp, newLimit.toString),
+      (FollowerReplicationThrottledRateProp, newLimit.toString)))
+    checkConfig(newLimit)
+
+    // Verify that the same config can be read from ZK
+    for (brokerId <- brokerIds) {
+      val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker, brokerId.toString)
+      assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
+      assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
     }
+
+    //Now delete the config
+    changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
+    checkConfig(DefaultReplicationThrottledRate)
   }
 
   /**
@@ -556,13 +541,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
     // Test that the existing clientId overrides are read
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
-    try {
-      assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
-      assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
-    } finally {
-      server.shutdown()
-      CoreUtils.delete(server.config.logDirs)
-    }
+    servers = Seq(server)
+    assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 2085d2d..d9ab85e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
 import kafka.utils.ZkUtils._
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Test}
 import java.util.Properties
 
 import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
@@ -31,22 +31,29 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 
 class DeleteTopicTest extends ZooKeeperTestHarness {
 
+  var servers: Seq[KafkaServer] = Seq()
+
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
   @Test
   def testDeleteTopicWithAllAliveReplicas() {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    val servers = createTestTopicAndCluster(topic)
+    servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    servers.foreach(_.shutdown())
   }
 
   @Test
   def testResumeDeleteTopicWithRecoveredFollower() {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    val servers = createTestTopicAndCluster(topic)
+    servers = createTestTopicAndCluster(topic)
     // shut down one follower replica
     val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
@@ -64,14 +71,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // restart follower replica
     follower.startup()
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    servers.foreach(_.shutdown())
   }
 
   @Test
   def testResumeDeleteTopicOnControllerFailover() {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    val servers = createTestTopicAndCluster(topic)
+    servers = createTestTopicAndCluster(topic)
     val controllerId = zkUtils.getController()
     val controller = servers.filter(s => s.config.brokerId == controllerId).head
     val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
@@ -91,7 +97,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     follower.startup()
 
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    servers.foreach(_.shutdown())
   }
 
   @Test
@@ -103,6 +108,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    this.servers = allServers
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
@@ -136,13 +142,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
     follower.startup()
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    allServers.foreach(_.shutdown())
   }
 
   @Test
   def testDeleteTopicDuringAddPartition() {
     val topic = "test"
-    val servers = createTestTopicAndCluster(topic)
+    servers = createTestTopicAndCluster(topic)
     val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
@@ -159,13 +164,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilTrue(() =>
       servers.forall(_.getLogManager().getLog(newPartition).isEmpty),
       "Replica logs not for new partition [test,1] not deleted after delete topic is complete.")
-    servers.foreach(_.shutdown())
   }
 
   @Test
   def testAddPartitionDuringDeleteTopic() {
     val topic = "test"
-    val servers = createTestTopicAndCluster(topic)
+    servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
     // add partitions to topic
@@ -175,7 +179,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // verify that new partition doesn't exist on any broker either
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.forall(_.getLogManager().getLog(newPartition).isEmpty))
-    servers.foreach(_.shutdown())
   }
 
   @Test
@@ -183,7 +186,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
     val topicPartition = new TopicPartition(topic, 0)
-    val servers = createTestTopicAndCluster(topic)
+    servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -195,14 +198,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // check if all replica logs are created
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created.")
-    servers.foreach(_.shutdown())
   }
 
   @Test
   def testDeleteNonExistingTopic() {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    val servers = createTestTopicAndCluster(topic)
+    servers = createTestTopicAndCluster(topic)
     // start topic deletion
     try {
       AdminUtils.deleteTopic(zkUtils, "test2")
@@ -220,7 +222,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // topic test should have a leader
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
-    servers.foreach(_.shutdown())
   }
 
   @Test
@@ -236,7 +237,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     brokerConfigs.head.setProperty("log.segment.bytes","100")
     brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
 
-    val servers = createTestTopicAndCluster(topic,brokerConfigs)
+    servers = createTestTopicAndCluster(topic,brokerConfigs)
 
     // for simplicity, we are validating cleaner offsets on a single broker
     val server = servers.head
@@ -251,15 +252,13 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // delete topic
     AdminUtils.deleteTopic(zkUtils, "test")
     TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers)
-
-    servers.foreach(_.shutdown())
   }
 
   @Test
   def testDeleteTopicAlreadyMarkedAsDeleted() {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    val servers = createTestTopicAndCluster(topic)
+    servers = createTestTopicAndCluster(topic)
 
     try {
       // start topic deletion
@@ -273,7 +272,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     }
 
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    servers.foreach(_.shutdown())
   }
 
   private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true): Seq[KafkaServer] = {
@@ -311,7 +309,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   def testDisableDeleteTopic() {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    val servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
+    servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
     // mark the topic for deletion
     AdminUtils.deleteTopic(zkUtils, "test")
     TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)),
@@ -323,6 +321,5 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // topic test should have a leader
     val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
-    servers.foreach(_.shutdown())
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 05a3f83..e3b0aa8 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -17,7 +17,7 @@ import kafka.common.{AdminCommandFailedException, TopicAndPartition}
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.ZkUtils._
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{After, Before, Test}
@@ -44,8 +44,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
   @After
   override def tearDown() {
-    servers.par.foreach(_.shutdown())
-    servers.par.foreach(server => CoreUtils.delete(server.config.logDirs))
+    TestUtils.shutdownServers(servers)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 5e608d1..cbb98e8 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -20,7 +20,7 @@ package kafka.controller
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.{After, Before, Test}
 
@@ -35,8 +35,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
 
   @After
   override def tearDown() {
-    servers.foreach(_.shutdown())
-    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    TestUtils.shutdownServers(servers)
     super.tearDown
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index af3133a..da25d5c 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -97,8 +97,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     if (servers != null) {
-      servers.foreach(_.shutdown())
-      servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+      TestUtils.shutdownServers(servers)
     }
     super.tearDown
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index d63d5b2..07af590 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -24,7 +24,7 @@ import kafka.api.TopicMetadataResponse
 import kafka.client.ClientUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
@@ -33,6 +33,7 @@ import org.junit.{Test, After, Before}
 
 class TopicMetadataTest extends ZooKeeperTestHarness {
   private var server1: KafkaServer = null
+  private var adHocServers: Seq[KafkaServer] = Seq()
   var brokerEndPoints: Seq[BrokerEndPoint] = null
   var adHocConfigs: Seq[KafkaConfig] = null
   val numConfigs: Int = 4
@@ -53,7 +54,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
 
   @After
   override def tearDown() {
-    server1.shutdown()
+    TestUtils.shutdownServers(adHocServers :+ server1)
     super.tearDown()
   }
 
@@ -134,6 +135,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
     // start adHoc brokers with replication factor too high
     val adHocServer = createServer(new KafkaConfig(adHocProps))
+    adHocServers = Seq(adHocServer)
     // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
     // `securityProtocol` instead of PLAINTEXT below
     val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName,
@@ -147,8 +149,6 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
     assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
     assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
-
-    adHocServer.shutdown()
   }
 
   @Test
@@ -216,7 +216,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     val numBrokers = 2 //just 2 brokers are enough for the test
 
     // start adHoc brokers
-    val adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
+    adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
     val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
 
     // create topic
@@ -232,9 +232,6 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
 
     // check metadata is still correct and updated at all brokers
     checkIsr(allServers)
-
-    // shutdown adHoc brokers
-    adHocServers.map(p => p.shutdown())
   }
 
   private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
@@ -269,7 +266,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
 
   @Test
   def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
-    var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
+    adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
 
     checkMetadata(adHocServers, numConfigs - 1)
 
@@ -277,13 +274,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
 
     checkMetadata(adHocServers, numConfigs)
-    adHocServers.map(p => p.shutdown())
   }
 
 
   @Test
   def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
-    val adHocServers = adHocConfigs.map(p => createServer(p))
+    adHocServers = adHocConfigs.map(p => createServer(p))
 
     checkMetadata(adHocServers, numConfigs)
 
@@ -292,7 +288,5 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     adHocServers.last.awaitShutdown()
 
     checkMetadata(adHocServers, numConfigs - 1)
-
-    adHocServers.map(p => p.shutdown())
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 63ec83e..1d3f77f 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -91,10 +91,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
     if (consumer2 != null)
       consumer2.close()
 
-    server1.shutdown
-    server2.shutdown
-    CoreUtils.delete(server1.config.logDirs)
-    CoreUtils.delete(server2.config.logDirs)
+    TestUtils.shutdownServers(Seq(server1, server2))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index 8d4899b..6d9ab72 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import org.junit.Assert._
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.{After, Test}
@@ -32,10 +32,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
 
   @After
   override def tearDown() {
-    servers.foreach { s =>
-      s.shutdown()
-      CoreUtils.delete(s.config.logDirs)
-    }
+    TestUtils.shutdownServers(servers)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 6ffe314..aa243be 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 import kafka.api.LeaderAndIsr
 import org.apache.kafka.common.requests._
 import org.junit.Assert._
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.cluster.Broker
 import kafka.controller.{ControllerChannelManager, ControllerContext}
 import kafka.utils.TestUtils._
@@ -60,8 +60,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
 
   @After
   override def tearDown() {
-    servers.foreach(_.shutdown())
-    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    TestUtils.shutdownServers(servers)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 415027c..9383355 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -60,8 +60,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     simpleConsumer.close
-    server.shutdown
-    Utils.delete(logDir)
+    TestUtils.shutdownServers(Seq(server))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 54cee6b..0ecc3c7 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -27,7 +27,6 @@ import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
-import org.apache.kafka.common.utils.Utils
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
@@ -95,10 +94,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     producer.close()
-    for (server <- servers) {
-      server.shutdown()
-      Utils.delete(new File(server.config.logDirs.head))
-    }
+    TestUtils.shutdownServers(servers)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index ebfbe89..244ef78 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -68,8 +68,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     simpleConsumer.close
-    server.shutdown
-    Utils.delete(logDir)
+    TestUtils.shutdownServers(Seq(server))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 19c386f..dd683e1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -42,7 +42,7 @@ class ReplicaFetchTest extends ZooKeeperTestHarness  {
 
   @After
   override def tearDown() {
-    brokers.foreach(_.shutdown())
+    TestUtils.shutdownServers(brokers)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 15e77a0..5fc4c0f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -50,15 +50,10 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
   val topic = "topic1"
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
 
-  @Before
-  override def setUp() {
-    super.setUp()
-  }
-
   @After
   override def tearDown() {
-    brokers.par.foreach(_.shutdown())
     producer.close()
+    shutdownServers(brokers)
     super.tearDown()
   }
 
@@ -242,4 +237,4 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val metricName = broker.metrics.metricName("byte-rate", repType.toString)
     broker.metrics.metrics.asScala(metricName).value
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index f21f2de..0ba133f 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -19,8 +19,8 @@ package kafka.server
 import java.util.Properties
 
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestUtils, CoreUtils}
-import org.junit.{Before, Test}
+import kafka.utils.TestUtils
+import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import java.io.File
 
@@ -30,6 +30,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
   var props2: Properties = null
   var config2: KafkaConfig = null
   val brokerMetaPropsFile = "meta.properties"
+  var servers: Seq[KafkaServer] = Seq()
 
   @Before
   override def setUp() {
@@ -40,6 +41,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     config2 = KafkaConfig.fromProps(props2)
   }
 
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
   @Test
   def testAutoGenerateBrokerId() {
     var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
@@ -47,11 +54,10 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     server1.shutdown()
     assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
     // restart the server check to see if it uses the brokerId generated previously
-    server1 = new KafkaServer(config1)
-    server1.startup()
+    server1 = TestUtils.createServer(config1)
+    servers = Seq(server1)
     assertEquals(server1.config.brokerId, 1001)
     server1.shutdown()
-    CoreUtils.delete(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -61,23 +67,18 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     val server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
     val server2 = new KafkaServer(config2, threadNamePrefix = Option(this.getClass.getName))
     val props3 = TestUtils.createBrokerConfig(-1, zkConnect)
-    val config3 = KafkaConfig.fromProps(props3)
-    val server3 = new KafkaServer(config3)
+    val server3 = new KafkaServer(KafkaConfig.fromProps(props3))
     server1.startup()
-    assertEquals(server1.config.brokerId,1001)
+    assertEquals(server1.config.brokerId, 1001)
     server2.startup()
-    assertEquals(server2.config.brokerId,0)
+    assertEquals(server2.config.brokerId, 0)
     server3.startup()
-    assertEquals(server3.config.brokerId,1002)
-    server1.shutdown()
-    server2.shutdown()
-    server3.shutdown()
-    assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001))
-    assertTrue(verifyBrokerMetadata(server2.config.logDirs,0))
-    assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002))
-    CoreUtils.delete(server1.config.logDirs)
-    CoreUtils.delete(server2.config.logDirs)
-    CoreUtils.delete(server3.config.logDirs)
+    assertEquals(server3.config.brokerId, 1002)
+    servers = Seq(server1, server2, server3)
+    servers.foreach(_.shutdown())
+    assertTrue(verifyBrokerMetadata(server1.config.logDirs, 1001))
+    assertTrue(verifyBrokerMetadata(server2.config.logDirs, 0))
+    assertTrue(verifyBrokerMetadata(server3.config.logDirs, 1002))
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -88,12 +89,11 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     // Set reserve broker ids to cause collision and ensure disabling broker id generation ignores the setting
     props3.put(KafkaConfig.MaxReservedBrokerIdProp, "0")
     val config3 = KafkaConfig.fromProps(props3)
-    val server3 = new KafkaServer(config3)
-    server3.startup()
-    assertEquals(server3.config.brokerId,3)
+    val server3 = TestUtils.createServer(config3)
+    servers = Seq(server3)
+    assertEquals(server3.config.brokerId, 3)
     server3.shutdown()
-    assertTrue(verifyBrokerMetadata(server3.config.logDirs,3))
-    CoreUtils.delete(server3.config.logDirs)
+    assertTrue(verifyBrokerMetadata(server3.config.logDirs, 3))
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -102,21 +102,22 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     // add multiple logDirs and check if the generate brokerId is stored in all of them
     val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath +
     "," + TestUtils.tempDir().getAbsolutePath
-    props1.setProperty("log.dir",logDirs)
+    props1.setProperty("log.dir", logDirs)
     config1 = KafkaConfig.fromProps(props1)
     var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
     server1.startup()
+    servers = Seq(server1)
     server1.shutdown()
     assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
     // addition to log.dirs after generation of a broker.id from zk should be copied over
     val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath
-    props1.setProperty("log.dir",newLogDirs)
+    props1.setProperty("log.dir", newLogDirs)
     config1 = KafkaConfig.fromProps(props1)
     server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
     server1.startup()
+    servers = Seq(server1)
     server1.shutdown()
     assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
-    CoreUtils.delete(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -125,6 +126,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     // check if configured brokerId and stored brokerId are equal or throw InconsistentBrokerException
     var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName)) //auto generate broker Id
     server1.startup()
+    servers = Seq(server1)
     server1.shutdown()
     server1 = new KafkaServer(config2, threadNamePrefix = Option(this.getClass.getName)) // user specified broker id
     try {
@@ -133,7 +135,6 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
       case _: kafka.common.InconsistentBrokerIdException => //success
     }
     server1.shutdown()
-    CoreUtils.delete(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -142,8 +143,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     // Start a good server
     val propsA = TestUtils.createBrokerConfig(1, zkConnect)
     val configA = KafkaConfig.fromProps(propsA)
-    val serverA = new KafkaServer(configA)
-    serverA.startup()
+    val serverA = TestUtils.createServer(configA)
 
     // Start a server that collides on the broker id
     val propsB = TestUtils.createBrokerConfig(1, zkConnect)
@@ -152,6 +152,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     intercept[RuntimeException] {
       serverB.startup()
     }
+    servers = Seq(serverA)
 
     // verify no broker metadata was written
     serverB.config.logDirs.foreach { logDir =>
@@ -162,26 +163,25 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     // adjust the broker config and start again
     propsB.setProperty(KafkaConfig.BrokerIdProp, "2")
     val newConfigB = KafkaConfig.fromProps(propsB)
-    val newServerB = new KafkaServer(newConfigB)
-    newServerB.startup()
+    val newServerB = TestUtils.createServer(newConfigB)
+    servers = Seq(serverA, newServerB)
 
     serverA.shutdown()
     newServerB.shutdown()
+
     // verify correct broker metadata was written
-    assertTrue(verifyBrokerMetadata(serverA.config.logDirs,1))
-    assertTrue(verifyBrokerMetadata(newServerB.config.logDirs,2))
-    CoreUtils.delete(serverA.config.logDirs)
-    CoreUtils.delete(newServerB.config.logDirs)
+    assertTrue(verifyBrokerMetadata(serverA.config.logDirs, 1))
+    assertTrue(verifyBrokerMetadata(newServerB.config.logDirs, 2))
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
   def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
-    for(logDir <- logDirs) {
+    for (logDir <- logDirs) {
       val brokerMetadataOpt = new BrokerMetadataCheckpoint(
         new File(logDir + File.separator + brokerMetaPropsFile)).read()
       brokerMetadataOpt match {
-        case Some(brokerMetadata: BrokerMetadata) =>
-          if (brokerMetadata.brokerId != brokerId)  return false
+        case Some(brokerMetadata) =>
+          if (brokerMetadata.brokerId != brokerId) return false
         case _ => return false
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index 325889f..1ec80fa 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -19,16 +19,17 @@ package kafka.server
 import scala.concurrent._
 import ExecutionContext.Implicits._
 import scala.concurrent.duration._
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.Assert._
-import org.junit.{Before, Test}
+import org.junit.{Before, After, Test}
 import org.apache.kafka.test.TestUtils.isValidClusterId
 
 class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
   var config1: KafkaConfig = null
   var config2: KafkaConfig = null
   var config3: KafkaConfig = null
+  var servers: Seq[KafkaServer] = Seq()
 
   @Before
   override def setUp() {
@@ -38,12 +39,20 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
     config3 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(3, zkConnect))
   }
 
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
+
   @Test
   def testAutoGenerateClusterId() {
     // Make sure that the cluster id doesn't exist yet.
     assertFalse(zkUtils.pathExists(ZkUtils.ClusterIdPath))
 
     var server1 = TestUtils.createServer(config1)
+    servers = Seq(server1)
 
     // Validate the cluster id
     val clusterIdOnFirstBoot = server1.clusterId
@@ -56,8 +65,8 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
     assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot))
 
     // Restart the server check to confirm that it uses the clusterId generated previously
-    server1 = new KafkaServer(config1)
-    server1.startup()
+    server1 = TestUtils.createServer(config1)
+    servers = Seq(server1)
 
     val clusterIdOnSecondBoot = server1.clusterId
     assertEquals(clusterIdOnFirstBoot, clusterIdOnSecondBoot)
@@ -68,7 +77,6 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
     assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath))
     assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot))
 
-    CoreUtils.delete(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -82,10 +90,9 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
 
     val server3 = TestUtils.createServer(config3)
     val clusterIdFromServer3 = server3.clusterId
+    servers = Seq(server1, server2, server3)
 
-    server1.shutdown()
-    server2.shutdown()
-    server3.shutdown()
+    servers.foreach(_.shutdown())
 
     isValidClusterId(clusterIdFromServer1)
     assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3)
@@ -97,28 +104,23 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
     assertEquals(clusterIdFromServer2, server2.clusterId)
     server3.startup()
     assertEquals(clusterIdFromServer3, server3.clusterId)
-    server1.shutdown()
-    server2.shutdown()
-    server3.shutdown()
 
-    CoreUtils.delete(server1.config.logDirs)
-    CoreUtils.delete(server2.config.logDirs)
-    CoreUtils.delete(server3.config.logDirs)
+    servers.foreach(_.shutdown())
+
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
   @Test
   def testAutoGenerateClusterIdForKafkaClusterParallel() {
     val firstBoot = Future.traverse(Seq(config1, config2, config3))(config => Future(TestUtils.createServer(config)))
-    val Seq(server1, server2, server3) = Await.result(firstBoot, 100 second)
+    servers = Await.result(firstBoot, 100 second)
+    val Seq(server1, server2, server3) = servers
 
     val clusterIdFromServer1 = server1.clusterId
     val clusterIdFromServer2 = server2.clusterId
     val clusterIdFromServer3 = server3.clusterId
 
-    server1.shutdown()
-    server2.shutdown()
-    server3.shutdown()
+    servers.foreach(_.shutdown())
     isValidClusterId(clusterIdFromServer1)
     assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3)
 
@@ -127,13 +129,11 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
       server.startup()
       server
     })
-    val servers = Await.result(secondBoot, 100 second)
+    servers = Await.result(secondBoot, 100 second)
     servers.foreach(server => assertEquals(clusterIdFromServer1, server.clusterId))
 
     servers.foreach(_.shutdown())
-    CoreUtils.delete(server1.config.logDirs)
-    CoreUtils.delete(server2.config.logDirs)
-    CoreUtils.delete(server3.config.logDirs)
+
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index ac757d0..a25569f 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,14 +17,23 @@
 
 package kafka.server
 
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Test}
 
 class ServerStartupTest extends ZooKeeperTestHarness {
 
+  private var server: KafkaServer = null
+
+  @After
+  override def tearDown() {
+    if (server != null)
+      TestUtils.shutdownServers(Seq(server))
+    super.tearDown()
+  }
+
   @Test
   def testBrokerCreatesZKChroot {
     val brokerId = 0
@@ -32,13 +41,10 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
     val zooKeeperConnect = props.get("zookeeper.connect")
     props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
-    val server = TestUtils.createServer(KafkaConfig.fromProps(props))
+    server = TestUtils.createServer(KafkaConfig.fromProps(props))
 
     val pathExists = zkUtils.pathExists(zookeeperChroot)
     assertTrue(pathExists)
-
-    server.shutdown()
-    CoreUtils.delete(server.config.logDirs)
   }
 
   @Test
@@ -46,8 +52,8 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     // Create and start first broker
     val brokerId1 = 0
     val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect)
-    val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
-    val port = TestUtils.boundPort(server1)
+    server = TestUtils.createServer(KafkaConfig.fromProps(props1))
+    val port = TestUtils.boundPort(server)
 
     // Create a second broker with same port
     val brokerId2 = 1
@@ -57,9 +63,6 @@ class ServerStartupTest extends ZooKeeperTestHarness {
       fail("Starting a broker with the same port should fail")
     } catch {
       case _: RuntimeException => // expected
-    } finally {
-      server1.shutdown()
-      CoreUtils.delete(server1.config.logDirs)
     }
   }
 
@@ -70,7 +73,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
 
     val brokerId = 0
     val props1 = TestUtils.createBrokerConfig(brokerId, zkConnect)
-    val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
+    server = TestUtils.createServer(KafkaConfig.fromProps(props1))
     val brokerRegistration = zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1
 
     val props2 = TestUtils.createBrokerConfig(brokerId, zkConnect)
@@ -84,23 +87,17 @@ class ServerStartupTest extends ZooKeeperTestHarness {
 
     // broker registration shouldn't change
     assertEquals(brokerRegistration, zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
-
-    server1.shutdown()
-    CoreUtils.delete(server1.config.logDirs)
   }
 
   @Test
   def testBrokerSelfAware {
     val brokerId = 0
     val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
-    val server = TestUtils.createServer(KafkaConfig.fromProps(props))
+    server = TestUtils.createServer(KafkaConfig.fromProps(props))
 
     TestUtils.waitUntilTrue(() => server.metadataCache.getAliveBrokers.nonEmpty, "Wait for cache to update")
     assertEquals(1, server.metadataCache.getAliveBrokers.size)
     assertEquals(brokerId, server.metadataCache.getAliveBrokers.head.id)
-
-    server.shutdown()
-    CoreUtils.delete(server.config.logDirs)
   }
 
   @Test
@@ -119,13 +116,11 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {}
 
     val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
-    val server = new MockKafkaServer(KafkaConfig.fromProps(props))
+    server = new MockKafkaServer(KafkaConfig.fromProps(props))
 
     EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once()
     EasyMock.replay(mockBrokerState)
 
     server.startup()
-    server.shutdown()
-    CoreUtils.delete(server.config.logDirs)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 4edfbaf..182e904 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -66,8 +66,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
   @After
   override def tearDown() {
-    brokers.par.foreach(_.shutdown())
     producer.close()
+    TestUtils.shutdownServers(brokers)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index c5bb5e4..f7110ee 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.utils.SystemTime
 import org.apache.kafka.common.TopicPartition
 
 import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.junit.{After, Test}
 import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
 
 import scala.collection.JavaConverters._
@@ -51,23 +51,18 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
   val tp = t1p0
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
 
-  @Before
-  override def setUp() {
-    super.setUp()
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_))
-  }
-
   @After
   override def tearDown() {
-    brokers.foreach(_.shutdown())
     if (producer != null)
       producer.close()
+    TestUtils.shutdownServers(brokers)
     super.tearDown()
   }
 
   @Test
   def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
+    brokers = (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+
     // Given two topics with replication of a single partition
     for (topic <- List(topic1, topic2)) {
       createTopic(zkUtils, topic, Map(0 -> Seq(0, 1)), servers = brokers)
@@ -280,4 +275,4 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
       partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 01ff83d..a51a07c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -174,6 +174,16 @@ object TestUtils extends Logging {
   }
 
   /**
+    * Shutdown `servers` and delete their log directories.
+    */
+  def shutdownServers(servers: Seq[KafkaServer]) {
+    servers.par.foreach { s =>
+      s.shutdown()
+      CoreUtils.delete(s.config.logDirs)
+    }
+  }
+
+  /**
     * Create a test config for the provided parameters.
     *
     * Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled.
@@ -939,9 +949,10 @@ object TestUtils extends Logging {
   }
 
   def verifyNonDaemonThreadsStatus(threadNamePrefix: String) {
-    assertEquals(0, Thread.getAllStackTraces.keySet().toArray
-      .map(_.asInstanceOf[Thread])
-      .count(t => !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)))
+    val threadCount = Thread.getAllStackTraces.keySet.asScala.count { t =>
+      !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
+    }
+    assertEquals(0, threadCount)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/12aa70d5/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 2805b3b..b3b10f3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -21,11 +21,14 @@ import javax.security.auth.login.Configuration
 
 import kafka.utils.{CoreUtils, Logging, ZkUtils}
 import org.junit.{After, Before}
+import org.junit.Assert.assertEquals
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.test.IntegrationTest
 import org.junit.experimental.categories.Category
 
+import scala.collection.JavaConverters._
+
 @Category(Array(classOf[IntegrationTest]))
 abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
@@ -41,6 +44,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   
   @Before
   def setUp() {
+    assertNoBrokerControllersRunning()
     zookeeper = new EmbeddedZookeeper()
     zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
   }
@@ -52,6 +56,22 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown())
     Configuration.setConfiguration(null)
+    assertNoBrokerControllersRunning()
   }
 
+  // Tests using this class start ZooKeeper before starting any brokers and shutdown ZK after
+  // shutting down brokers. If tests leave broker controllers running, subsequent tests may fail in
+  // unexpected ways if ZK port is reused. This method ensures that there is no Controller event thread
+  // since the controller loads default JAAS configuration to make connections to brokers on this thread.
+  //
+  // Any tests that use this class and invoke ZooKeeperTestHarness#tearDown() will fail in the tearDown()
+  // if controller event thread is found. Tests with missing broker shutdown which don't use ZooKeeperTestHarness
+  // or its tearDown() will cause an assertion failure in the subsequent test that invokes ZooKeeperTestHarness#setUp(),
+  // making it easier to identify the test with missing shutdown from the test sequence.
+  private def assertNoBrokerControllersRunning() {
+    val threads = Thread.getAllStackTraces.keySet.asScala
+      .map(_.getName)
+      .filter(_.contains("controller-event-thread"))
+    assertEquals(Set(), threads)
+  }
 }


Mime
View raw message