This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 5533445  KAFKA-7054; Kafka describe command should throw topic doesn't exist exception
5533445 is described below

commit 55334453a561646b303e7de961e5990345effc15
Author: Manohar Vanam
AuthorDate: Fri Dec 21 14:48:22 2018 +0530

    KAFKA-7054; Kafka describe command should throw topic doesn't exist exception

    **User Interface Improvement :**
    If topic doesn't exist then Kafka describe command should throw topic doesn't exist exception, like alter and delete commands

    Author: Manohar Vanam

    Reviewers: Vahid Hashemian, Jason Gustafson, Manikumar Reddy

    Closes #5211 from ManoharVanam/KAFKA-7054
---
 core/src/main/scala/kafka/admin/TopicCommand.scala | 33 +++++++++++++++-------
 .../scala/unit/kafka/admin/TopicCommandTest.scala  | 18 ++++++++++++
 2 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 37dd233..eaa6d78 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -119,10 +119,7 @@ object TopicCommand extends Logging {
   def alterTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
-    if (topics.isEmpty && !ifExists) {
-      throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
-          opts.options.valueOf(opts.zkConnectOpt)))
-    }
+    ensureTopicExists(opts, topics, ifExists)
     val adminZkClient = new AdminZkClient(zkClient)
     topics.foreach { topic =>
       val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
@@ -178,10 +175,7 @@ object TopicCommand extends Logging {
   def deleteTopic(zkClient: 
KafkaZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
-    if (topics.isEmpty && !ifExists) {
-      throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
-          opts.options.valueOf(opts.zkConnectOpt)))
-    }
+    ensureTopicExists(opts, topics, ifExists)
     topics.foreach { topic =>
       try {
         if (Topic.isInternal(topic)) {
@@ -204,6 +198,8 @@ object TopicCommand extends Logging {
 
   def describeTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
+    val topicOptWithExits = opts.options.has(opts.topicOpt) && opts.options.has(opts.ifExistsOpt)
+    ensureTopicExists(opts, topics, topicOptWithExits)
     val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
     val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
     val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
@@ -256,6 +252,21 @@ object TopicCommand extends Logging {
     }
   }
 
+  /**
+    * ensures topic existence and throws exception if topic doesn't exist
+    *
+    * @param opts
+    * @param topics
+    * @param topicOptWithExists
+    */
+  private def ensureTopicExists(opts: TopicCommandOptions, topics: Seq[String], topicOptWithExists: Boolean) = {
+    if (topics.isEmpty && !topicOptWithExists) {
+      // If given topic doesn't exist then throw exception
+      throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
+        opts.options.valueOf(opts.zkConnectOpt)))
+    }
+  }
+
   def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
     val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
     require(configsToBeAdded.forall(config => config.length == 2),
@@ -346,7 +357,7 @@ object TopicCommand extends Logging {
     val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
                                                 "if set 
when describing topics, only show topics that have overridden configs")
     val ifExistsOpt = parser.accepts("if-exists",
-                                     "if set when altering or deleting topics, the action will only execute if the topic exists")
+                                     "if set when altering or deleting or describing topics, the action will only execute if the topic exists")
     val ifNotExistsOpt = parser.accepts("if-not-exists",
                                         "if set when creating topics, the action will only execute if the topic does not already exist")
 
@@ -363,6 +374,8 @@ object TopicCommand extends Logging {
     def checkArgs() {
       // check required args
       CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+      if(options.has(describeOpt) && options.has(ifExistsOpt))
+        CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
       if (!options.has(listOpt) && !options.has(describeOpt))
         CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
 
@@ -380,7 +393,7 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))
     }
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 5cfab90..35ea241 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -160,6 
+160,24 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
   }
 
   @Test
+  def testDescribeIfTopicNotExists() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    // describe topic that does not exist
+    val describeOpts = new TopicCommandOptions(Array("--topic", "test"))
+    intercept[IllegalArgumentException] {
+      TopicCommand.describeTopic(zkClient, describeOpts)
+    }
+
+    // describe topic that does not exist with --if-exists
+    val describeOptsWithExists = new TopicCommandOptions(Array("--topic", "test", "--if-exists"))
+    // should not throw any error
+    TopicCommand.describeTopic(zkClient, describeOptsWithExists)
+  }
+
+  @Test
   def testCreateAlterTopicWithRackAware() {
     val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
     TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient)