kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5679; Add logging for broker termination due to SIGTERM or SIGINT
Date Tue, 26 Sep 2017 14:10:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b8be86b80 -> 8256f882c


KAFKA-5679; Add logging for broker termination due to SIGTERM or SIGINT

This depends on sun.misc.Signal and sun.misc.SignalHandler, which may be
removed in future releases. But along with sun.misc.Unsafe, these classes
are available in Java 9 (see JEP 260), so they are safe to use for now.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3668 from rajinisivaram/KAFKA-5679


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8256f882
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8256f882
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8256f882

Branch: refs/heads/trunk
Commit: 8256f882c92daa1470382502ab94cbe2c16028f1
Parents: b8be86b
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Tue Sep 26 14:47:00 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Sep 26 15:02:38 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/Kafka.scala      | 29 ++++++++++++++++++++++---
 core/src/main/scala/kafka/utils/Exit.scala |  2 +-
 2 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8256f882/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 7c5b420..062ff30 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -18,6 +18,7 @@
 package kafka
 
 import java.util.Properties
+import sun.misc.{Signal, SignalHandler}
 
 import joptsimple.OptionParser
 import kafka.utils.Implicits._
@@ -25,6 +26,7 @@ import kafka.server.{KafkaServer, KafkaServerStartable}
 import kafka.utils.{CommandLineUtils, Exit, Logging}
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.mutable
 import scala.collection.JavaConverters._
 
 object Kafka extends Logging {
@@ -41,10 +43,10 @@ object Kafka extends Logging {
 
     val props = Utils.loadProps(args(0))
 
-    if(args.length > 1) {
+    if (args.length > 1) {
       val options = optionParser.parse(args.slice(1, args.length): _*)
 
-      if(options.nonOptionArguments().size() > 0) {
+      if (options.nonOptionArguments().size() > 0) {
         CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
       }
 
@@ -53,12 +55,33 @@ object Kafka extends Logging {
     props
   }
 
+  private def registerLoggingSignalHandler(): Unit = {
+    val jvmSignalHandlers = mutable.Map[String, SignalHandler]()
+    val handler = new SignalHandler() {
+      override def handle(signal: Signal) {
+        info(s"Terminating process due to signal $signal")
+        jvmSignalHandlers.get(signal.getName).foreach(_.handle(signal))
+      }
+    }
+    def registerHandler(signalName: String) {
+      val oldHandler = Signal.handle(new Signal(signalName), handler)
+      if (oldHandler != null)
+        jvmSignalHandlers.put(signalName, oldHandler)
+    }
+    registerHandler("TERM")
+    registerHandler("INT")
+    registerHandler("HUP")
+  }
+
   def main(args: Array[String]): Unit = {
     try {
       val serverProps = getPropsFromArgs(args)
       val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
 
-      // attach shutdown handler to catch control-c
+      // register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c)
+      registerLoggingSignalHandler()
+
+      // attach shutdown handler to catch terminating signals as well as normal termination
       Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
         override def run(): Unit = kafkaServerStartable.shutdown()
       })

http://git-wip-us.apache.org/repos/asf/kafka/blob/8256f882/core/src/main/scala/kafka/utils/Exit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala
index a4120b7..3e29ddd 100644
--- a/core/src/main/scala/kafka/utils/Exit.scala
+++ b/core/src/main/scala/kafka/utils/Exit.scala
@@ -19,7 +19,7 @@ package kafka.utils
 import org.apache.kafka.common.utils.{Exit => JExit}
 
 /**
-  * Internal class that should be used instead of `Exit.exit()` and `Runtime.getRuntime().halt()` so that tests can
+  * Internal class that should be used instead of `System.exit()` and `Runtime.getRuntime().halt()` so that tests can
   * easily change the behaviour.
   */
 object Exit {


Mime
View raw message