kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/2] kafka git commit: KAFKA-5647; Use KafkaZkClient in ReassignPartitionsCommand and PreferredReplicaLeaderElectionCommand
Date Wed, 20 Dec 2017 20:19:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 35c1be746 -> 488ea4b9f


http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 6c5a2ad..7347fa3 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -19,233 +19,251 @@ package kafka.admin
 import java.util.Properties
 
 import kafka.admin.ReassignPartitionsCommand.Throttle
-import kafka.common.TopicAndPartition
 import kafka.log.LogConfig
 import kafka.log.LogConfig._
 import kafka.server.{ConfigType, DynamicConfig}
 import kafka.utils.CoreUtils._
 import kafka.utils.TestUtils._
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, EasyMock}
 import org.junit.{Before, Test}
-import org.junit.Assert.{assertEquals, assertNull, fail}
+import org.junit.Assert.{assertEquals, assertNull}
 
 import scala.collection.{Seq, mutable}
 import scala.collection.JavaConversions._
+import org.apache.kafka.common.TopicPartition
 
-class ReassignPartitionsCommandTest extends Logging {
+class ReassignPartitionsCommandTest  extends ZooKeeperTestHarness  with Logging {
   var calls = 0
 
   @Test
   def shouldFindMovingReplicas() {
-    val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null, null, null)
+    val control = new TopicPartition("topic1", 1) -> Seq(100, 102)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null, null)
 
     //Given partition 0 moves from broker 100 -> 102. Partition 1 does not move.
-    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), control)
-    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), control)
+    val existing = Map(new TopicPartition("topic1", 0) -> Seq(100, 101), control)
+    val proposed = Map(new TopicPartition("topic1", 0) -> Seq(101, 102), control)
 
-
-    val mock = new TestAdminUtils {
-      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
-        assertEquals("0:102", configChange.get(FollowerReplicationThrottledReplicasProp))
//Should only be follower-throttle the moving replica
-        assertEquals("0:100,0:101", configChange.get(LeaderReplicationThrottledReplicasProp))
//Should leader-throttle all existing (pre move) replicas
+    class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+      override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
+        assertEquals(Set("0:102"), toReplicaSet(configChange.get(FollowerReplicationThrottledReplicasProp)))
//Should only be follower-throttle the moving replica
+        assertEquals(Set("0:100","0:101"), toReplicaSet(configChange.get(LeaderReplicationThrottledReplicasProp)))
//Should leader-throttle all existing (pre move) replicas
         calls += 1
       }
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties
= {new Properties}
     }
 
-    assigner.assignThrottledReplicas(existing, proposed, mock)
+    val admin = new TestAdminZkClient(zkClient)
+    assigner.assignThrottledReplicas(existing, proposed, admin)
     assertEquals(1, calls)
   }
 
   @Test
   def shouldFindMovingReplicasWhenProposedIsSubsetOfExisting() {
-    val assigner = new ReassignPartitionsCommand(null, null, null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null, null)
 
     //Given we have more existing partitions than we are proposing
     val existingSuperset = Map(
-      TopicAndPartition("topic1", 0) -> Seq(100, 101),
-      TopicAndPartition("topic1", 1) -> Seq(100, 102),
-      TopicAndPartition("topic1", 2) -> Seq(100, 101),
-      TopicAndPartition("topic2", 0) -> Seq(100, 101, 102),
-      TopicAndPartition("topic3", 0) -> Seq(100, 101, 102)
+      new TopicPartition("topic1", 0) -> Seq(100, 101),
+      new TopicPartition("topic1", 1) -> Seq(100, 102),
+      new TopicPartition("topic1", 2) -> Seq(100, 101),
+      new TopicPartition("topic2", 0) -> Seq(100, 101, 102),
+      new TopicPartition("topic3", 0) -> Seq(100, 101, 102)
     )
     val proposedSubset = Map(
-      TopicAndPartition("topic1", 0) -> Seq(101, 102),
-      TopicAndPartition("topic1", 1) -> Seq(102),
-      TopicAndPartition("topic1", 2) -> Seq(100, 101, 102)
+      new TopicPartition("topic1", 0) -> Seq(101, 102),
+      new TopicPartition("topic1", 1) -> Seq(102),
+      new TopicPartition("topic1", 2) -> Seq(100, 101, 102)
     )
 
-    val mock = new TestAdminUtils {
-      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
-        assertEquals("0:102,2:102", configChange.get(FollowerReplicationThrottledReplicasProp))
-        assertEquals("0:100,0:101,2:100,2:101", configChange.get(LeaderReplicationThrottledReplicasProp))
+    class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+      override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
+        assertEquals(Set("0:102","2:102"), toReplicaSet(configChange.get(FollowerReplicationThrottledReplicasProp)))
+        assertEquals(Set("0:100","0:101","2:100","2:101"), toReplicaSet(configChange.get(LeaderReplicationThrottledReplicasProp)))
         assertEquals("topic1", topic)
         calls += 1
       }
+
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties
= {new Properties}
     }
 
