kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-3175; Topic not accessible after deletion even when delete.topic.enable is disabled
Date Mon, 10 Oct 2016 21:06:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk bf98c4738 -> 1bce0d1b5


KAFKA-3175; Topic not accessible after deletion even when delete.topic.enable is disabled

Remove topics under /admin/delete_topics path in zk if deleteTopic is disabled. The topic
should never be enqueued for deletion.

Author: MayureshGharat <gharatmayuresh15@gmail.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>,
Jun Rao <junrao@gmail.com>

Closes #846 from MayureshGharat/kafka-3175


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1bce0d1b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1bce0d1b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1bce0d1b

Branch: refs/heads/trunk
Commit: 1bce0d1b5120743a9a0559a6c9a98249d463fe02
Parents: bf98c47
Author: MayureshGharat <gharatmayuresh15@gmail.com>
Authored: Mon Oct 10 14:06:12 2016 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Mon Oct 10 14:06:12 2016 -0700

----------------------------------------------------------------------
 .../controller/PartitionStateMachine.scala      | 39 ++++++++++++--------
 .../kafka/controller/TopicDeletionManager.scala | 19 ++++++++--
 .../unit/kafka/admin/DeleteTopicTest.scala      | 23 +++++++++++-
 3 files changed, 59 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1bce0d1b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index bf5fde4..32bf4da 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -75,8 +75,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging
{
   // register topic and partition change listeners
   def registerListeners() {
     registerTopicChangeListener()
-    if(controller.config.deleteTopicEnable)
-      registerDeleteTopicListener()
+    registerDeleteTopicListener()
   }
 
   // de-register topic and partition change listeners
@@ -87,8 +86,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging
{
         zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
     }
     partitionModificationsListeners.clear()
-    if(controller.config.deleteTopicEnable)
-      deregisterDeleteTopicListener()
+    deregisterDeleteTopicListener()
   }
 
   /**
@@ -469,19 +467,28 @@ class PartitionStateMachine(controller: KafkaController) extends Logging
{
           nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
         }
         topicsToBeDeleted --= nonExistentTopics
-        if(topicsToBeDeleted.nonEmpty) {
-          info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
-          // mark topic ineligible for deletion if other state changes are in progress
-          topicsToBeDeleted.foreach { topic =>
-            val preferredReplicaElectionInProgress =
-              controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
-            val partitionReassignmentInProgress =
-              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
-            if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
-              controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
+        if (controller.config.deleteTopicEnable) {
+          if (topicsToBeDeleted.nonEmpty) {
+            info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
+            // mark topic ineligible for deletion if other state changes are in progress
+            topicsToBeDeleted.foreach { topic =>
+              val preferredReplicaElectionInProgress =
+                controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
+              val partitionReassignmentInProgress =
+                controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+              if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
+                controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
+            }
+            // add topic to deletion list
+            controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
+          }
+        } else {
+          // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
+          for (topic <- topicsToBeDeleted) {
+            info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
+            val zkUtils = controllerContext.zkUtils
+            zkUtils.zkClient.delete(getDeleteTopicPath(topic))
           }
-          // add topic to deletion list
-          controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1bce0d1b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index f24c69c..98057dd 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -79,15 +79,26 @@ class TopicDeletionManager(controller: KafkaController,
   val controllerContext = controller.controllerContext
   val partitionStateMachine = controller.partitionStateMachine
   val replicaStateMachine = controller.replicaStateMachine
-  val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
-  val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
   val deleteLock = new ReentrantLock()
-  val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
-    (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
   val deleteTopicsCond = deleteLock.newCondition()
   val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
   var deleteTopicsThread: DeleteTopicsThread = null
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
+  val topicsToBeDeleted: mutable.Set[String] = if (isDeleteTopicEnabled) {
+    mutable.Set.empty[String] ++ initialTopicsToBeDeleted
+  } else {
+    // if delete topic is disabled clean the topic entries under /admin/delete_topics
+    val zkUtils = controllerContext.zkUtils
+    for (topic <- initialTopicsToBeDeleted) {
+      val deleteTopicPath = getDeleteTopicPath(topic)
+      info("Removing " + deleteTopicPath + " since delete topic is disabled")
+      zkUtils.zkClient.delete(deleteTopicPath)
+    }
+    mutable.Set.empty[String]
+  }
+  val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
+    (initialTopicsIneligibleForDeletion & topicsToBeDeleted)
+  val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
 
   /**
    * Invoked at the end of new controller initiation

http://git-wip-us.apache.org/repos/asf/kafka/blob/1bce0d1b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index d39de75..ccb3618 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -277,10 +277,10 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     servers.foreach(_.shutdown())
   }
 
-  private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true):
Seq[KafkaServer] = {
 
     val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", deleteTopicEnabled.toString)
     )
     createTestTopicAndCluster(topic,brokerConfigs)
   }
@@ -307,4 +307,23 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
       (key, count)
     }
   }
+
+  @Test
+  def testDisableDeleteTopic() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
+    // mark the topic for deletion
+    AdminUtils.deleteTopic(zkUtils, "test")
+    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)),
+      "Admin path /admin/delete_topic/%s path not deleted even if deleteTopic is disabled".format(topic))
+    // verify that topic test is untouched
+    assertTrue(servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined))
+    // test the topic path exists
+    assertTrue("Topic path disappeared", zkUtils.pathExists(getTopicPath(topic)))
+    // topic test should have a leader
+    val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
+    assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
+    servers.foreach(_.shutdown())
+  }
 }


Mime
View raw message