kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-1961 Prevent deletion of _consumer_offsets topic; reviewed by Neha Narkhede, Gwen Shapira and Jun Rao
Date Fri, 03 Apr 2015 18:44:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ad722531d -> 48f997047


KAFKA-1961 Prevent deletion of _consumer_offsets topic; reviewed by Neha Narkhede, Gwen Shapira and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/48f99704
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/48f99704
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/48f99704

Branch: refs/heads/trunk
Commit: 48f997047228c327f91e8f848142b4607366fc3e
Parents: ad72253
Author: Ted Malaska <ted.malaska@cloudera.com>
Authored: Fri Apr 3 11:43:52 2015 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Fri Apr 3 11:43:59 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 18 ++++++---
 .../unit/kafka/admin/TopicCommandTest.scala     | 41 +++++++++++++++++++-
 2 files changed, 52 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/48f99704/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f400b71..d430e1d 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -19,7 +19,7 @@ package kafka.admin
 
 import joptsimple._
 import java.util.Properties
-import kafka.common.AdminCommandFailedException
+import kafka.common.{Topic, AdminCommandFailedException}
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -143,15 +143,21 @@ object TopicCommand {
     }
     topics.foreach { topic =>
       try {
-        ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
-        println("Topic %s is marked for deletion.".format(topic))
-        println("Note: This will have no impact if delete.topic.enable is not set to true.")
+        if (Topic.InternalTopics.contains(topic)) {
+          throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic));
+        } else {
+          ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
+          println("Topic %s is marked for deletion.".format(topic))
+          println("Note: This will have no impact if delete.topic.enable is not set to true.")
+        }
       } catch {
         case e: ZkNodeExistsException =>
           println("Topic %s is already marked for deletion.".format(topic))
-        case e2: Throwable =>
+        case e: AdminOperationException =>
+          throw e
+        case e: Throwable =>
           throw new AdminOperationException("Error while deleting topic %s".format(topic))
-      }    
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/48f99704/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index ac6dd20..c7136f2 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.utils.Logging
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.KafkaConfig
+import kafka.server.{OffsetManager, KafkaConfig}
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils
 
@@ -60,4 +60,43 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin
     assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
     assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
   }
+
+  @Test
+  def testTopicDeletion() {
+    val normalTopic = "test"
+
+    val numPartitionsOriginal = 1
+
+    // create brokers
+    val brokers = List(0, 1, 2)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    // create the NormalTopic
+    val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
+      "--replication-factor", "1",
+      "--topic", normalTopic))
+    TopicCommand.createTopic(zkClient, createOpts)
+
+    // delete the NormalTopic
+    val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
+    val deletePath = ZkUtils.getDeleteTopicPath(normalTopic)
+    assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deletePath))
+    TopicCommand.deleteTopic(zkClient, deleteOpts)
+    assertTrue("Delete path for topic should exist after deletion.", zkClient.exists(deletePath))
+
+    // create the offset topic
+    val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
+      "--replication-factor", "1",
+      "--topic", OffsetManager.OffsetsTopicName))
+    TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
+
+    // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't
+    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", OffsetManager.OffsetsTopicName))
+    val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(OffsetManager.OffsetsTopicName)
+    assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath))
+    intercept[AdminOperationException] {
+        TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
+    }
+    assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath))
+  }
 }
\ No newline at end of file


Mime
View raw message