+    val admin = new TestAdminZkClient(zkClient)
     //Then replicas should assign correctly (based on the proposed map)
-    assigner.assignThrottledReplicas(existingSuperset, proposedSubset, mock)
+    assigner.assignThrottledReplicas(existingSuperset, proposedSubset, admin)
     assertEquals(1, calls)
   }
 
   @Test
   def shouldFindMovingReplicasMultiplePartitions() {
-    val control = TopicAndPartition("topic1", 2) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null, null, null)
+    val control = new TopicPartition("topic1", 2) -> Seq(100, 102)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null, null)
 
     //Given partitions 0 & 1 moves from broker 100 -> 102. Partition 2 does not move.
-    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), TopicAndPartition("topic1",
1) -> Seq(100, 101), control)
-    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), TopicAndPartition("topic1",
1) -> Seq(101, 102), control)
+    val existing = Map(new TopicPartition("topic1", 0) -> Seq(100, 101), new TopicPartition("topic1",
1) -> Seq(100, 101), control)
+    val proposed = Map(new TopicPartition("topic1", 0) -> Seq(101, 102), new TopicPartition("topic1",
1) -> Seq(101, 102), control)
 
-    // Then
-    val mock = new TestAdminUtils {
-      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
-        assertEquals("0:102,1:102", configChange.get(FollowerReplicationThrottledReplicasProp))
//Should only be follower-throttle the moving replica
-        assertEquals("0:100,0:101,1:100,1:101", configChange.get(LeaderReplicationThrottledReplicasProp))
//Should leader-throttle all existing (pre move) replicas
+    class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+      override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
+        assertEquals(Set("0:102","1:102"), toReplicaSet(configChange.get(FollowerReplicationThrottledReplicasProp)))
//Should only be follower-throttle the moving replica
+        assertEquals(Set("0:100","0:101","1:100","1:101"), toReplicaSet(configChange.get(LeaderReplicationThrottledReplicasProp)))
//Should leader-throttle all existing (pre move) replicas
         calls += 1
       }
+
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties
= {new Properties}
     }
 
+    val admin = new TestAdminZkClient(zkClient)
     //When
-    assigner.assignThrottledReplicas(existing, proposed, mock)
+    assigner.assignThrottledReplicas(existing, proposed, admin)
     assertEquals(1, calls)
   }
 
   @Test
   def shouldFindMovingReplicasMultipleTopics() {
-    val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null, null, null)
+    val control = new TopicPartition("topic1", 1) -> Seq(100, 102)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null, null)
 
     //Given topics 1 -> move from broker 100 -> 102, topics 2 -> move from broker
101 -> 100
-    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), TopicAndPartition("topic2",
0) -> Seq(101, 102), control)
-    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), TopicAndPartition("topic2",
0) -> Seq(100, 102), control)
+    val existing = Map(new TopicPartition("topic1", 0) -> Seq(100, 101), new TopicPartition("topic2",
0) -> Seq(101, 102), control)
+    val proposed = Map(new TopicPartition("topic1", 0) -> Seq(101, 102), new TopicPartition("topic2",
0) -> Seq(100, 102), control)
 
     //Then
