kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-2454; Deadlock between log segment deletion and server shutdown.
Date Wed, 21 Oct 2015 20:24:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 86eb74d92 -> 300565381


KAFKA-2454; Deadlock between log segment deletion and server shutdown.

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>

Closes #153 from becketqin/KAFKA-2454


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/30056538
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/30056538
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/30056538

Branch: refs/heads/trunk
Commit: 30056538130ae1e2be35398b0ddd2ea04105bafd
Parents: 86eb74d
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Wed Oct 21 13:24:10 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Oct 21 13:24:10 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/utils/KafkaScheduler.scala | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/30056538/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 5bab08d..641218e 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -70,7 +70,7 @@ class KafkaScheduler(val threads: Int,
                      daemon: Boolean = true) extends Scheduler with Logging {
   private var executor: ScheduledThreadPoolExecutor = null
   private val schedulerThreadId = new AtomicInteger(0)
-  
+
   override def startup() {
     debug("Initializing task scheduler.")
     this synchronized {
@@ -88,12 +88,14 @@ class KafkaScheduler(val threads: Int,
   
   override def shutdown() {
     debug("Shutting down task scheduler.")
-    this synchronized {
-      if(isStarted) {
-        executor.shutdown()
-        executor.awaitTermination(1, TimeUnit.DAYS)
+    // We use the local variable to avoid NullPointerException if another thread shuts down
scheduler at same time.
+    val cachedExecutor = this.executor
+    if (cachedExecutor != null) {
+      this synchronized {
+        cachedExecutor.shutdown()
         this.executor = null
       }
+      cachedExecutor.awaitTermination(1, TimeUnit.DAYS)
     }
   }
 
@@ -101,10 +103,10 @@ class KafkaScheduler(val threads: Int,
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period,
unit)))
     this synchronized {
-      ensureStarted
+      ensureRunning
       val runnable = CoreUtils.runnable {
         try {
-          trace("Begining execution of scheduled task '%s'.".format(name))
+          trace("Beginning execution of scheduled task '%s'.".format(name))
           fun()
         } catch {
           case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'",
t)
@@ -125,8 +127,8 @@ class KafkaScheduler(val threads: Int,
     }
   }
   
-  private def ensureStarted = {
+  private def ensureRunning = {
     if(!isStarted)
-      throw new IllegalStateException("Kafka scheduler has not been started")
+      throw new IllegalStateException("Kafka scheduler is not running.")
   }
 }


Mime
View raw message