kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used (#8737)
Date Tue, 02 Jun 2020 23:25:17 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 1e17030  KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists
when --bootstrap-server is used (#8737)
1e17030 is described below

commit 1e1703006e5f787091810aef16d9bce9ceb68b21
Author: vinoth chandar <vinothchandar@users.noreply.github.com>
AuthorDate: Tue Jun 2 16:17:05 2020 -0700

    KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server
is used (#8737)
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
    (cherry picked from commit ea6d373a93205bad41b42ad9a2b2bec717c11c93)
---
 core/src/main/scala/kafka/admin/TopicCommand.scala | 112 ++++++++++++---------
 .../admin/TopicCommandWithAdminClientTest.scala    |  38 +++++--
 2 files changed, 95 insertions(+), 55 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 86dc84e..8c1629b 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -228,7 +228,7 @@ object TopicCommand extends Logging {
       if (topic.partitions.exists(partitions => partitions < 1))
         throw new IllegalArgumentException(s"The partitions must be greater than 0")
 
-      if (!adminClient.listTopics().names().get().contains(topic.name)) {
+      try {
         val newTopic = if (topic.hasReplicaAssignment)
           new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
         else {
@@ -247,8 +247,12 @@ object TopicCommand extends Logging {
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
         createResult.all().get()
         println(s"Created topic ${topic.name}.")
-      } else {
-        throw new IllegalArgumentException(s"Topic ${topic.name} already exists")
+      } catch {
+        case e : ExecutionException =>
+          if (e.getCause == null)
+            throw e
+          if (!(e.getCause.isInstanceOf[TopicExistsException] && topic.ifTopicDoesntExist()))
+            throw e.getCause
       }
     }
 
@@ -259,24 +263,29 @@ object TopicCommand extends Logging {
     override def alterTopic(opts: TopicCommandOptions): Unit = {
       val topic = new CommandTopicPartition(opts)
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
-      adminClient.createPartitions(topics.map {topicName =>
-        if (topic.hasReplicaAssignment) {
-          val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
-          val newAssignment = {
-            val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
-            new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      if (topics.nonEmpty) {
+        val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
+        adminClient.createPartitions(topics.map { topicName =>
+          if (topic.hasReplicaAssignment) {
+            val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
+            val newAssignment = {
+              val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
+              new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+            }
+            topicName -> NewPartitions.increaseTo(topic.partitions.get, newAssignment)
+          } else {
+            topicName -> NewPartitions.increaseTo(topic.partitions.get)
           }
-          topicName -> NewPartitions.increaseTo(topic.partitions.get, newAssignment)
-        } else {
-          topicName -> NewPartitions.increaseTo(topic.partitions.get)
-        }}.toMap.asJava).all().get()
+        }.toMap.asJava).all().get()
+      }
     }
 
-    private def listAllReassignments(): Map[TopicPartition, PartitionReassignment] = {
+    private def listAllReassignments(topicPartitions: util.Set[TopicPartition]): Map[TopicPartition,
PartitionReassignment] = {
       try {
-        adminClient.listPartitionReassignments(new ListPartitionReassignmentsOptions).reassignments().get().asScala
+        adminClient.listPartitionReassignments(topicPartitions, new ListPartitionReassignmentsOptions)
+          .reassignments().get().asScala
       } catch {
         case e: ExecutionException =>
           e.getCause match {
@@ -290,33 +299,40 @@ object TopicCommand extends Logging {
 
     override def describeTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC,
_)).asJavaCollection).values()
-      val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
-      val reassignments = listAllReassignments()
-      val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-      val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-
-      for (td <- topicDescriptions) {
-        val topicName = td.name
-        val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
-        val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-        if (describeOptions.describeConfigs) {
-          val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-          if (!opts.reportOverriddenConfigs || hasNonDefault) {
-            val numPartitions = td.partitions().size
-            val firstPartition = td.partitions.iterator.next()
-            val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
-            val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition,
reassignment), config, markedForDeletion = false)
-            topicDesc.printDescription()
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      if (topics.nonEmpty) {
+        val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC,
_)).asJavaCollection).values()
+        val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
+        val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
+        val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
+        val topicPartitions = topicDescriptions
+          .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(),
p.partition())))
+          .toSet.asJava
+        val reassignments = listAllReassignments(topicPartitions)
+
+        for (td <- topicDescriptions) {
+          val topicName = td.name
+          val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
+          val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
+
+          if (describeOptions.describeConfigs) {
+            val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
+            if (!opts.reportOverriddenConfigs || hasNonDefault) {
+              val numPartitions = td.partitions().size
+              val firstPartition = td.partitions.iterator.next()
+              val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
+              val topicDesc = TopicDescription(topicName, numPartitions, getReplicationFactor(firstPartition,
reassignment), config, markedForDeletion = false)
+              topicDesc.printDescription()
+            }
           }
-        }
 
-        if (describeOptions.describePartitions) {
-          for (partition <- sortedPartitions) {
-            val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
-            val partitionDesc = PartitionDescription(topicName, partition, Some(config),
markedForDeletion = false, reassignment)
-            describeOptions.maybePrintPartitionDescription(partitionDesc)
+          if (describeOptions.describePartitions) {
+            for (partition <- sortedPartitions) {
+              val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
+              val partitionDesc = PartitionDescription(topicName, partition, Some(config),
markedForDeletion = false, reassignment)
+              describeOptions.maybePrintPartitionDescription(partitionDesc)
+            }
           }
         }
       }
@@ -324,7 +340,7 @@ object TopicCommand extends Logging {
 
     override def deleteTopic(opts: TopicCommandOptions): Unit = {
       val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
+      ensureTopicExists(topics, opts.topic, !opts.ifExists)
       adminClient.deleteTopics(topics.asJavaCollection).all().get()
     }
 
@@ -501,7 +517,7 @@ object TopicCommand extends Logging {
     *                           If set to true, the command will throw an exception if the
topic with the
     *                           requested name does not exist.
     */
-  private def ensureTopicExists(foundTopics: Seq[String], requestedTopic: Option[String],
requireTopicExists: Boolean = true): Unit = {
+  private def ensureTopicExists(foundTopics: Seq[String], requestedTopic: Option[String],
requireTopicExists: Boolean): Unit = {
     // If no topic name was mentioned, do not need to throw exception.
     if (requestedTopic.isDefined && requireTopicExists && foundTopics.isEmpty)
{
       // If given topic doesn't exist then throw exception
@@ -639,9 +655,9 @@ object TopicCommand extends Logging {
     private val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
       "if set when describing topics, only show topics that have overridden configs")
     private val ifExistsOpt = parser.accepts("if-exists",
-      "if set when altering or deleting or describing topics, the action will only execute
if the topic exists. Not supported with the --bootstrap-server option.")
+      "if set when altering or deleting or describing topics, the action will only execute
if the topic exists.")
     private val ifNotExistsOpt = parser.accepts("if-not-exists",
-      "if set when creating topics, the action will only execute if the topic does not already
exist. Not supported with the --bootstrap-server option.")
+      "if set when creating topics, the action will only execute if the topic does not already
exist.")
 
     private val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware
replica assignment")
 
@@ -736,8 +752,8 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt
+ topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts --
Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts
-- Set(createOpt) ++ Set(bootstrapServerOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts --
Set(alterOpt, deleteOpt, describeOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts
-- Set(createOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts
-- Set(listOpt, describeOpt))
     }
   }
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index 04712d6..fdf110c 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
@@ -214,7 +215,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with
Loggin
   }
 
   @Test
-  def testCreateIfItAlreadyExists(): Unit = {
+  def testCreateWhenAlreadyExists(): Unit = {
     val numPartitions = 1
 
     // create the topic
@@ -223,12 +224,19 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
     createAndWaitTopic(createOpts)
 
     // try to re-create the topic
-    intercept[IllegalArgumentException] {
+    intercept[TopicExistsException] {
       topicService.createTopic(createOpts)
     }
   }
 
   @Test
+  def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = {
+    val createOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--if-not-exists"))
+    createAndWaitTopic(createOpts)
+    topicService.createTopic(createOpts)
+  }
+
+  @Test
   def testCreateWithReplicaAssignment(): Unit = {
     // create the topic
     val createOpts = new TopicCommandOptions(
@@ -431,10 +439,9 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
   }
 
   @Test
-  def testIfExistsAndIfNotExistsOptionsInvalidWithBootstrapServers(): Unit = {
-    // alter a topic that does not exist without --if-exists
-    assertCheckArgsExitCode(1, new TopicCommandOptions(Array("--bootstrap-server", "server1:9092",
"--alter", "--if-exists", "--topic", testTopicName, "--partitions", "1")))
-    assertCheckArgsExitCode(1, new TopicCommandOptions(Array("--bootstrap-server", "server1:9092",
"--create", "--if-not-exists", "--topic", testTopicName, "--partitions", "1", "--replication-factor",
"1")))
+  def testAlterWhenTopicDoesntExistWithIfExists(): Unit = {
+    topicService.alterTopic(new TopicCommandOptions(
+      Array("--topic", testTopicName, "--partitions", "1", "--if-exists")))
   }
 
   @Test
@@ -530,7 +537,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with
Loggin
   }
 
   @Test
-  def testDeleteIfExists(): Unit = {
+  def testDeleteWhenTopicDoesntExist(): Unit = {
     // delete a topic that does not exist
     val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
     intercept[IllegalArgumentException] {
@@ -539,6 +546,11 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
   }
 
   @Test
+  def testDeleteWhenTopicDoesntExistWithIfExists(): Unit = {
+    topicService.deleteTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
+  }
+
+  @Test
   def testDescribe(): Unit = {
     adminClient.createTopics(
       Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get()
@@ -552,6 +564,18 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
   }
 
   @Test
+  def testDescribeWhenTopicDoesntExist(): Unit = {
+    intercept[IllegalArgumentException] {
+      topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName)))
+    }
+  }
+
+  @Test
+  def testDescribeWhenTopicDoesntExistWithIfExists(): Unit = {
+    topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
+  }
+
+  @Test
   def testDescribeUnavailablePartitions(): Unit = {
     adminClient.createTopics(
       Collections.singletonList(new NewTopic(testTopicName, 6, 1.toShort))).all().get()


Mime
View raw message