kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [2/5] kafka git commit: KAFKA-2639: Refactoring of ZkUtils
Date Sun, 18 Oct 2015 22:24:07 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index e0e46c8..549a96b 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -63,13 +63,13 @@ object TestOffsetManager {
 
   }
 
-  class CommitThread(id: Int, partitionCount: Int, commitIntervalMs: Long, zkClient: ZkClient)
+  class CommitThread(id: Int, partitionCount: Int, commitIntervalMs: Long, zkUtils: ZkUtils)
         extends ShutdownableThread("commit-thread")
         with KafkaMetricsGroup {
 
     private val groupId = "group-" + id
     private val metadata = "Metadata from commit thread " + id
-    private var offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkClient, SocketTimeoutMs)
+    private var offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkUtils, SocketTimeoutMs)
     private var offset = 0L
     val numErrors = new AtomicInteger(0)
     val numCommits = new AtomicInteger(0)
@@ -79,7 +79,7 @@ object TestOffsetManager {
 
     private def ensureConnected() {
       if (!offsetsChannel.isConnected)
-        offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkClient, SocketTimeoutMs)
+        offsetsChannel = ClientUtils.channelToOffsetManager(groupId, zkUtils, SocketTimeoutMs)
     }
 
     override def doWork() {
@@ -119,7 +119,7 @@ object TestOffsetManager {
     }
   }
 
-  class FetchThread(numGroups: Int, fetchIntervalMs: Long, zkClient: ZkClient)
+  class FetchThread(numGroups: Int, fetchIntervalMs: Long, zkUtils: ZkUtils)
         extends ShutdownableThread("fetch-thread")
         with KafkaMetricsGroup {
 
@@ -127,7 +127,7 @@ object TestOffsetManager {
     private val fetchTimer = new KafkaTimer(timer)
 
     private val channels = mutable.Map[Int, BlockingChannel]()
-    private var metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+    private var metadataChannel = ClientUtils.channelToAnyBroker(zkUtils, SocketTimeoutMs)
 
     private val numErrors = new AtomicInteger(0)
 
@@ -141,7 +141,7 @@ object TestOffsetManager {
         val channel = if (channels.contains(coordinatorId))
           channels(coordinatorId)
         else {
-          val newChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+          val newChannel = ClientUtils.channelToOffsetManager(group, zkUtils, SocketTimeoutMs)
           channels.put(coordinatorId, newChannel)
           newChannel
         }
@@ -173,7 +173,7 @@ object TestOffsetManager {
           println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port))
           metadataChannel.disconnect()
           println("Creating new query channel.")
-          metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+          metadataChannel = ClientUtils.channelToAnyBroker(zkUtils, SocketTimeoutMs)
       }
       finally {
         Thread.sleep(fetchIntervalMs)
@@ -250,17 +250,17 @@ object TestOffsetManager {
     println("Commit thread count: %d; Partition count: %d, Commit interval: %d ms; Fetch interval: %d ms; Reporting interval: %d ms"
             .format(threadCount, partitionCount, commitIntervalMs, fetchIntervalMs, reportingIntervalMs))
 
-    var zkClient: ZkClient = null
+    var zkUtils: ZkUtils = null
     var commitThreads: Seq[CommitThread] = Seq()
     var fetchThread: FetchThread = null
     var statsThread: StatsThread = null
     try {
-      zkClient = ZkUtils.createZkClient(zookeeper, 6000, 2000)
+      zkUtils = ZkUtils(zookeeper, 6000, 2000, false)
       commitThreads = (0 to (threadCount-1)).map { threadId =>
-        new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient)
+        new CommitThread(threadId, partitionCount, commitIntervalMs, zkUtils)
       }
 
-      fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkClient)
+      fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkUtils)
 
       val statsThread = new StatsThread(reportingIntervalMs, commitThreads, fetchThread)
 
@@ -300,7 +300,7 @@ object TestOffsetManager {
         statsThread.shutdown()
         statsThread.join()
       }
-      zkClient.close()
+      zkUtils.close()
     }
 
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ed94039..0fce611 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -49,10 +49,10 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort()))
 
     // create topics first
-    createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-    createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
-    createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3,0,1)), servers = servers)
-    createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
+    createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+    createTopic(zkUtils, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
+    createTopic(zkUtils, topic3, partitionReplicaAssignment = Map(0->Seq(2,3,0,1)), servers = servers)
+    createTopic(zkUtils, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
   }
 
   @After
@@ -65,7 +65,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testTopicDoesNotExist {
     try {
-      AdminUtils.addPartitions(zkClient, "Blah", 1)
+      AdminUtils.addPartitions(zkUtils, "Blah", 1)
       fail("Topic should not exist")
     } catch {
       case e: AdminOperationException => //this is good
@@ -76,7 +76,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testWrongReplicaCount {
     try {
-      AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2")
+      AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
       fail("Add partitions should fail")
     } catch {
       case e: AdminOperationException => //this is good
@@ -86,12 +86,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testIncrementPartitions {
-    AdminUtils.addPartitions(zkClient, topic1, 3)
+    AdminUtils.addPartitions(zkUtils, topic1, 3)
     // wait until leader is elected
-    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1)
-    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2)
-    val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 1).get
-    val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get
+    var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
+    var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
+    val leader1FromZk = zkUtils.getLeaderForPartition(topic1, 1).get
+    val leader2FromZk = zkUtils.getLeaderForPartition(topic1, 2).get
     assertEquals(leader1.get, leader1FromZk)
     assertEquals(leader2.get, leader2FromZk)
 
@@ -112,12 +112,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testManualAssignmentOfReplicas {
-    AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3")
+    AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3")
     // wait until leader is elected
-    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1)
-    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2)
-    val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 1).get
-    val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 2).get
+    var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
+    var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2)
+    val leader1FromZk = zkUtils.getLeaderForPartition(topic2, 1).get
+    val leader2FromZk = zkUtils.getLeaderForPartition(topic2, 2).get
     assertEquals(leader1.get, leader1FromZk)
     assertEquals(leader2.get, leader2FromZk)
 
