kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/2] kafka git commit: KAFKA-5856; Add AdminClient.createPartitions() (KIP-195)
Date Thu, 21 Sep 2017 04:09:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 94692288b -> 5f6393f9b


http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 645f6ac..ef9d9c1 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -372,7 +372,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       case _: KafkaException => // this is ok
     }
 
-    AdminUtils.addPartitions(zkUtils, topic, 2)
+    val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+      case (topicPartition, replicas) => topicPartition.partition -> replicas
+    }
+    AdminUtils.addPartitions(zkUtils, topic, existingAssignment, 2)
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 9bc362c..2df5305 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -25,7 +25,9 @@ import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
+import kafka.common.TopicAndPartition
 import kafka.server.{KafkaConfig, KafkaServer}
+import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException}
 import org.apache.kafka.common.network.ListenerName
 import org.junit.{After, Before, Test}
 
@@ -37,9 +39,15 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   val partitionId = 0
 
   val topic1 = "new-topic1"
+  val topic1Assignment = Map(0->Seq(0,1))
   val topic2 = "new-topic2"
+  val topic2Assignment = Map(0->Seq(1,2))
   val topic3 = "new-topic3"
+  val topic3Assignment = Map(0->Seq(2,3,0,1))
   val topic4 = "new-topic4"
+  val topic4Assignment = Map(0->Seq(0,3))
+  val topic5 = "new-topic5"
+  val topic5Assignment = Map(1->Seq(0,1))
 
   @Before
   override def setUp() {
@@ -51,10 +59,10 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     brokers = servers.map(s => TestUtils.createBroker(s.config.brokerId, s.config.hostName, TestUtils.boundPort(s)))
 
     // create topics first
-    createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-    createTopic(zkUtils, topic2, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers)
-    createTopic(zkUtils, topic3, partitionReplicaAssignment = Map(0->Seq(2,3,0,1)), servers = servers)
-    createTopic(zkUtils, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
+    createTopic(zkUtils, topic1, partitionReplicaAssignment = topic1Assignment, servers = servers)
+    createTopic(zkUtils, topic2, partitionReplicaAssignment = topic2Assignment, servers = servers)
+    createTopic(zkUtils, topic3, partitionReplicaAssignment = topic3Assignment, servers = servers)
+    createTopic(zkUtils, topic4, partitionReplicaAssignment = topic4Assignment, servers = servers)
   }
 
   @After
@@ -64,28 +72,29 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testTopicDoesNotExist(): Unit = {
+  def testWrongReplicaCount(): Unit = {
     try {
-      AdminUtils.addPartitions(zkUtils, "Blah", 1)
-      fail("Topic should not exist")
+      AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, 2, Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2))))
+      fail("Add partitions should fail")
     } catch {
-      case _: AdminOperationException => //this is good
+      case _: InvalidReplicaAssignmentException => //this is good
     }
   }
 
   @Test
-  def testWrongReplicaCount(): Unit = {
+  def testMissingPartition0(): Unit = {
     try {
-      AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
+      AdminUtils.addPartitions(zkUtils, topic5, topic5Assignment, 2, Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2))))
       fail("Add partitions should fail")
     } catch {
-      case _: AdminOperationException => //this is good
+      case e: AdminOperationException => //this is good
+        assertTrue(e.getMessage.contains("Unexpected existing replica assignment for topic 'new-topic5', partition id 0 is missing"))
     }
   }
 
   @Test
   def testIncrementPartitions(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic1, 3)
+    AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, 3)
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
     val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
@@ -112,7 +121,8 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testManualAssignmentOfReplicas(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3")
+    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, 3,
+      Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
     val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2)
@@ -140,7 +150,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testReplicaPlacementAllServers(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic3, 7)
+    AdminUtils.addPartitions(zkUtils, topic3, topic3Assignment, 7)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
@@ -167,7 +177,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testReplicaPlacementPartialServers(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic2, 3)
+    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, 3)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 5f76aa7..36f439f 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -33,6 +33,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   var servers: Seq[KafkaServer] = Seq()
 
+  val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+
   @After
   override def tearDown() {
     TestUtils.shutdownServers(servers)
@@ -154,7 +156,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val newPartition = new TopicPartition(topic, 1)
     follower.shutdown()
     // add partitions to topic
-    AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2", false)
+    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, 2, Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))), false)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
     follower.startup()
@@ -174,7 +176,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     AdminUtils.deleteTopic(zkUtils, topic)
     // add partitions to topic
     val newPartition = new TopicPartition(topic, 1)
-    AdminUtils.addPartitions(zkUtils, topic, 2, "0:1:2,0:1:2")
+    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, 2, Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     assertTrue("Replica logs not deleted after delete topic is complete",
@@ -281,7 +283,6 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   }
 
   private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = {
-    val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicPartition = new TopicPartition(topic, 0)
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index f2020cf..36e03af 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -96,11 +96,11 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
       Map("error-replication" -> new CreateTopicsRequest.TopicDetails(10, (numBrokers + 1).toShort)).asJava, timeout, true).build(),
       Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
-        Some("replication factor: 4 larger than available brokers: 3"))))
+        Some("Replication factor: 4 larger than available brokers: 3."))))
 
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
       Map("error-replication2" -> new CreateTopicsRequest.TopicDetails(10, -1: Short)).asJava, timeout, true).build(),
-      Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR, Some("replication factor must be larger than 0"))))
+      Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR, Some("Replication factor must be larger than 0."))))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f6393f9/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 1a3b5c2..c244bd7 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,6 +24,7 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
@@ -299,6 +300,11 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.DESCRIBE_LOG_DIRS =>
           new DescribeLogDirsRequest.Builder(Collections.singleton(tp))
 
+        case ApiKeys.CREATE_PARTITIONS =>
+          new CreatePartitionsRequest.Builder(
+            Collections.singletonMap("topic-2", NewPartitions.increaseTo(1)), 0, false
+          )
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
@@ -392,6 +398,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.ALTER_CONFIGS => new AlterConfigsResponse(response).throttleTimeMs
       case ApiKeys.ALTER_REPLICA_DIR => new AlterReplicaDirResponse(response).throttleTimeMs
       case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs
+      case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }


Mime
View raw message