kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [09/36] git commit: KAFKA-983; Expose MirrorMaker cleanShutdown method. Patched by Swapnil Ghike.
Date Wed, 11 Sep 2013 17:03:54 GMT
KAFKA-983; Expose MirrorMaker cleanShutdown method. Patched by Swapnil Ghike.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc5620cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc5620cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc5620cb

Branch: refs/heads/trunk
Commit: bc5620cbd270cd8ab443f567fee5203c66db253f
Parents: 5cf6a54
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Wed Jul 24 18:03:52 2013 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Jul 24 18:05:26 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 29 ++++++++++++--------
 1 file changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc5620cb/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index a85bfa2..c747bfb 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -31,6 +31,10 @@ import kafka.javaapi
 
 object MirrorMaker extends Logging {
 
+  private var connectors: Seq[ZookeeperConsumerConnector] = null
+  private var consumerThreads: Seq[MirrorMakerThread] = null
+  private var producerThreads: ListBuffer[ProducerThread] = null
+
   def main(args: Array[String]) {
     
     info ("Starting mirror maker")
@@ -112,7 +116,7 @@ object MirrorMaker extends Logging {
       new Producer[Array[Byte], Array[Byte]](config)
     })
 
-    val connectors = options.valuesOf(consumerConfigOpt).toList
+    connectors = options.valuesOf(consumerConfigOpt).toList
             .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString)))
             .map(new ZookeeperConsumerConnector(_))
 
@@ -132,18 +136,9 @@ object MirrorMaker extends Logging {
 
     val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
 
-    val consumerThreads =
-      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1,
producerDataChannel, producers, streamAndIndex._2))
-
-    val producerThreads = new ListBuffer[ProducerThread]()
+    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1,
producerDataChannel, producers, streamAndIndex._2))
 
-    def cleanShutdown() {
-      connectors.foreach(_.shutdown)
-      consumerThreads.foreach(_.awaitShutdown)
-      producerThreads.foreach(_.shutdown)
-      producerThreads.foreach(_.awaitShutdown)
-      info("Kafka mirror maker shutdown successfully")
-    }
+    producerThreads = new ListBuffer[ProducerThread]()
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -168,6 +163,16 @@ object MirrorMaker extends Logging {
     cleanShutdown()
   }
 
+  def cleanShutdown() {
+    if (connectors != null) connectors.foreach(_.shutdown)
+    if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown)
+    if (producerThreads != null) {
+      producerThreads.foreach(_.shutdown)
+      producerThreads.foreach(_.awaitShutdown)
+    }
+    info("Kafka mirror maker shutdown successfully")
+  }
+
   class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
                           producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte],
Array[Byte]]],
                           producers: Seq[Producer[Array[Byte], Array[Byte]]],


Mime
View raw message