@@ -139,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testReplicaPlacement {
-    AdminUtils.addPartitions(zkClient, topic3, 7)
+    AdminUtils.addPartitions(zkUtils, topic3, 7)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 2d18069..52ea580 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -66,23 +66,23 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
   @Test
   def testManualReplicaAssignment() {
     val brokers = List(0, 1, 2, 3, 4)
-    TestUtils.createBrokersInZk(zkClient, zkConnection, brokers)
+    TestUtils.createBrokersInZk(zkUtils, brokers)
 
     // duplicate brokers
     intercept[IllegalArgumentException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,0)))
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,0)))
     }
 
     // inconsistent replication factor
     intercept[IllegalArgumentException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0)))
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,1), 1->Seq(0)))
     }
 
     // good assignment
     val assignment = Map(0 -> List(0, 1, 2),
                          1 -> List(1, 2, 3))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "test", assignment)
-    val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test"))
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", assignment)
+    val found = zkUtils.getPartitionAssignmentForTopics(Seq("test"))
     assertEquals(assignment, found("test"))
   }
 
@@ -117,19 +117,19 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
       11 -> 1
     )
     val topic = "test"
-    TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4))
+    TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap
+    TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
+    val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> zkUtils.getReplicasForPartition(topic, p))).toMap
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
     for(i <- 0 until actualReplicaList.size)
       assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
 
     intercept[TopicExistsException] {
       // shouldn't be able to create a topic that already exists
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     }
   }
 
@@ -137,13 +137,13 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
   def testTopicCreationWithCollision() {
     val topic = "test.topic"
     val collidingTopic = "test_topic"
-    TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4))
+    TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4))
     // create the topic
-    AdminUtils.createTopic(zkClient, topic, 3, 1)
+    AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     intercept[InvalidTopicException] {
       // shouldn't be able to create a topic that collides
-      AdminUtils.createTopic(zkClient, collidingTopic, 3, 1)
+      AdminUtils.createTopic(zkUtils, collidingTopic, 3, 1)
     }
   }
 
@@ -160,25 +160,25 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // reassign partition 0
     val newReplicas = Seq(0, 2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
       },
       "Partition reassignment should complete")
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
     // in sync replicas should not have any replica that is not in the new assigned replicas
-    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+    ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
     servers.foreach(_.shutdown())
@@ -191,24 +191,24 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // reassign partition 0
     val newReplicas = Seq(1, 2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
           Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
       },
       "Partition reassignment should complete")
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+    checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
 
@@ -222,24 +222,24 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
           Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
       },
       "Partition reassignment should complete")
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+    checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
     servers.foreach(_.shutdown())
@@ -254,9 +254,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
     assertFalse("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
-    val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
+    val reassignedPartitions = zkUtils.getPartitionsBeingReassigned()
     assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
     servers.foreach(_.shutdown())
   }
@@ -266,25 +266,25 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // put the partition in the reassigned path as well
     // reassign partition 0
     val newReplicas = Seq(0, 1)
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
     reassignPartitionsCommand.reassignPartitions
     // create brokers
     val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
 
     // wait until reassignment completes
-    TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient),
+    TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkUtils),
                             "Partition reassignment should complete")
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    val assignedReplicas = zkUtils.getReplicasForPartition(topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
     // ensure that there are no under replicated partitions
-    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
+    ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
     servers.foreach(_.shutdown())
@@ -294,10 +294,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
   def testPreferredReplicaJsonData() {
     // write preferred replica json data to zk path
     val partitionsForPreferredReplicaElection = Set(TopicAndPartition("test", 1), TopicAndPartition("test2", 1))
-    PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
+    PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, partitionsForPreferredReplicaElection)
     // try to read it back and compare with what was written
-    val preferredReplicaElectionZkData = ZkUtils.readData(zkClient,
-        ZkUtils.PreferredReplicaLeaderElectionPath)._1
+    val preferredReplicaElectionZkData = zkUtils.readData(ZkUtils.PreferredReplicaLeaderElectionPath)._1
     val partitionsUndergoingPreferredReplicaElection =
       PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
     assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
@@ -313,14 +312,14 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // broker 2 should be the leader since it was started first
-    val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get
+    val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None).get
     // trigger preferred replica election