-    val mock = new TestAdminUtils {
-      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
+    class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+      override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
         topic match {
           case "topic1" =>
-            assertEquals("0:100,0:101", configChange.get(LeaderReplicationThrottledReplicasProp))
-            assertEquals("0:102", configChange.get(FollowerReplicationThrottledReplicasProp))
+            assertEquals(Set("0:100","0:101"), toReplicaSet(configChange.get(LeaderReplicationThrottledReplicasProp)))
+            assertEquals(Set("0:102"), toReplicaSet(configChange.get(FollowerReplicationThrottledReplicasProp)))
           case "topic2" =>
-            assertEquals("0:101,0:102", configChange.get(LeaderReplicationThrottledReplicasProp))
-            assertEquals("0:100", configChange.get(FollowerReplicationThrottledReplicasProp))
+            assertEquals(Set("0:101","0:102"), toReplicaSet(configChange.get(LeaderReplicationThrottledReplicasProp)))
+            assertEquals(Set("0:100"), toReplicaSet(configChange.get(FollowerReplicationThrottledReplicasProp)))
           case _ => fail(s"Unexpected topic $topic")
         }
         calls += 1
       }
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties
= {new Properties}
     }
 
+    val admin = new TestAdminZkClient(zkClient)
     //When
-    assigner.assignThrottledReplicas(existing, proposed, mock)
+    assigner.assignThrottledReplicas(existing, proposed, admin)
     assertEquals(2, calls)
   }
 
   @Test
   def shouldFindMovingReplicasMultipleTopicsAndPartitions() {
-    val assigner = new ReassignPartitionsCommand(null, null, null, null)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null, null)
 
     //Given
     val existing = Map(
-      TopicAndPartition("topic1", 0) -> Seq(100, 101),
-      TopicAndPartition("topic1", 1) -> Seq(100, 101),
-      TopicAndPartition("topic2", 0) -> Seq(101, 102),
-      TopicAndPartition("topic2", 1) -> Seq(101, 102)
+      new TopicPartition("topic1", 0) -> Seq(100, 101),
+      new TopicPartition("topic1", 1) -> Seq(100, 101),
+      new TopicPartition("topic2", 0) -> Seq(101, 102),
+      new TopicPartition("topic2", 1) -> Seq(101, 102)
     )
     val proposed = Map(
-      TopicAndPartition("topic1", 0) -> Seq(101, 102), //moves to 102
-      TopicAndPartition("topic1", 1) -> Seq(101, 102), //moves to 102
-      TopicAndPartition("topic2", 0) -> Seq(100, 102), //moves to 100
-      TopicAndPartition("topic2", 1) -> Seq(101, 100)  //moves to 100
+      new TopicPartition("topic1", 0) -> Seq(101, 102), //moves to 102
+      new TopicPartition("topic1", 1) -> Seq(101, 102), //moves to 102
+      new TopicPartition("topic2", 0) -> Seq(100, 102), //moves to 100
+      new TopicPartition("topic2", 1) -> Seq(101, 100)  //moves to 100
     )
 
     //Then
-    val mock = new TestAdminUtils {
-      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
+    class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+      override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
         topic match {
           case "topic1" =>
-            assertEquals("0:102,1:102", configChange.get(FollowerReplicationThrottledReplicasProp))
-            assertEquals("0:100,0:101,1:100,1:101", configChange.get(LeaderReplicationThrottledReplicasProp))
+            assertEquals(Set("0:102","1:102"), toReplicaSet(configChange.get(FollowerReplicationThrottledReplicasProp)))
+            assertEquals(Set("0:100","0:101","1:100","1:101"), toReplicaSet(configChange.get(LeaderReplicationThrottledReplicasProp)))
           case "topic2" =>
-            assertEquals("0:100,1:100", configChange.get(FollowerReplicationThrottledReplicasProp))
-            assertEquals("0:101,0:102,1:101,1:102", configChange.get(LeaderReplicationThrottledReplicasProp))
+            assertEquals(Set("0:100","1:100"), toReplicaSet(configChange.get(FollowerReplicationThrottledReplicasProp)))
+            assertEquals(Set("0:101","0:102","1:101","1:102"), toReplicaSet(configChange.get(LeaderReplicationThrottledReplicasProp)))
           case _ => fail(s"Unexpected topic $topic")
         }
         calls += 1
       }
