kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7054; Kafka describe command should throw topic doesn't exist exception
Date Fri, 21 Dec 2018 09:18:48 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5533445  KAFKA-7054; Kafka describe command should throw topic doesn't exist exception
5533445 is described below

commit 55334453a561646b303e7de961e5990345effc15
Author: Manohar Vanam <manohar.crazy09@gmail.com>
AuthorDate: Fri Dec 21 14:48:22 2018 +0530

    KAFKA-7054; Kafka describe command should throw topic doesn't exist exception
    
    **User Interface Improvement:** If the topic doesn't exist, the Kafka describe command should
throw a "topic doesn't exist" exception, like the alter and delete commands do.
    
    Author: Manohar Vanam <manohar.crazy09@gmail.com>
    
    Reviewers: Vahid Hashemian <vahid.hashemian@gmail.com>, Jason Gustafson <jason@confluent.io>,
Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Closes #5211 from ManoharVanam/KAFKA-7054
---
 core/src/main/scala/kafka/admin/TopicCommand.scala | 33 +++++++++++++++-------
 .../scala/unit/kafka/admin/TopicCommandTest.scala  | 18 ++++++++++++
 2 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 37dd233..eaa6d78 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -119,10 +119,7 @@ object TopicCommand extends Logging {
   def alterTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
-    if (topics.isEmpty && !ifExists) {
-      throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
-          opts.options.valueOf(opts.zkConnectOpt)))
-    }
+    ensureTopicExists(opts, topics, ifExists)
     val adminZkClient = new AdminZkClient(zkClient)
     topics.foreach { topic =>
       val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
@@ -178,10 +175,7 @@ object TopicCommand extends Logging {
   def deleteTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
-    if (topics.isEmpty && !ifExists) {
-      throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
-          opts.options.valueOf(opts.zkConnectOpt)))
-    }
+    ensureTopicExists(opts, topics, ifExists)
     topics.foreach { topic =>
       try {
         if (Topic.isInternal(topic)) {
@@ -204,6 +198,8 @@ object TopicCommand extends Logging {
 
   def describeTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
+    val topicOptWithExits = opts.options.has(opts.topicOpt) && opts.options.has(opts.ifExistsOpt)
+    ensureTopicExists(opts, topics, topicOptWithExits)
     val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
     val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
     val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
@@ -256,6 +252,21 @@ object TopicCommand extends Logging {
     }
   }
 
+  /**
+    * ensures topic existence and throws exception if topic doesn't exist
+    *
+    * @param opts
+    * @param topics
+    * @param topicOptWithExists
+    */
+  private def ensureTopicExists(opts: TopicCommandOptions, topics: Seq[String], topicOptWithExists:
Boolean) = {
+    if (topics.isEmpty && !topicOptWithExists) {
+      // If given topic doesn't exist then throw exception
+      throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
+        opts.options.valueOf(opts.zkConnectOpt)))
+    }
+  }
+
   def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
     val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
     require(configsToBeAdded.forall(config => config.length == 2),
@@ -346,7 +357,7 @@ object TopicCommand extends Logging {
     val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
                                                 "if set when describing topics, only show
topics that have overridden configs")
     val ifExistsOpt = parser.accepts("if-exists",
-                                     "if set when altering or deleting topics, the action
will only execute if the topic exists")
+                                     "if set when altering or deleting or describing topics,
the action will only execute if the topic exists")
     val ifNotExistsOpt = parser.accepts("if-not-exists",
                                         "if set when creating topics, the action will only
execute if the topic does not already exist")
 
@@ -363,6 +374,8 @@ object TopicCommand extends Logging {
     def checkArgs() {
       // check required args
       CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+      if(options.has(describeOpt) && options.has(ifExistsOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
       if (!options.has(listOpt) && !options.has(describeOpt))
         CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
 
@@ -380,7 +393,7 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts --
Set(alterOpt, deleteOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts --
Set(alterOpt, deleteOpt, describeOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts
-- Set(createOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts
-- Set(listOpt, describeOpt))
     }
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 5cfab90..35ea241 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -160,6 +160,24 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with
RackAwareT
   }
 
   @Test
+  def testDescribeIfTopicNotExists() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    // describe topic that does not exist
+    val describeOpts = new TopicCommandOptions(Array("--topic", "test"))
+    intercept[IllegalArgumentException] {
+      TopicCommand.describeTopic(zkClient, describeOpts)
+    }
+
+    // describe topic that does not exist with --if-exists
+    val describeOptsWithExists = new TopicCommandOptions(Array("--topic", "test", "--if-exists"))
+    // should not throw any error
+    TopicCommand.describeTopic(zkClient, describeOptsWithExists)
+  }
+
+  @Test
   def testCreateAlterTopicWithRackAware() {
     val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1",
4 -> "rack3", 5 -> "rack3")
     TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient)


Mime
View raw message