-    val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(TopicAndPartition(topic, partition)))
+    val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkUtils, Set(TopicAndPartition(topic, partition)))
     preferredReplicaElection.moveLeaderToPreferredReplica()
-    val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)).get
+    val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)).get
     assertEquals("Preferred replica election failed", preferredReplica, newLeader)
     servers.foreach(_.shutdown())
   }
@@ -334,9 +333,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
     val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // create the topic
-    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
+    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
 
-    val controllerId = ZkUtils.getController(zkClient)
+    val controllerId = zkUtils.getController()
     val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
     var partitionsRemaining = controller.shutdownBroker(2)
     var activeServers = servers.filter(s => s.config.brokerId != 2)
@@ -402,16 +401,16 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
       // create a topic with a few config overrides and check that they are applied
       val maxMessageSize = 1024
       val retentionMs = 1000*1000
-      AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
+      AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
       checkConfig(maxMessageSize, retentionMs)
 
       // now double the config values for the topic and check that it is applied
       val newConfig: Properties = makeConfig(2*maxMessageSize, 2 * retentionMs)
-      AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
       checkConfig(2*maxMessageSize, 2 * retentionMs)
 
       // Verify that the same config can be read from ZK
-      val configInZk = AdminUtils.fetchEntityConfig(server.zkClient, ConfigType.Topic, topic)
+      val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic)
       assertEquals(newConfig, configInZk)
     } finally {
       server.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index d3abf08..c19127e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -33,11 +33,11 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val groupToDelete = "groupToDelete"
     val otherGroup = "otherGroup"
 
-    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, topic, 1, 3, servers)
     fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
     fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
 
-    AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
+    AdminUtils.deleteConsumerGroupInZK(zkUtils, groupToDelete)
 
     TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
       "DeleteConsumerGroupInZK should delete the provided consumer group's directory")
@@ -51,11 +51,11 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val groupToDelete = "groupToDelete"
     val otherGroup = "otherGroup"
 
-    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, topic, 1, 3, servers)
     fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true)
     fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
 
-    AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
+    AdminUtils.deleteConsumerGroupInZK(zkUtils, groupToDelete)
 
     TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(groupToDelete)),
       "DeleteConsumerGroupInZK should not delete the provided consumer group's directory if the consumer group is still active")
@@ -68,11 +68,11 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val topic = "test"
     val groupToDelete = "groupToDelete"
     val otherGroup = "otherGroup"
-    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, topic, 1, 3, servers)
     fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
     fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
 
-    AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topic)
+    AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, groupToDelete, topic)
 
     TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
       "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's directory if it just consumes from one topic")
@@ -86,14 +86,14 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val otherTopic = "otherTopic"
     val groupToDelete = "groupToDelete"
     val otherGroup = "otherGroup"
-    TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
-    TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, topicToDelete, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, otherTopic, 1, 3, servers)
 
     fillInConsumerGroupInfo(topicToDelete, groupToDelete, "consumer", 0, 10, false)
     fillInConsumerGroupInfo(otherTopic, groupToDelete, "consumer", 0, 10, false)
     fillInConsumerGroupInfo(topicToDelete, otherGroup, "consumer", 0, 10, false)
 
-    AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topicToDelete)
+    AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, groupToDelete, topicToDelete)
 
     TestUtils.waitUntilTrue(() => !groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, topicToDelete)),
       "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's owner and offset directories for the given topic")
@@ -108,13 +108,13 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val topicToDelete = "topicToDelete"
     val otherTopic = "otherTopic"
     val group = "group"
-    TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
-    TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, topicToDelete, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, otherTopic, 1, 3, servers)
 
     fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true)
     fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true)
 
-    AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topicToDelete)
+    AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topicToDelete)
 
     TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, topicToDelete)),
       "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for the given topic if the consumer group is still active")
@@ -128,14 +128,14 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val otherTopic = "otherTopic"
     val groups = Seq("group1", "group2")
 
-    TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
-    TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, topicToDelete, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, otherTopic, 1, 3, servers)
     val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete))
     val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic))
     groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false))
     groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, false))
 
-    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete)
+    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topicToDelete)
 
     TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist),
       "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
@@ -148,13 +148,13 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val topic = "topic"
     val group = "group"
 
-    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    TestUtils.createTopic(zkUtils, topic, 1, 3, servers)
     val dir = new ZKGroupTopicDirs(group, topic)
     fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false)
 
-    AdminUtils.deleteTopic(zkClient, topic)
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
-    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
+    AdminUtils.deleteTopic(zkUtils, topic)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
+    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
 
     TestUtils.waitUntilTrue(() => !groupDirExists(dir),
       "Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
@@ -185,19 +185,19 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val consumerConfig = new ConsumerConfig(consumerProps)
     val dir = new ZKGroupTopicDirs(group, topic)
     TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset)
