This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5533445 KAFKA-7054; Kafka describe command should throw topic doesn't exist exception
5533445 is described below
commit 55334453a561646b303e7de961e5990345effc15
Author: Manohar Vanam <manohar.crazy09@gmail.com>
AuthorDate: Fri Dec 21 14:48:22 2018 +0530
KAFKA-7054; Kafka describe command should throw topic doesn't exist exception
**User Interface Improvement :** If topic doesn't exist then Kafka describe command should
throw topic doesn't exist exception, like alter and delete commands
Author: Manohar Vanam <manohar.crazy09@gmail.com>
Reviewers: Vahid Hashemian <vahid.hashemian@gmail.com>, Jason Gustafson <jason@confluent.io>,
Manikumar Reddy <manikumar.reddy@gmail.com>
Closes #5211 from ManoharVanam/KAFKA-7054
---
core/src/main/scala/kafka/admin/TopicCommand.scala | 33 +++++++++++++++-------
.../scala/unit/kafka/admin/TopicCommandTest.scala | 18 ++++++++++++
2 files changed, 41 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 37dd233..eaa6d78 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -119,10 +119,7 @@ object TopicCommand extends Logging {
def alterTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
val topics = getTopics(zkClient, opts)
val ifExists = opts.options.has(opts.ifExistsOpt)
- if (topics.isEmpty && !ifExists) {
- throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
- opts.options.valueOf(opts.zkConnectOpt)))
- }
+ ensureTopicExists(opts, topics, ifExists)
val adminZkClient = new AdminZkClient(zkClient)
topics.foreach { topic =>
val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
@@ -178,10 +175,7 @@ object TopicCommand extends Logging {
def deleteTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
val topics = getTopics(zkClient, opts)
val ifExists = opts.options.has(opts.ifExistsOpt)
- if (topics.isEmpty && !ifExists) {
- throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
- opts.options.valueOf(opts.zkConnectOpt)))
- }
+ ensureTopicExists(opts, topics, ifExists)
topics.foreach { topic =>
try {
if (Topic.isInternal(topic)) {
@@ -204,6 +198,8 @@ object TopicCommand extends Logging {
def describeTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
val topics = getTopics(zkClient, opts)
+ val topicOptWithExits = opts.options.has(opts.topicOpt) && opts.options.has(opts.ifExistsOpt)
+ ensureTopicExists(opts, topics, topicOptWithExits)
val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
@@ -256,6 +252,21 @@ object TopicCommand extends Logging {
}
}
+ /**
+ * ensures topic existence and throws exception if topic doesn't exist
+ *
+ * @param opts
+ * @param topics
+ * @param topicOptWithExists
+ */
+ private def ensureTopicExists(opts: TopicCommandOptions, topics: Seq[String], topicOptWithExists: Boolean) = {
+ if (topics.isEmpty && !topicOptWithExists) {
+ // If given topic doesn't exist then throw exception
+ throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
+ opts.options.valueOf(opts.zkConnectOpt)))
+ }
+ }
+
def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
require(configsToBeAdded.forall(config => config.length == 2),
@@ -346,7 +357,7 @@ object TopicCommand extends Logging {
val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
"if set when describing topics, only show topics that have overridden configs")
val ifExistsOpt = parser.accepts("if-exists",
- "if set when altering or deleting topics, the action will only execute if the topic exists")
+ "if set when altering or deleting or describing topics, the action will only execute if the topic exists")
val ifNotExistsOpt = parser.accepts("if-not-exists",
"if set when creating topics, the action will only execute if the topic does not already exist")
@@ -363,6 +374,8 @@ object TopicCommand extends Logging {
def checkArgs() {
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+ if(options.has(describeOpt) && options.has(ifExistsOpt))
+ CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
if (!options.has(listOpt) && !options.has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
@@ -380,7 +393,7 @@ object TopicCommand extends Logging {
allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt))
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))
}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 5cfab90..35ea241 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -160,6 +160,24 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
}
@Test
+ def testDescribeIfTopicNotExists() {
+ // create brokers
+ val brokers = List(0, 1, 2)
+ TestUtils.createBrokersInZk(zkClient, brokers)
+
+ // describe topic that does not exist
+ val describeOpts = new TopicCommandOptions(Array("--topic", "test"))
+ intercept[IllegalArgumentException] {
+ TopicCommand.describeTopic(zkClient, describeOpts)
+ }
+
+ // describe topic that does not exist with --if-exists
+ val describeOptsWithExists = new TopicCommandOptions(Array("--topic", "test", "--if-exists"))
+ // should not throw any error
+ TopicCommand.describeTopic(zkClient, describeOptsWithExists)
+ }
+
+ @Test
def testCreateAlterTopicWithRackAware() {
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1",
4 -> "rack3", 5 -> "rack3")
TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient)
|