+
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties
= {new Properties}
     }
 
+    val admin = new TestAdminZkClient(zkClient)
+
     //When
-    assigner.assignThrottledReplicas(existing, proposed, mock)
+    assigner.assignThrottledReplicas(existing, proposed, admin)
     assertEquals(2, calls)
   }
 
   @Test
   def shouldFindTwoMovingReplicasInSamePartition() {
-    val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null, null, null)
+    val control = new TopicPartition("topic1", 1) -> Seq(100, 102)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null, null)
 
     //Given partition 0 has 2 moves from broker 102 -> 104 & 103 -> 105
-    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101, 102, 103), control)
-    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101, 104, 105), control)
+    val existing = Map(new TopicPartition("topic1", 0) -> Seq(100, 101, 102, 103), control)
+    val proposed = Map(new TopicPartition("topic1", 0) -> Seq(100, 101, 104, 105), control)
 
     // Then
-    val mock = new TestAdminUtils {
-      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties)
= {
-        assertEquals("0:104,0:105", configChange.get(FollowerReplicationThrottledReplicasProp))
//Should only be follower-throttle the moving replicas
-        assertEquals("0:100,0:101,0:102,0:103", configChange.get(LeaderReplicationThrottledReplicasProp))
//Should leader-throttle all existing (pre move) replicas
+    class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+      override def changeTopicConfig(topic: String, configChange: Properties) = {
+        assertEquals(Set("0:104","0:105"), toReplicaSet(configChange.get(FollowerReplicationThrottledReplicasProp)))
//Should only be follower-throttle the moving replicas
+        assertEquals(Set("0:100","0:101","0:102","0:103"), toReplicaSet(configChange.get(LeaderReplicationThrottledReplicasProp)))
//Should leader-throttle all existing (pre move) replicas
         calls += 1
       }
+
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties
= {new Properties}
     }
 
+    val admin = new TestAdminZkClient(zkClient)
     //When
-    assigner.assignThrottledReplicas(existing, proposed, mock)
+    assigner.assignThrottledReplicas(existing, proposed, admin)
     assertEquals(1, calls)
   }
 
   @Test
   def shouldNotOverwriteEntityConfigsWhenUpdatingThrottledReplicas(): Unit = {
-    val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
-    val assigner = new ReassignPartitionsCommand(null, null, null, null)
-    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), control)
-    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), control)
+    val control = new TopicPartition("topic1", 1) -> Seq(100, 102)
+    val assigner = new ReassignPartitionsCommand(null, null, null, null, null)
+    val existing = Map(new TopicPartition("topic1", 0) -> Seq(100, 101), control)
+    val proposed = Map(new TopicPartition("topic1", 0) -> Seq(101, 102), control)
 
     //Given partition there are existing properties
     val existingProperties = propsWith("some-key", "some-value")
 
     //Then the dummy property should still be there
-    val mock = new TestAdminUtils {
-      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties):
Unit = {
+    class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+      override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
         assertEquals("some-value", configChange.getProperty("some-key"))
         calls += 1
       }
 
-      override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String):
Properties = {
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties
= {
         existingProperties
       }
     }
 
+    val admin = new TestAdminZkClient(zkClient)
+
     //When
-    assigner.assignThrottledReplicas(existing, proposed, mock)
+    assigner.assignThrottledReplicas(existing, proposed, admin)
     assertEquals(1, calls)
   }
 
   @Test
   def shouldSetQuotaLimit(): Unit = {
     //Given
-    val existing = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(100, 101))
-    val proposed = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(101, 102))
+    val existing = Map(new TopicPartition("topic1", 0) -> Seq(100, 101))
+    val proposed = mutable.Map(new TopicPartition("topic1", 0) -> Seq(101, 102))
 
     //Setup