-    ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "")
-    ZkUtils.makeSurePersistentPathExists(zkClient, dir.consumerRegistryDir)
+    zkUtils.createEphemeralPathExpectConflict(zkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "")
+    zkUtils.makeSurePersistentPathExists(dir.consumerRegistryDir)
     if (registerConsumer) {
-      ZkUtils.createEphemeralPathExpectConflict(zkClient, dir.consumerRegistryDir + "/" + consumerId, "")
+      zkUtils.createEphemeralPathExpectConflict(dir.consumerRegistryDir + "/" + consumerId, "")
     }
   }
 
   private def groupDirExists(dir: ZKGroupDirs) = {
-    ZkUtils.pathExists(zkClient, dir.consumerGroupDir)
+    zkUtils.pathExists(dir.consumerGroupDir)
   }
 
   private def groupTopicOffsetAndOwnerDirsExist(dir: ZKGroupTopicDirs) = {
-    ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir)
+    zkUtils.pathExists(dir.consumerOffsetDir) && zkUtils.pathExists(dir.consumerOwnerDir)
   }
 
   private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index fbae398..383cb44 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -20,6 +20,7 @@ import kafka.log.Log
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, TestUtils}
+import kafka.utils.ZkUtils._
 import kafka.server.{KafkaServer, KafkaConfig}
 import org.junit.Test
 import java.util.Properties
@@ -33,8 +34,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = topicAndPartition.topic
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
-    AdminUtils.deleteTopic(zkClient, topic)
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    AdminUtils.deleteTopic(zkUtils, topic)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -44,22 +45,22 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = topicAndPartition.topic
     val servers = createTestTopicAndCluster(topic)
     // shut down one follower replica
-    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
     follower.shutdown()
     // start topic deletion
-    AdminUtils.deleteTopic(zkClient, topic)
+    AdminUtils.deleteTopic(zkUtils, topic)
     // check if all replicas but the one that is shut down has deleted the log
     TestUtils.waitUntilTrue(() =>
       servers.filter(s => s.config.brokerId != follower.config.brokerId)
         .forall(_.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
     // ensure topic deletion is halted
-    TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)),
       "Admin path /admin/delete_topic/test path deleted even when a follower replica is down")
     // restart follower replica
     follower.startup()
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -68,25 +69,25 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topicAndPartition = TopicAndPartition("test", 0)
     val topic = topicAndPartition.topic
     val servers = createTestTopicAndCluster(topic)
-    val controllerId = ZkUtils.getController(zkClient)
+    val controllerId = zkUtils.getController()
     val controller = servers.filter(s => s.config.brokerId == controllerId).head
-    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get && s.config.brokerId != controllerId).last
     follower.shutdown()
 
     // start topic deletion
-    AdminUtils.deleteTopic(zkClient, topic)
+    AdminUtils.deleteTopic(zkUtils, topic)
     // shut down the controller to trigger controller failover during delete topic
     controller.shutdown()
 
     // ensure topic deletion is halted
-    TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)),
       "Admin path /admin/delete_topic/test path deleted even when a replica is down")
 
     controller.startup()
     follower.startup()
 
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -101,37 +102,37 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created.")
-    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
     follower.shutdown()
     // start topic deletion
-    AdminUtils.deleteTopic(zkClient, topic)
+    AdminUtils.deleteTopic(zkUtils, topic)
     // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
     // the topic is being deleted
     // reassign partition 0
-    val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
+    val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
     val newReplicas = Seq(1, 2, 3)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
     assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+      val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas);
+      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition, newReplicas,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed;
     }, "Partition reassignment shouldn't complete.")
-    val controllerId = ZkUtils.getController(zkClient)
+    val controllerId = zkUtils.getController()
     val controller = servers.filter(s => s.config.brokerId == controllerId).head
     assertFalse("Partition reassignment should fail",
       controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition))
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
+    val assignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
     assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
     follower.startup()
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     allServers.foreach(_.shutdown())
   }
 
@@ -139,18 +140,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   def testDeleteTopicDuringAddPartition() {
     val topic = "test"
     val servers = createTestTopicAndCluster(topic)
-    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
     val newPartition = TopicAndPartition(topic, 1)
     follower.shutdown()
     // add partitions to topic
-    AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2", false)
+    AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2", false)
     // start topic deletion
-    AdminUtils.deleteTopic(zkClient, topic)
+    AdminUtils.deleteTopic(zkUtils, topic)
     follower.startup()
     // test if topic deletion is resumed
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     TestUtils.waitUntilTrue(() =>
       servers.forall(_.getLogManager().getLog(newPartition).isEmpty),
@@ -164,11 +165,11 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topicAndPartition = TopicAndPartition(topic, 0)
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
-    AdminUtils.deleteTopic(zkClient, topic)
+    AdminUtils.deleteTopic(zkUtils, topic)
     // add partitions to topic
     val newPartition = TopicAndPartition(topic, 1)
-    AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2")
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.forall(_.getLogManager().getLog(newPartition).isEmpty))
@@ -182,12 +183,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topicAndPartition = TopicAndPartition(topic, 0)
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
-    AdminUtils.deleteTopic(zkClient, topic)
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    AdminUtils.deleteTopic(zkUtils, topic)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // re-create topic on same replicas
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // wait until leader is elected
-    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
     assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
     // check if all replica logs are created
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
@@ -201,16 +202,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = topicAndPartition.topic
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
-    AdminUtils.deleteTopic(zkClient, "test2")
+    AdminUtils.deleteTopic(zkUtils, "test2")
     // verify delete topic path for test2 is removed from zookeeper
-    TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers)
+    TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
     // verify that topic test is untouched
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created")
     // test the topic path exists
