kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7956 In ShutdownableThread, immediately complete the shutdown if the thread has not been started (#6218)
Date Tue, 26 Feb 2019 17:55:50 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd6520a  KAFKA-7956 In ShutdownableThread, immediately complete the shutdown if the
thread has not been started (#6218)
bd6520a is described below

commit bd6520a22a773b94aece20826bb00189986b3966
Author: Gardner Vickers <gardner@vickers.me>
AuthorDate: Tue Feb 26 12:55:38 2019 -0500

    KAFKA-7956 In ShutdownableThread, immediately complete the shutdown if the thread has
not been started (#6218)
    
    In some test cases it's desirable to instantiate a subclass of `ShutdownableThread` without
starting it. Since most subclasses of `ShutdownableThread` put cleanup logic in `ShutdownableThread.shutdown()`,
being able to call `shutdown()` on the non-running thread would be useful.
    
    This change allows us to avoid blocking in `ShutdownableThread.shutdown()` if the thread's
`run()` method has not been called. We also add a check that `initiateShutdown()` was called
before `awaitShutdown()`, to protect against the case where a user calls `awaitShutdown()`
before the thread has been started, and unexpectedly is not blocked on the thread shutting
down.
    
    Reviewers : Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
---
 .../scala/kafka/controller/ControllerEventManager.scala     |  3 ++-
 core/src/main/scala/kafka/utils/ShutdownableThread.scala    | 13 ++++++++++---
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index 54e3a9e..a456ce3 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -61,6 +61,7 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
   def start(): Unit = thread.start()
 
   def close(): Unit = {
+    thread.initiateShutdown()
     clearAndPut(KafkaController.ShutdownEventThread)
     thread.awaitShutdown()
   }
@@ -83,7 +84,7 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
 
     override def doWork(): Unit = {
       queue.take() match {
-        case KafkaController.ShutdownEventThread => initiateShutdown()
+        case KafkaController.ShutdownEventThread => // The shutting down of the thread
has been initiated at this point. Ignore this event.
         case controllerEvent =>
           _state = controllerEvent.state
 
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 13bbc90..02d09da 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -27,7 +27,8 @@ abstract class ShutdownableThread(val name: String, val isInterruptible:
Boolean
   this.logIdent = "[" + name + "]: "
   private val shutdownInitiated = new CountDownLatch(1)
   private val shutdownComplete = new CountDownLatch(1)
-
+  @volatile private var isStarted: Boolean = false
+  
   def shutdown(): Unit = {
     initiateShutdown()
     awaitShutdown()
@@ -54,8 +55,13 @@ abstract class ShutdownableThread(val name: String, val isInterruptible:
Boolean
    * After calling initiateShutdown(), use this API to wait until the shutdown is complete
    */
   def awaitShutdown(): Unit = {
-    shutdownComplete.await()
-    info("Shutdown completed")
+    if (shutdownInitiated.getCount != 0)
+      throw new IllegalStateException("initiateShutdown() was not called before awaitShutdown()")
+    else {
+      if (isStarted)
+        shutdownComplete.await()
+      info("Shutdown completed")
+    }
   }
 
   /**
@@ -76,6 +82,7 @@ abstract class ShutdownableThread(val name: String, val isInterruptible:
Boolean
   def doWork(): Unit
 
   override def run(): Unit = {
+    isStarted = true
     info("Starting")
     try {
       while (isRunning)


Mime
View raw message