-    val zk = stubZK(existing)
-    val admin = createMock(classOf[AdminUtilities])
+    val zk = stubZKClient(existing)
+    val admin = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
-    expect(admin.fetchEntityConfig(is(zk), anyString(), anyString())).andStubReturn(new Properties)
-    expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
+    expect(admin.fetchEntityConfig(anyString(), anyString())).andStubReturn(new Properties)
+    expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
     replay(admin)
 
     //When
@@ -262,24 +280,24 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldUpdateQuotaLimit(): Unit = {
     //Given
-    val existing = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(100, 101))
-    val proposed = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(101, 102))
+    val existing = Map(new TopicPartition("topic1", 0) -> Seq(100, 101))
+    val proposed = mutable.Map(new TopicPartition("topic1", 0) -> Seq(101, 102))
 
     //Setup
-    val zk = stubZK(existing)
-    val admin = createMock(classOf[AdminUtilities])
+    val zk = stubZKClient(existing)
+    val admin = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
-    expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
+    expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
 
     //Expect the existing broker config to be changed from 10/100 to 1000
     val existingConfigs = CoreUtils.propsWith(
       (DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "10"),
       (DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "100")
     )
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("100"))).andReturn(copyOf(existingConfigs))
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("101"))).andReturn(copyOf(existingConfigs))
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("102"))).andReturn(copyOf(existingConfigs))
+    expect(admin.fetchEntityConfig(is(ConfigType.Broker), is("100"))).andReturn(copyOf(existingConfigs))
+    expect(admin.fetchEntityConfig(is(ConfigType.Broker), is("101"))).andReturn(copyOf(existingConfigs))
+    expect(admin.fetchEntityConfig(is(ConfigType.Broker), is("102"))).andReturn(copyOf(existingConfigs))
     replay(admin)
 
     //When
@@ -296,18 +314,18 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldNotOverwriteExistingPropertiesWhenLimitIsAdded(): Unit = {
     //Given
-    val existing = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(100, 101))
-    val proposed = mutable.Map(TopicAndPartition("topic1", 0) -> Seq(101, 102))
+    val existing = Map(new TopicPartition("topic1", 0) -> Seq(100, 101))
+    val proposed = mutable.Map(new TopicPartition("topic1", 0) -> Seq(101, 102))
 
     //Setup
-    val zk = stubZK(existing)
-    val admin = createMock(classOf[AdminUtilities])
+    val zk = stubZKClient(existing)
+    val admin = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
-    expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
+    expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
 
     //Given there is some existing config
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), anyString())).andReturn(
+    expect(admin.fetchEntityConfig(is(ConfigType.Broker), anyString())).andReturn(
       propsWith("useful.key", "useful.value")).atLeastOnce()
 
     replay(admin)
@@ -328,7 +346,7 @@ class ReassignPartitionsCommandTest extends Logging {
   def shouldRemoveThrottleLimitFromAllBrokers(): Unit = {
     //Given 3 brokers, but with assignment only covering 2 of them
     val brokers = Seq(100, 101, 102)
-    val status = mutable.Map(TopicAndPartition("topic1", 0) -> ReassignmentCompleted)
+    val status = mutable.Map(new TopicPartition("topic1", 0) -> ReassignmentCompleted)
     val existingBrokerConfigs = propsWith(
       (DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "10"),
       (DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "100"),
@@ -336,15 +354,15 @@ class ReassignPartitionsCommandTest extends Logging {
     )
 
     //Setup
-    val zk = stubZK(brokers = brokers)
-    val admin = createMock(classOf[AdminUtilities])
+    val zk = stubZKClient(brokers = brokers)
+    val admin = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Topic), anyString())).andStubReturn(new
Properties)
-    expect(admin.changeBrokerConfig(is(zk), anyObject().asInstanceOf[Seq[Int]], capture(propsCapture))).anyTimes()
+    expect(admin.fetchEntityConfig(is(ConfigType.Topic), anyString())).andStubReturn(new
Properties)
+    expect(admin.changeBrokerConfig(anyObject().asInstanceOf[Seq[Int]], capture(propsCapture))).anyTimes()
     //Stub each invocation as EasyMock caches the return value which can be mutated
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("100"))).andReturn(copyOf(existingBrokerConfigs))
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("101"))).andReturn(copyOf(existingBrokerConfigs))
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), is("102"))).andReturn(copyOf(existingBrokerConfigs))
+    expect(admin.fetchEntityConfig(is(ConfigType.Broker), is("100"))).andReturn(copyOf(existingBrokerConfigs))
+    expect(admin.fetchEntityConfig(is(ConfigType.Broker), is("101"))).andReturn(copyOf(existingBrokerConfigs))
+    expect(admin.fetchEntityConfig(is(ConfigType.Broker), is("102"))).andReturn(copyOf(existingBrokerConfigs))
     replay(admin)
 
     //When