-    assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+    assertTrue("Topic test mistakenly deleted", zkUtils.pathExists(getTopicPath(topic)))
     // topic test should have a leader
-    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
     servers.foreach(_.shutdown())
   }
@@ -242,8 +243,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
    server.logManager.cleaner.awaitCleaned(topicName,0,0)
 
     // delete topic
-    AdminUtils.deleteTopic(zkClient, "test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    AdminUtils.deleteTopic(zkUtils, "test")
+    TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers)
 
     servers.foreach(_.shutdown())
   }
@@ -256,16 +257,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
     try {
       // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
+      AdminUtils.deleteTopic(zkUtils, topic)
       // try to delete topic marked as deleted
-      AdminUtils.deleteTopic(zkClient, topic)
+      AdminUtils.deleteTopic(zkUtils, topic)
       fail("Expected TopicAlreadyMarkedForDeletionException")
     }
     catch {
       case e: TopicAlreadyMarkedForDeletionException => // expected exception
     }
 
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -283,7 +284,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index d4fa0d5..cab4813 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -23,7 +23,7 @@ import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
-import kafka.utils.ZkUtils
+import kafka.utils.ZkUtils._
 import kafka.coordinator.ConsumerCoordinator
 
 class TopicCommandTest extends ZooKeeperTestHarness with Logging {
@@ -36,25 +36,25 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
     val cleanupVal = "compact"
     // create brokers
     val brokers = List(0, 1, 2)
-    TestUtils.createBrokersInZk(zkClient, zkConnection, brokers)
+    TestUtils.createBrokersInZk(zkUtils, brokers)
     // create the topic
     val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
       "--config", cleanupKey + "=" + cleanupVal,
       "--topic", topic))
-    TopicCommand.createTopic(zkClient, createOpts)
-    val props = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+    TopicCommand.createTopic(zkUtils, createOpts)
+    val props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
     assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
     assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
 
     // pre-create the topic config changes path to avoid a NoNodeException
-    ZkUtils.createPersistentPath(zkClient, ZkUtils.EntityConfigChangesPath)
+    zkUtils.createPersistentPath(EntityConfigChangesPath)
 
     // modify the topic to add new partitions
     val numPartitionsModified = 3
     val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic))
-    TopicCommand.alterTopic(zkClient, alterOpts)
-    val newProps = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+    TopicCommand.alterTopic(zkUtils, alterOpts)
+    val newProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
     assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
     assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
   }
@@ -67,34 +67,34 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
 
     // create brokers
     val brokers = List(0, 1, 2)
-    TestUtils.createBrokersInZk(zkClient, zkConnection, brokers)
+    TestUtils.createBrokersInZk(zkUtils, brokers)
 
     // create the NormalTopic
     val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
       "--topic", normalTopic))
-    TopicCommand.createTopic(zkClient, createOpts)
+    TopicCommand.createTopic(zkUtils, createOpts)
 
     // delete the NormalTopic
     val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
-    val deletePath = ZkUtils.getDeleteTopicPath(normalTopic)
-    assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deletePath))
-    TopicCommand.deleteTopic(zkClient, deleteOpts)
-    assertTrue("Delete path for topic should exist after deletion.", zkClient.exists(deletePath))
+    val deletePath = getDeleteTopicPath(normalTopic)
+    assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deletePath))
+    TopicCommand.deleteTopic(zkUtils, deleteOpts)
+    assertTrue("Delete path for topic should exist after deletion.", zkUtils.zkClient.exists(deletePath))
 
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
       "--topic", ConsumerCoordinator.OffsetsTopicName))
-    TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
+    TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
 
     // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't
     val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName))
-    val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName)
-    assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath))
+    val deleteOffsetTopicPath = getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName)
+    assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
-        TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
+        TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
     }
-    assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath))
+    assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index d43778e..50496f0 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -43,10 +43,10 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
     val notificationMessage2 = "message2"
     val changeExpirationMs = 100
 
-    val notificationListener = new ZkNodeChangeNotificationListener(zkClient, seqNodeRoot, seqNodePrefix, notificationHandler, changeExpirationMs)
+    val notificationListener = new ZkNodeChangeNotificationListener(zkUtils, seqNodeRoot, seqNodePrefix, notificationHandler, changeExpirationMs)
     notificationListener.init()
 
-    ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, notificationMessage1)
+    zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage1)
 
     TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 1 && notificationHandler.notification == notificationMessage1, "failed to send/process notification message in the timeout period.")
 
@@ -55,7 +55,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
     Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size) however even after that the assertion can fail as the second node it self can be deleted
     depending on how threads get scheduled.*/
 
-    ZkUtils.createSequentialPersistentPath(zkClient, seqNodePath, notificationMessage2)
+    zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage2)
     TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 2 && notificationHandler.notification == notificationMessage2, "failed to send/process notification message in the timeout period.")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index ca63c80..b8ad15b 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -57,7 +57,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
       new AtomicLong(0),
       new AtomicInteger(0),
       ""))
