kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1317 KafkaServer 0.8.1 not responding to .shutdown() cleanly, possibly related to TopicDeletionManager or MetricsMeter state; reviewed by Neha Narkhede
Date Sat, 29 Mar 2014 01:13:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e287e02b1 -> fb92b3a2c


KAFKA-1317 KafkaServer 0.8.1 not responding to .shutdown() cleanly, possibly related to TopicDeletionManager or MetricsMeter state; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb92b3a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb92b3a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb92b3a2

Branch: refs/heads/trunk
Commit: fb92b3a2cdf571715b948f81f1f8bb4bc363d497
Parents: e287e02
Author: Timothy Chen <tnachen@gmail.com>
Authored: Fri Mar 28 18:12:53 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Fri Mar 28 18:13:19 2014 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 12 ++-------
 .../kafka/controller/TopicDeletionManager.scala | 27 ++++++++++++--------
 .../unit/kafka/server/ServerShutdownTest.scala  | 20 +++++++++++++++
 3 files changed, 39 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fb92b3a2/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 7dc2718..c8a56ee 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,9 +37,9 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import org.apache.log4j.Logger
+import java.util.concurrent.locks.ReentrantLock
 import scala.Some
 import kafka.common.TopicAndPartition
-import java.util.concurrent.locks.ReentrantLock
 
 class ControllerContext(val zkClient: ZkClient,
                         val zkSessionTimeout: Int) {
@@ -643,15 +643,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def shutdown() = {
     inLock(controllerContext.controllerLock) {
       isRunning = false
-      partitionStateMachine.shutdown()
-      replicaStateMachine.shutdown()
-      if (config.autoLeaderRebalanceEnable)
-        autoRebalanceScheduler.shutdown()
-      if(controllerContext.controllerChannelManager != null) {
-        controllerContext.controllerChannelManager.shutdown()
-        controllerContext.controllerChannelManager = null
-      }
-      info("Controller shutdown complete")
+      onControllerResignation()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb92b3a2/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 488dfd0..09f54ac 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -22,6 +22,8 @@ import kafka.utils.Utils._
 import collection.Set
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.api.{StopReplicaResponse, RequestOrResponse}
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.atomic.AtomicBoolean
 
 /**
  * This manages the state machine for topic deletion.
@@ -71,10 +73,11 @@ class TopicDeletionManager(controller: KafkaController,
   val partitionStateMachine = controller.partitionStateMachine
   val replicaStateMachine = controller.replicaStateMachine
   var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
+  val deleteLock = new ReentrantLock()
   var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
     (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
-  val deleteTopicsCond = controllerContext.controllerLock.newCondition()
-  var deleteTopicStateChanged: Boolean = false
+  val deleteTopicsCond = deleteLock.newCondition()
+  var deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
   var deleteTopicsThread: DeleteTopicsThread = null
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
 
@@ -84,7 +87,7 @@ class TopicDeletionManager(controller: KafkaController,
   def start() {
     if(isDeleteTopicEnabled) {
       deleteTopicsThread = new DeleteTopicsThread()
-      deleteTopicStateChanged = true
+      deleteTopicStateChanged.set(true)
       deleteTopicsThread.start()
     }
   }
@@ -195,19 +198,22 @@ class TopicDeletionManager(controller: KafkaController,
    * controllerLock should be acquired before invoking this API
    */
   private def awaitTopicDeletionNotification() {
-    while(!deleteTopicStateChanged) {
-      info("Waiting for signal to start or continue topic deletion")
-      deleteTopicsCond.await()
+    inLock(deleteLock) {
+      while(!deleteTopicStateChanged.compareAndSet(true, false)) {
+        info("Waiting for signal to start or continue topic deletion")
+        deleteTopicsCond.await()
+      }
     }
-    deleteTopicStateChanged = false
   }
 
   /**
    * Signals the delete-topic-thread to process topic deletion
    */
   private def resumeTopicDeletionThread() {
-    deleteTopicStateChanged = true
-    deleteTopicsCond.signal()
+    deleteTopicStateChanged.set(true)
+    inLock(deleteLock) {
+      deleteTopicsCond.signal()
+    }
   }
 
   /**
@@ -352,8 +358,9 @@ class TopicDeletionManager(controller: KafkaController,
   class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
     val zkClient = controllerContext.zkClient
     override def doWork() {
+      awaitTopicDeletionNotification()
+
       inLock(controllerContext.controllerLock) {
-        awaitTopicDeletionNotification()
         val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
         if(topicsQueuedForDeletion.size > 0)
           info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb92b3a2/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 20fe93e..c7e058f 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -96,5 +96,25 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer.close()
     server.shutdown()
     Utils.rm(server.config.logDirs)
+    verifyNonDaemonThreadsStatus
+  }
+
+  @Test
+  def testCleanShutdownWithDeleteTopicEnabled() {
+    val newProps = TestUtils.createBrokerConfig(0, port)
+    newProps.setProperty("delete.topic.enable", "true")
+    val newConfig = new KafkaConfig(newProps)
+    var server = new KafkaServer(newConfig)
+    server.startup()
+    server.shutdown()
+    server.awaitShutdown()
+    Utils.rm(server.config.logDirs)
+    verifyNonDaemonThreadsStatus
+  }
+
+  def verifyNonDaemonThreadsStatus() {
+    assertEquals(0, Thread.getAllStackTraces.keySet().toArray
+      .map(_.asInstanceOf[Thread])
+      .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka")))
   }
 }


Mime
View raw message