kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1432 followup - Fixing the shutdown sequence furthermore; reviewed by Neha Narkhede
Date Wed, 07 May 2014 17:08:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4fe5c4684 -> 2848a7ab5


KAFKA-1432 followup - Fixing the shutdown sequence furthermore; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2848a7ab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2848a7ab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2848a7ab

Branch: refs/heads/trunk
Commit: 2848a7ab5e80bce9007420517a634229e03ec6f9
Parents: 4fe5c46
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Wed May 7 10:07:40 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed May 7 10:07:54 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 43 +++++++++++---------
 1 file changed, 24 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2848a7ab/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 12fa797..26730c4 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -36,6 +36,8 @@ object MirrorMaker extends Logging {
   private var consumerThreads: Seq[ConsumerThread] = null
   private var producerThreads: ListBuffer[ProducerThread] = null
 
+  private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
+
   def main(args: Array[String]) {
     
     info ("Starting mirror maker")
@@ -159,9 +161,11 @@ object MirrorMaker extends Logging {
     consumerThreads.foreach(_.start)
     producerThreads.foreach(_.start)
 
-    // in case the consumer threads hit a timeout/other exception
-    consumerThreads.foreach(_.awaitShutdown)
-    cleanShutdown()
+    // we wait on producer's shutdown latch instead of consumers
+    // since the consumer threads can hit a timeout/other exception;
+    // but in this case the producer should still be able to shutdown
+    // based on the shutdown message in the channel
+    producerThreads.foreach(_.awaitShutdown)
   }
 
   def cleanShutdown() {
@@ -187,7 +191,7 @@ object MirrorMaker extends Logging {
     this.setName(threadName)
 
     override def run() {
-      info("Starting mirror maker thread " + threadName)
+      info("Starting mirror maker consumer thread " + threadName)
       try {
         for (msgAndMetadata <- stream) {
           // If the key of the message is empty, put it into the universal channel
@@ -208,59 +212,60 @@ object MirrorMaker extends Logging {
           fatal("Stream unexpectedly exited.", e)
       } finally {
         shutdownLatch.countDown()
-        info("Stopped thread.")
+        info("Consumer thread stopped")
       }
     }
 
     def awaitShutdown() {
       try {
         shutdownLatch.await()
+        info("Consumer thread shutdown complete")
       } catch {
-        case e: InterruptedException => fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName))
+        case e: InterruptedException => fatal("Shutdown of the consumer thread interrupted. This might leak data!")
       }
     }
   }
 
   class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord],
                         val producer: BaseProducer,
-                        val threadId: Int) extends Thread {
-    val threadName = "mirrormaker-producer-" + threadId
-    val logger = org.apache.log4j.Logger.getLogger(classOf[KafkaMigrationTool.ProducerThread].getName)
-    val shutdownComplete: CountDownLatch = new CountDownLatch(1)
-
-    private final val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
+                        val threadId: Int) extends Thread with Logging {
+    private val threadName = "mirrormaker-producer-" + threadId
+    private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
+    this.logIdent = "[%s] ".format(threadName)
 
     setName(threadName)
 
     override def run {
+      info("Starting mirror maker producer thread " + threadName)
       try {
         while (true) {
           val data: ProducerRecord = dataChannel.take
-          logger.trace("Sending message with value size %d".format(data.value().size))
+          trace("Sending message with value size %d".format(data.value().size))
 
           if(data eq shutdownMessage) {
-            logger.info("Producer thread " + threadName + " finished running")
+            info("Received shutdown message")
             return
           }
           producer.send(data.topic(), data.key(), data.value())
         }
       } catch {
         case t: Throwable => {
-          logger.fatal("Producer thread failure due to ", t)
+          fatal("Producer thread failure due to ", t)
         }
       } finally {
         shutdownComplete.countDown
+        info("Producer thread stopped")
       }
     }
 
     def shutdown {
       try {
-        logger.info("Producer thread " + threadName + " shutting down")
+        info("Producer thread " + threadName + " shutting down")
         dataChannel.put(shutdownMessage)
       }
       catch {
         case ie: InterruptedException => {
-          logger.warn("Interrupt during shutdown of ProducerThread", ie)
+          warn("Interrupt during shutdown of ProducerThread")
         }
       }
     }
@@ -269,10 +274,10 @@ object MirrorMaker extends Logging {
       try {
         shutdownComplete.await
         producer.close
-        logger.info("Producer thread " + threadName + " shutdown complete")
+        info("Producer thread shutdown complete")
       } catch {
         case ie: InterruptedException => {
-          logger.warn("Interrupt during shutdown of ProducerThread")
+          warn("Shutdown of the producer thread interrupted")
         }
       }
     }


Mime
View raw message