-    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
+    createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 6c22e8b..818229c 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -48,9 +48,9 @@ class PartitionAssignorTest extends Logging {
         ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))
       }).toSeq:_*)
       val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
-      val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
-      EasyMock.replay(zkClient)
-      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient, verifyAssignmentIsUniform = true)
+      val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkUtils.zkClient)
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, verifyAssignmentIsUniform = true)
     })
   }
 
@@ -73,10 +73,10 @@ class PartitionAssignorTest extends Logging {
         ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
       }).toSeq:_*)
       val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
-      val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
-      EasyMock.replay(zkClient)
+      val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkUtils.zkClient)
 
-      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient)
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils)
     })
   }
 }
@@ -135,6 +135,7 @@ private object PartitionAssignorTest extends Logging {
     val consumers = java.util.Arrays.asList(scenario.subscriptions.keys.toSeq:_*)
 
     val zkClient = EasyMock.createStrictMock(classOf[ZkClient])
+    val zkUtils = ZkUtils(zkClient, false)
     EasyMock.checkOrder(zkClient, false)
 
     EasyMock.expect(zkClient.getChildren("/consumers/%s/ids".format(scenario.group))).andReturn(consumers)
@@ -149,21 +150,21 @@ private object PartitionAssignorTest extends Logging {
     scenario.topicPartitionCounts.foreach { case(topic, partitionCount) =>
       val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*)
       EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat()))
-              .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment))
+              .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment))
       EasyMock.expectLastCall().anyTimes()
     }
 
-    EasyMock.expect(zkClient.getChildren("/brokers/topics")).andReturn(
+    EasyMock.expect(zkUtils.zkClient.getChildren("/brokers/topics")).andReturn(
       java.util.Arrays.asList(scenario.topicPartitionCounts.keys.toSeq:_*))
     EasyMock.expectLastCall().anyTimes()
 
-    zkClient
+    zkUtils
   }
 
-  private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkClient: ZkClient,
+  private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkUtils: ZkUtils,
                               verifyAssignmentIsUniform: Boolean = false) {
     val assignments = scenario.subscriptions.map{ case(consumer, subscription)  =>
-      val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient)
+      val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkUtils)
       assignor.assign(ctx).get(consumer)
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index cb59542..28b1dd5 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -97,8 +97,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
       sendMessages(servers, topic, nMessages, 1)
 
     // wait to make sure the topic and partition have a leader for the successful case
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -130,8 +130,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val sentMessages2 = sendMessages(servers, topic, nMessages, 0) ++
                          sendMessages(servers, topic, nMessages, 1)
 
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
 
     val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
     assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
@@ -151,8 +151,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
                         sendMessages(servers, topic, nMessages, 1)
 
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
 
     val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
     assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
@@ -185,8 +185,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val sentMessages1 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
                         sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
 
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -218,8 +218,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val sentMessages2 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
                         sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
 
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
 
     val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
     assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
@@ -239,8 +239,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
                         sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
 
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
 
     val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages)
     assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
@@ -294,8 +294,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
 
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
 
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 1)
 
     val zkConsumerConnector =
       new ZookeeperConsumerConnector(consumerConfig, true)
@@ -322,10 +322,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
 
   @Test
   def testLeaderSelectionForPartition() {
-    val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000)
+    val zkUtils = ZkUtils(zkConnect, 6000, 30000, false)
 
     // create topic topic1 with 1 partition on broker 0
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(servers, topic, nMessages)
@@ -349,7 +349,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val receivedMessages1 = getMessages(topicMessageStreams1, nMessages)
     assertEquals(sentMessages1, receivedMessages1)
     zkConsumerConnector1.shutdown()
-    zkClient.close()
+    zkUtils.close()
   }
 
   @Test
@@ -416,14 +416,14 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
   }
 
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
-    val children = zkClient.getChildren(path)
+    val children = zkUtils.zkClient.getChildren(path)
     Collections.sort(children)
     val childrenAsSeq : Seq[java.lang.String] = {
       import scala.collection.JavaConversions._
       children.toSeq
     }
     childrenAsSeq.map(partition =>
-      (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
+      (partition, zkUtils.zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }
 
   private class TestConsumerRebalanceListener extends ConsumerRebalanceListener {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index c93eca5..91ac1f6 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -75,7 +75,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
       }
     }
     // Create topic with one partition
-    kafka.admin.AdminUtils.createTopic(controller.zkClient, topic, 1, 1)
+    kafka.admin.AdminUtils.createTopic(controller.zkUtils, topic, 1, 1)
     val topicPartition = TopicAndPartition("topic1", 0)
     var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
     while (!partitions.contains(topicPartition)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
index d8a7948..3bc37e5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
@@ -18,7 +18,8 @@
 package kafka.coordinator
 
 import kafka.server.KafkaConfig
-import kafka.utils.{ZkUtils, TestUtils}
+import kafka.utils.{TestUtils, ZkUtils}
+import kafka.utils.ZkUtils._
 
 import org.junit.Assert._
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
@@ -33,14 +34,15 @@ import org.scalatest.junit.JUnitSuite
 class CoordinatorMetadataTest extends JUnitSuite {
   val DefaultNumPartitions = 8
   val DefaultNumReplicas = 2
-  var zkClient: ZkClient = null
+  var zkUtils: ZkUtils = null
   var coordinatorMetadata: CoordinatorMetadata = null
 
   @Before
   def setUp() {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-    zkClient = EasyMock.createStrictMock(classOf[ZkClient])
-    coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkClient, null)
+    val zkClient = EasyMock.createStrictMock(classOf[ZkClient])
+    zkUtils = ZkUtils(zkClient, false)
+    coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkUtils, null)
   }
 
   @Test
@@ -77,8 +79,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
     val topics = Set("a")
     coordinatorMetadata.addGroup(groupId, "range")
 
-    expectZkClientSubscribeDataChanges(zkClient, topics)
-    EasyMock.replay(zkClient)
+    expectZkClientSubscribeDataChanges(zkUtils, topics)
+    EasyMock.replay(zkUtils.zkClient)
     coordinatorMetadata.bindGroupToTopics(groupId, topics)
     assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
   }
