kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8875; CreateTopic API should check topic existence before replication factor (#7298)
Date Wed, 11 Sep 2019 21:31:14 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 129d531  KAFKA-8875; CreateTopic API should check topic existence before replication factor (#7298)
129d531 is described below

commit 129d5317f5abe895cb44955ee4e843e3b09fb864
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Thu Sep 12 05:24:27 2019 +0800

    KAFKA-8875; CreateTopic API should check topic existence before replication factor (#7298)
    
    If the topic already exists, `handleCreateTopicsRequest` should return TopicExistsException even given an invalid config (replication factor for instance).
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/server/AdminManager.scala     |  5 ++++-
 .../kafka/api/AdminClientIntegrationTest.scala          | 17 +++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index d424700..7115981 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.ConfigDef.ConfigKey
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.metrics.Metrics
@@ -84,6 +84,9 @@ class AdminManager(val config: KafkaConfig,
     val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
     val metadata = toCreate.values.map(topic =>
       try {
+        if (metadataCache.contains(topic.name))
+          throw new TopicExistsException(s"Topic '${topic.name}' already exists.")
+
         val configs = new Properties()
         topic.configs().asScala.foreach { case entry =>
           configs.setProperty(entry.name(), entry.value())
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 40d2820..8c92ec6 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -212,6 +212,23 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
+  def testCreateExistingTopicsThrowTopicExistsException(): Unit = {
+    client = AdminClient.create(createConfig())
+    val topic = "mytopic"
+    val topics = Seq(topic)
+    val newTopics = Seq(new NewTopic(topic, 1, 1.toShort))
+
+    client.createTopics(newTopics.asJava).all.get()
+    waitForTopics(client, topics, List())
+
+    val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (servers.size + 1).toShort))
+    val e = intercept[ExecutionException] {
+      client.createTopics(newTopicsWithInvalidRF.asJava, new CreateTopicsOptions().validateOnly(true)).all.get()
+    }
+    assertTrue(e.getCause.isInstanceOf[TopicExistsException])
+  }
+
+  @Test
   def testMetadataRefresh(): Unit = {
     client = AdminClient.create(createConfig())
     val topics = Seq("mytopic")


Mime
View raw message