@@ -362,8 +380,8 @@ class ReassignPartitionsCommandTest extends Logging {
   @Test
   def shouldRemoveThrottleReplicaListBasedOnProposedAssignment(): Unit = {
     //Given two topics with existing config
-    val status = mutable.Map(TopicAndPartition("topic1", 0) -> ReassignmentCompleted,
-                             TopicAndPartition("topic2", 0) -> ReassignmentCompleted)
+    val status = mutable.Map(new TopicPartition("topic1", 0) -> ReassignmentCompleted,
+                             new TopicPartition("topic2", 0) -> ReassignmentCompleted)
     val existingConfigs = CoreUtils.propsWith(
       (LogConfig.LeaderReplicationThrottledReplicasProp, "1:100:2:100"),
       (LogConfig.FollowerReplicationThrottledReplicasProp, "1:101,2:101"),
@@ -371,16 +389,16 @@ class ReassignPartitionsCommandTest extends Logging {
     )
 
     //Setup
-    val zk = stubZK(brokers = Seq(100, 101))
-    val admin = createMock(classOf[AdminUtilities])
+    val zk = stubZKClient(brokers = Seq(100, 101))
+    val admin = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Broker), anyString())).andStubReturn(new
Properties)
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Topic), is("topic1"))).andStubReturn(copyOf(existingConfigs))
-    expect(admin.fetchEntityConfig(is(zk), is(ConfigType.Topic), is("topic2"))).andStubReturn(copyOf(existingConfigs))
+    expect(admin.fetchEntityConfig(is(ConfigType.Broker), anyString())).andStubReturn(new
Properties)
+    expect(admin.fetchEntityConfig(is(ConfigType.Topic), is("topic1"))).andStubReturn(copyOf(existingConfigs))
+    expect(admin.fetchEntityConfig(is(ConfigType.Topic), is("topic2"))).andStubReturn(copyOf(existingConfigs))
 
     //Should change both topics
-    expect(admin.changeTopicConfig(is(zk), is("topic1"), capture(propsCapture)))
-    expect(admin.changeTopicConfig(is(zk), is("topic2"), capture(propsCapture)))
+    expect(admin.changeTopicConfig(is("topic1"), capture(propsCapture)))
+    expect(admin.changeTopicConfig(is("topic2"), capture(propsCapture)))
 
     replay(admin)
 
@@ -404,12 +422,16 @@ class ReassignPartitionsCommandTest extends Logging {
     calls = 0
   }
 
