kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1443 Add delete topic option to topic commands; reviewed by Neha Narkhede
Date Fri, 06 Jun 2014 16:46:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1363ed7c5 -> ee4456a3f


KAFKA-1443 Add delete topic option to topic commands; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee4456a3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee4456a3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee4456a3

Branch: refs/heads/trunk
Commit: ee4456a3f45eac7df31a170d6ce9d0a4a2fe98a0
Parents: 1363ed7
Author: Timothy Chen <tnachen@gmail.com>
Authored: Fri Jun 6 09:45:55 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Fri Jun 6 09:46:02 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/admin/DeleteTopicCommand.scala  | 66 --------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 16 ++++-
 2 files changed, 13 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ee4456a3/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
deleted file mode 100644
index 804b331..0000000
--- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple.OptionParser
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
-
-object DeleteTopicCommand {
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.")
-                         .withRequiredArg
-                         .describedAs("topic")
-                         .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-                                      "Multiple URLS can be given to allow fail-over.")
-                           .withRequiredArg
-                           .describedAs("urls")
-                           .ofType(classOf[String])
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(topicOpt, zkConnectOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val topic = options.valueOf(topicOpt)
-    val zkConnect = options.valueOf(zkConnectOpt)
-    var zkClient: ZkClient = null
-    try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
-      println("deletion succeeded!")
-    }
-    catch {
-      case e: Throwable =>
-        println("delection failed because of " + e.getMessage)
-        println(Utils.stackTrace(e))
-    }
-    finally {
-      if (zkClient != null)
-        zkClient.close()
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee4456a3/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index bdc72ea..6788c2e 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -36,9 +36,9 @@ object TopicCommand {
     val opts = new TopicCommandOptions(args)
     
     // should have exactly one action
-    val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
+    val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
     if(actions != 1) {
-      System.err.println("Command must include exactly one action: --list, --describe, --create or --alter")
+      System.err.println("Command must include exactly one action: --list, --describe, --create, --alter or --delete")
       opts.parser.printHelpOn(System.err)
       System.exit(1)
     }
@@ -56,6 +56,8 @@ object TopicCommand {
         listTopics(zkClient, opts)
       else if(opts.options.has(opts.describeOpt))
         describeTopic(zkClient, opts)
+      else if(opts.options.has(opts.deleteOpt))
+        deleteTopic(zkClient, opts)
     } catch {
       case e: Throwable =>
         println("Error while executing topic command " + e.getMessage)
@@ -122,7 +124,14 @@ object TopicCommand {
     for(topic <- topics)
         println(topic)
   }
-  
+
+  def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
+    topics.foreach { topic =>
+      ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
+    }
+  }
+
   def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
@@ -210,6 +219,7 @@ object TopicCommand {
                            .ofType(classOf[String])
     val listOpt = parser.accepts("list", "List all available topics.")
     val createOpt = parser.accepts("create", "Create a new topic.")
+    val deleteOpt = parser.accepts("delete", "Delete a topic")
     val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
     val helpOpt = parser.accepts("help", "Print usage information.")


Mime
View raw message