@@ -91,8 +93,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
     coordinatorMetadata.addGroup(group1, "range")
     coordinatorMetadata.addGroup(group2, "range")
 
-    expectZkClientSubscribeDataChanges(zkClient, topics)
-    EasyMock.replay(zkClient)
+    expectZkClientSubscribeDataChanges(zkUtils, topics)
+    EasyMock.replay(zkUtils.zkClient)
     coordinatorMetadata.bindGroupToTopics(group1, topics)
     coordinatorMetadata.bindGroupToTopics(group2, topics)
     assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
@@ -111,8 +113,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
     val topics = Set("a")
     coordinatorMetadata.addGroup(groupId, "range")
 
-    expectZkClientSubscribeDataChanges(zkClient, topics)
-    EasyMock.replay(zkClient)
+    expectZkClientSubscribeDataChanges(zkUtils, topics)
+    EasyMock.replay(zkUtils.zkClient)
     coordinatorMetadata.bindGroupToTopics(groupId, topics)
     coordinatorMetadata.unbindGroupFromTopics(groupId, Set("b"))
     assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
@@ -126,8 +128,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
     coordinatorMetadata.addGroup(group1, "range")
     coordinatorMetadata.addGroup(group2, "range")
 
-    expectZkClientSubscribeDataChanges(zkClient, topics)
-    EasyMock.replay(zkClient)
+    expectZkClientSubscribeDataChanges(zkUtils, topics)
+    EasyMock.replay(zkUtils.zkClient)
     coordinatorMetadata.bindGroupToTopics(group1, topics)
     coordinatorMetadata.bindGroupToTopics(group2, topics)
     coordinatorMetadata.unbindGroupFromTopics(group1, topics)
@@ -140,9 +142,9 @@ class CoordinatorMetadataTest extends JUnitSuite {
     val topics = Set("a")
     coordinatorMetadata.addGroup(groupId, "range")
 
-    expectZkClientSubscribeDataChanges(zkClient, topics)
-    expectZkClientUnsubscribeDataChanges(zkClient, topics)
-    EasyMock.replay(zkClient)
+    expectZkClientSubscribeDataChanges(zkUtils, topics)
+    expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics)
+    EasyMock.replay(zkUtils.zkClient)
     coordinatorMetadata.bindGroupToTopics(groupId, topics)
     coordinatorMetadata.unbindGroupFromTopics(groupId, topics)
     assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic)
@@ -163,8 +165,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
     coordinatorMetadata.addGroup(group1, "range")
     coordinatorMetadata.addGroup(group2, "range")
 
-    expectZkClientSubscribeDataChanges(zkClient, topics)
-    EasyMock.replay(zkClient)
+    expectZkClientSubscribeDataChanges(zkUtils, topics)
+    EasyMock.replay(zkUtils.zkClient)
     coordinatorMetadata.bindGroupToTopics(group1, topics)
     coordinatorMetadata.bindGroupToTopics(group2, topics)
     coordinatorMetadata.removeGroup(group1, topics)
@@ -179,17 +181,17 @@ class CoordinatorMetadataTest extends JUnitSuite {
     val topics = Set("a")
     coordinatorMetadata.addGroup(groupId, "range")
 
-    expectZkClientSubscribeDataChanges(zkClient, topics)
-    expectZkClientUnsubscribeDataChanges(zkClient, topics)
-    EasyMock.replay(zkClient)
+    expectZkClientSubscribeDataChanges(zkUtils, topics)
+    expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics)
+    EasyMock.replay(zkUtils.zkClient)
     coordinatorMetadata.bindGroupToTopics(groupId, topics)
     coordinatorMetadata.removeGroup(groupId, topics)
     assertNull(coordinatorMetadata.getGroup(groupId))
     assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic)
   }
 