-  def stubZK(existingAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map[TopicAndPartition,
Seq[Int]](),
-             brokers: Seq[Int] = Seq[Int]()): ZkUtils = {
-    val zk = createMock(classOf[ZkUtils])
-    expect(zk.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Seq[String]])).andStubReturn(existingAssignment)
-    expect(zk.getAllBrokersInCluster()).andStubReturn(brokers.map(TestUtils.createBroker(_,
"", 1)))
-    replay(zk)
-    zk
+  def stubZKClient(existingAssignment: Map[TopicPartition, Seq[Int]] = Map[TopicPartition,
Seq[Int]](),
+                   brokers: Seq[Int] = Seq[Int]()): KafkaZkClient = {
+    val zkClient = createMock(classOf[KafkaZkClient])
+    expect(zkClient.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Set[String]])).andStubReturn(existingAssignment)
+    expect(zkClient.getAllBrokersInCluster).andStubReturn(brokers.map(TestUtils.createBroker(_,
"", 1)))
+    replay(zkClient)
+    zkClient
+  }
+
+  def toReplicaSet(throttledReplicasString: Any): Set[String] = {
+    throttledReplicasString.toString.split(",").toSet
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
index bdabd67..4ac64fe 100644
--- a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
@@ -13,41 +13,42 @@
 package kafka.admin
 
 import kafka.log.LogConfig
-import kafka.server.{DynamicConfig, ConfigType, KafkaServer}
+import kafka.server.{ConfigType, DynamicConfig, KafkaServer}
 import kafka.utils.TestUtils
+import kafka.zk.AdminZkClient
 
 import scala.collection.Seq
 
 object ReplicationQuotaUtils {
 
-  def checkThrottleConfigRemovedFromZK(topic: String, servers: Seq[KafkaServer]): Unit =
{
+  def checkThrottleConfigRemovedFromZK(adminZkClient: AdminZkClient, topic: String, servers:
Seq[KafkaServer]): Unit = {
     TestUtils.waitUntilTrue(() => {
       val hasRateProp = servers.forall { server =>
-        val brokerConfig = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker,
server.config.brokerId.toString)
+        val brokerConfig = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
         brokerConfig.contains(DynamicConfig.Broker.LeaderReplicationThrottledRateProp) ||
           brokerConfig.contains(DynamicConfig.Broker.FollowerReplicationThrottledRateProp)
       }
-      val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic,
topic)
+      val topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
       val hasReplicasProp = topicConfig.contains(LogConfig.LeaderReplicationThrottledReplicasProp)
||
         topicConfig.contains(LogConfig.FollowerReplicationThrottledReplicasProp)
       !hasRateProp && !hasReplicasProp
     }, "Throttle limit/replicas was not unset")
   }
 
-  def checkThrottleConfigAddedToZK(expectedThrottleRate: Long, servers: Seq[KafkaServer],
topic: String, throttledLeaders: String, throttledFollowers: String): Unit = {
+  def checkThrottleConfigAddedToZK(adminZkClient: AdminZkClient, expectedThrottleRate: Long,
servers: Seq[KafkaServer], topic: String, throttledLeaders: Set[String], throttledFollowers:
Set[String]): Unit = {
     TestUtils.waitUntilTrue(() => {
       //Check for limit in ZK
       val brokerConfigAvailable = servers.forall { server =>
-        val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker,
server.config.brokerId.toString)
+        val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
         val zkLeaderRate = configInZk.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp)
         val zkFollowerRate = configInZk.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp)
         zkLeaderRate != null && expectedThrottleRate == zkLeaderRate.toLong &&
           zkFollowerRate != null && expectedThrottleRate == zkFollowerRate.toLong
       }
       //Check replicas assigned
-      val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic,
topic)
-      val leader = topicConfig.getProperty(LogConfig.LeaderReplicationThrottledReplicasProp)
-      val follower = topicConfig.getProperty(LogConfig.FollowerReplicationThrottledReplicasProp)
+      val topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+      val leader = topicConfig.getProperty(LogConfig.LeaderReplicationThrottledReplicasProp).split(",").toSet
+      val follower = topicConfig.getProperty(LogConfig.FollowerReplicationThrottledReplicasProp).split(",").toSet
       val topicConfigAvailable = leader == throttledLeaders && follower == throttledFollowers
       brokerConfigAvailable && topicConfigAvailable
     }, "throttle limit/replicas was not set")

http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index ae0faed..ecaa943 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -24,6 +24,7 @@ import kafka.security.auth._
 import kafka.server.ConfigType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.zookeeper.KeeperException.NodeExistsException
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.Test
 
@@ -410,6 +411,25 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
   }
 
