kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3084: Topic existence checks in topic commands (create, alter, delete)
Date Mon, 11 Jan 2016 18:09:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7059158c4 -> 3a0fc125f


KAFKA-3084: Topic existence checks in topic commands (create, alter, delete)

…delete)

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #744 from granthenke/exists-checks


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a0fc125
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a0fc125
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a0fc125

Branch: refs/heads/trunk
Commit: 3a0fc125f4337a670ea52009afb1a254179ac07b
Parents: 7059158
Author: Grant Henke <granthenke@gmail.com>
Authored: Mon Jan 11 10:08:45 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jan 11 10:08:45 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 45 +++++++++------
 .../unit/kafka/admin/TopicCommandTest.scala     | 60 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3a0fc125/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 37ef9dc..c17b5bc 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -19,7 +19,7 @@ package kafka.admin
 
 import joptsimple._
 import java.util.Properties
-import kafka.common.{Topic, AdminCommandFailedException}
+import kafka.common.{TopicExistsException, Topic, AdminCommandFailedException}
 import kafka.utils.CommandLineUtils
 import kafka.utils._
 import kafka.utils.ZkUtils._
@@ -50,7 +50,7 @@ object TopicCommand extends Logging {
 
     opts.checkArgs()
 
-    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt), 
+    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
                           30000,
                           30000,
                           JaasUtils.isZkSecurityEnabled())
@@ -91,25 +91,31 @@ object TopicCommand extends Logging {
   def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
     val topic = opts.options.valueOf(opts.topicOpt)
     val configs = parseTopicConfigsToBeAdded(opts)
+    val ifNotExists = if (opts.options.has(opts.ifNotExistsOpt)) true else false
     if (Topic.hasCollisionChars(topic))
       println("WARNING: Due to limitations in metric names, topics with a period ('.') or
underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
-    if (opts.options.has(opts.replicaAssignmentOpt)) {
-      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-      warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length)
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment,
configs, update = false)
-    } else {
-      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
-      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
-      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
-      warnOnMaxMessagesChange(configs, replicas)
-      AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
+    try {
+      if (opts.options.has(opts.replicaAssignmentOpt)) {
+        val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
+        warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length)
+        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment,
configs, update = false)
+      } else {
+        CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt,
opts.replicationFactorOpt)
+        val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
+        val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
+        warnOnMaxMessagesChange(configs, replicas)
+        AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
+      }
+      println("Created topic \"%s\".".format(topic))
+    } catch  {
+      case e: TopicExistsException => if (!ifNotExists) throw e
     }
-    println("Created topic \"%s\".".format(topic))
   }
 
   def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
     val topics = getTopics(zkUtils, opts)
-    if (topics.length == 0) {
+    val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
+    if (topics.length == 0 && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
           opts.options.valueOf(opts.zkConnectOpt)))
     }
@@ -155,7 +161,8 @@ object TopicCommand extends Logging {
 
   def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
     val topics = getTopics(zkUtils, opts)
-    if (topics.length == 0) {
+    val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
+    if (topics.length == 0 && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
           opts.options.valueOf(opts.zkConnectOpt)))
     }
@@ -307,10 +314,14 @@ object TopicCommand extends Logging {
                                                             "if set when describing topics,
only show partitions whose leader is not available")
     val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
                                                 "if set when describing topics, only show
topics that have overridden configs")
+    val ifExistsOpt = parser.accepts("if-exists",
+                                     "if set when altering or deleting topics, the action
will only execute if the topic exists")
+    val ifNotExistsOpt = parser.accepts("if-not-exists",
+                                        "if set when creating topics, the action will only
execute if the topic does not already exist")
 
     val options = parser.parse(args : _*)
 
-    val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt)
+    val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt,
deleteOpt)
 
     def checkArgs() {
       // check required args
@@ -332,6 +343,8 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts --
Set(alterOpt, deleteOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts
-- Set(createOpt))
     }
   }
   def warnOnMaxMessagesChange(configs: Properties, replicas: Integer): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a0fc125/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index f14e25a..fd6cdda 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -17,6 +17,7 @@
 package kafka.admin
 
 import junit.framework.Assert._
+import kafka.common.TopicExistsException
 import org.junit.Test
 import kafka.utils.Logging
 import kafka.utils.TestUtils
@@ -97,4 +98,63 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
     }
     assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
   }
+
+  @Test
+  def testDeleteIfExists() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkUtils, brokers)
+
+    // delete a topic that does not exist without --if-exists
+    val deleteOpts = new TopicCommandOptions(Array("--topic", "test"))
+    intercept[IllegalArgumentException] {
+      TopicCommand.deleteTopic(zkUtils, deleteOpts)
+    }
+
+    // delete a topic that does not exist with --if-exists
+    val deleteExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--if-exists"))
+    TopicCommand.deleteTopic(zkUtils, deleteExistsOpts)
+  }
+
+  @Test
+  def testAlterIfExists() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkUtils, brokers)
+
+    // alter a topic that does not exist without --if-exists
+    val alterOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions", "1"))
+    intercept[IllegalArgumentException] {
+      TopicCommand.alterTopic(zkUtils, alterOpts)
+    }
+
+    // alter a topic that does not exist with --if-exists
+    val alterExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions",
"1", "--if-exists"))
+    TopicCommand.alterTopic(zkUtils, alterExistsOpts)
+  }
+
+  @Test
+  def testCreateIfNotExists() {
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkUtils, brokers)
+
+    val topic = "test"
+    val numPartitions = 1
+
+    // create the topic
+    val createOpts = new TopicCommandOptions(
+      Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic",
topic))
+    TopicCommand.createTopic(zkUtils, createOpts)
+
+    // try to re-create the topic without --if-not-exists
+    intercept[TopicExistsException] {
+      TopicCommand.createTopic(zkUtils, createOpts)
+    }
+
+    // try to re-create the topic with --if-not-exists
+    val createNotExistsOpts = new TopicCommandOptions(
+      Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic",
topic, "--if-not-exists"))
+    TopicCommand.createTopic(zkUtils, createNotExistsOpts)
+  }
 }


Mime
View raw message