-  private def expectZkClientSubscribeDataChanges(zkClient: ZkClient, topics: Set[String]) {
-    topics.foreach(topic => expectZkClientSubscribeDataChange(zkClient, topic))
+  private def expectZkClientSubscribeDataChanges(zkUtils: ZkUtils, topics: Set[String]) {
+    topics.foreach(topic => expectZkClientSubscribeDataChange(zkUtils.zkClient, topic))
   }
 
   private def expectZkClientUnsubscribeDataChanges(zkClient: ZkClient, topics: Set[String]) {
@@ -200,14 +202,14 @@ class CoordinatorMetadataTest extends JUnitSuite {
     val replicaAssignment =
       (0 until DefaultNumPartitions)
       .map(partition => partition.toString -> (0 until DefaultNumReplicas).toSeq).toMap
-    val topicPath = ZkUtils.getTopicPath(topic)
+    val topicPath = getTopicPath(topic)
     EasyMock.expect(zkClient.readData(topicPath, new Stat()))
-      .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment))
+      .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment))
     zkClient.subscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener]))
   }
 
   private def expectZkClientUnsubscribeDataChange(zkClient: ZkClient, topic: String) {
-    val topicPath = ZkUtils.getTopicPath(topic)
+    val topicPath = getTopicPath(topic)
     zkClient.unsubscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener]))
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 818673f..a71ddf1 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -76,7 +76,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
    * Returns the count of messages received.
    */
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
-    TestUtils.createTopic(zkClient, topic, 1, 1, servers)
+    TestUtils.createTopic(zkUtils, topic, 1, 1, servers)
 
     val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
       TestUtils.getBrokerListStrFromServers(servers),

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 20a4068..3cf4dae 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -62,7 +62,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
   def testTopicMetadataRequest {
     // create topic
     val topic = "test"
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
 
     // create a topic metadata request
     val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
@@ -79,7 +79,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
   def testBasicTopicMetadata {
     // create topic
     val topic = "test"
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+    createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
 
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
       2000,0).topicsMetadata
@@ -98,8 +98,8 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     // create topic
     val topic1 = "testGetAllTopicMetadata1"
     val topic2 = "testGetAllTopicMetadata2"
-    createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-    createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+    createTopic(zkUtils, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
+    createTopic(zkUtils, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
 
     // issue metadata request with empty list of topics
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
@@ -130,7 +130,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
 
     // wait for leader to be elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
 
     // retry the metadata for the auto created topic
@@ -159,7 +159,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     assertEquals("Expecting InvalidTopicCode for topic2 metadata", ErrorMapping.InvalidTopicCode, topicsMetadata(1).errorCode)
 
     // wait for leader to be elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 0)
     TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0)
 
     // retry the metadata for the first auto created topic
@@ -218,7 +218,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
 
     // create topic
     val topic: String = "test"
-    AdminUtils.createTopic(zkClient, topic, 1, numBrokers)
+    AdminUtils.createTopic(zkUtils, topic, 1, numBrokers)
 
     // shutdown a broker
     adHocServers.last.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index c061597..5af5d1a 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -42,11 +42,11 @@ class FetcherTest extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp
-    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
+    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
 
     val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort())))
 
-    fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
+    fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkUtils)
     fetcher.stopConnections()
     val topicInfos = configs.map(c =>
       new PartitionTopicInfo(topic,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index e6f0c54..df752db 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -113,7 +113,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
 
   private def produceAndMultiFetch(producer: Producer[String, String]) {
     for(topic <- List("test1", "test2", "test3", "test4"))
-      TestUtils.createTopic(zkClient, topic, servers = servers)
+      TestUtils.createTopic(zkUtils, topic, servers = servers)
 
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -182,7 +182,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
 
   private def multiProduce(producer: Producer[String, String]) {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
-    topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
+    topics.keys.map(topic => TestUtils.createTopic(zkUtils, topic, servers = servers))
 
     val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
@@ -210,7 +210,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
   @Test
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
-    TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    TestUtils.createTopic(zkUtils, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
@@ -219,7 +219,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
   @Test
   def testPipelinedProduceRequests() {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
-    topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
+    topics.keys.map(topic => TestUtils.createTopic(zkUtils, topic, servers = servers))
     val props = new Properties()
     props.put("request.required.acks", "0")
     val pipelinedProducer: Producer[String, String] =

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index 4d73be1..b931568 100755
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -56,10 +56,10 @@ class RollingBounceTest extends ZooKeeperTestHarness {
     val topic4 = "new-topic4"
 
     // create topics with 1 partition, 2 replicas, one on each broker
-    createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-    createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
-    createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,3)), servers = servers)
-    createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
+    createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+    createTopic(zkUtils, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
+    createTopic(zkUtils, topic3, partitionReplicaAssignment = Map(0->Seq(2,3)), servers = servers)
+    createTopic(zkUtils, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
 
     // Do a rolling bounce and check if leader transitions happen correctly
 
@@ -86,7 +86,7 @@ class RollingBounceTest extends ZooKeeperTestHarness {
       servers((startIndex + 1) % 4).shutdown()
       prevLeader = (startIndex + 1) % 4
     }
-    var newleader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+    var newleader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
     // Ensure the new leader is different from the old
     assertTrue("Leader transition did not happen for " + topic, newleader.getOrElse(-1) != -1 && (newleader.getOrElse(-1) != prevLeader))
     // Start the server back up again


Mime
View raw message