+  @Test
+  def testPreferredReplicaElectionMethods() {
+
+    assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
+
+    val topic1 = "topic1"
+    val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1,
1))
+
+    zkClient.createPreferredReplicaElection(electionPartitions)
+    assertEquals(electionPartitions, zkClient.getPreferredReplicaElection)
+
+    intercept[NodeExistsException] {
+      zkClient.createPreferredReplicaElection(electionPartitions)
+    }
+
+    zkClient.deletePreferredReplicaElection()
+    assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
+  }
+
   private def dataAsString(path: String): Option[String] = {
     val (data, _) = zkClient.getDataAndStat(path)
     data.map(new String(_, UTF_8))

http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 01cfa5a..71758a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -16,19 +16,17 @@
  */
 package org.apache.kafka.streams.integration;
 
-
-import kafka.admin.AdminUtils;
 import kafka.log.LogConfig;
 import kafka.utils.MockTime;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -95,20 +93,16 @@ public class InternalTopicIntegrationTest {
     }
 
     private Properties getTopicConfigProperties(final String changelog) {
-        // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't,
then
-        // createTopics() will only seem to work (it will return without error).  The topic
will exist in
-        // only ZooKeeper and will be returned when listing topics, but Kafka itself does
not create the
-        // topic.
-        final ZkClient zkClient = new ZkClient(
+        final ZooKeeperClient zkClient = new ZooKeeperClient(
                 CLUSTER.zKConnectString(),
                 DEFAULT_ZK_SESSION_TIMEOUT_MS,
                 DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
-                ZKStringSerializer$.MODULE$);
+                Integer.MAX_VALUE, Time.SYSTEM);
+        final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
         try {
-            final boolean isSecure = false;
-            final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()),
isSecure);
+            final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
 
-            final Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
+            final Map<String, Properties> topicConfigs = adminZkClient.getAllTopicConfigs();
             final Iterator it = topicConfigs.iterator();
             while (it.hasNext()) {
                 final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>)
it.next();
@@ -120,7 +114,7 @@ public class InternalTopicIntegrationTest {
             }
             return new Properties();
         } finally {
-            zkClient.close();
+            kafkaZkClient.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/488ea4b9/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 1863484..e277d82 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
-import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
@@ -24,11 +23,11 @@ import kafka.server.KafkaServer;
 import kafka.utils.CoreUtils;
 import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
+import kafka.zk.AdminZkClient;
+import kafka.zk.KafkaZkClient;
+import kafka.zookeeper.ZooKeeperClient;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -173,33 +172,31 @@ public class KafkaEmbedded {
         log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {}
}",
             topic, partitions, replication, topicConfig);
 
-        // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't,
then
-        // createTopic() will only seem to work (it will return without error).  The topic
will exist in
-        // only ZooKeeper and will be returned when listing topics, but Kafka itself does
not create the
-        // topic.
-        final ZkClient zkClient = new ZkClient(
-            zookeeperConnect(),
-            DEFAULT_ZK_SESSION_TIMEOUT_MS,
-            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
-            ZKStringSerializer$.MODULE$);
-        final boolean isSecure = false;
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()),
isSecure);
-        AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
-        zkClient.close();
+        final ZooKeeperClient zkClient = new ZooKeeperClient(
+                zookeeperConnect(),
+                DEFAULT_ZK_SESSION_TIMEOUT_MS,
+                DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+                Integer.MAX_VALUE,
+                Time.SYSTEM);
+        final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
+        final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+        adminZkClient.createTopic(topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
+        kafkaZkClient.close();
     }
 
     public void deleteTopic(final String topic) {
         log.debug("Deleting topic { name: {} }", topic);
 
-        final ZkClient zkClient = new ZkClient(
-            zookeeperConnect(),
-            DEFAULT_ZK_SESSION_TIMEOUT_MS,
-            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
-            ZKStringSerializer$.MODULE$);
-        final boolean isSecure = false;
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()),
isSecure);
-        AdminUtils.deleteTopic(zkUtils, topic);
-        zkClient.close();
+        final ZooKeeperClient zkClient = new ZooKeeperClient(
+                zookeeperConnect(),
+                DEFAULT_ZK_SESSION_TIMEOUT_MS,
+                DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+                Integer.MAX_VALUE,
+                Time.SYSTEM);
+        final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
+        final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+        adminZkClient.deleteTopic(topic);
+        kafkaZkClient.close();
     }
 
     public KafkaServer kafkaServer() {


Mime
View raw message