kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2047; Move the stream creation into concurrent mirror maker threads; reviewed by Guozhang Wang
Date Wed, 25 Mar 2015 21:01:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a74688de4 -> 5b42b538e


KAFKA-2047; Move the stream creation into concurrent mirror maker threads; reviewed by Guozhang
Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b42b538
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b42b538
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b42b538

Branch: refs/heads/trunk
Commit: 5b42b538eb46203f7fd308cb3d3f27dde98840b8
Parents: a74688d
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Wed Mar 25 14:01:19 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Mar 25 14:01:19 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 49 ++++++++------------
 1 file changed, 19 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b42b538/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 4f3c4c8..ec07743 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -22,18 +22,18 @@ import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.{Collections, Properties}
 
-import scala.collection.JavaConversions._
-
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException,
KafkaStream, Whitelist, ZookeeperConsumerConnector}
+import kafka.consumer.{KafkaStream, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException,
TopicFilter, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
 import kafka.utils.{CommandLineUtils, Logging, Utils}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord,
RecordMetadata}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord,
RecordMetadata}
+
+import scala.collection.JavaConversions._
 
 /**
  * The mirror maker has the following architecture:
@@ -226,26 +226,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     else
       new Blacklist(options.valueOf(blacklistOpt))
 
-    // create a (connector->stream) sequence
-    val connectorStream = (0 until numStreams) map {
-      i => {
-        var stream: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
-        try {
-          // Creating just on stream per each connector instance
-          stream = connectors(i).createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(),
new DefaultDecoder())
-          require(stream.size == 1)
-        } catch {
-          case t: Throwable =>
-            fatal("Unable to create stream - shutting down mirror maker.", t)
-            connectors(i).shutdown()
-        }
-        connectors(i) -> stream(0)
-      }
-    }
-
     // Create mirror maker threads
     mirrorMakerThreads = (0 until numStreams) map ( i =>
-        new MirrorMakerThread(connectorStream(i)._1, connectorStream(i)._2, i)
+        new MirrorMakerThread(connectors(i), filterSpec, i)
     )
 
     // Create and initialize message handler
@@ -295,13 +278,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue:
String) {
-    properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue))
+    val propertyValue = properties.getProperty(propertyName)
+    properties.setProperty(propertyName, Option(propertyValue).getOrElse(defaultValue))
     if (properties.getProperty(propertyName) != defaultValue)
-      info("Property %s is overridden to %s - data loss or message reordering is possible.")
+      info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName,
propertyValue))
   }
 
   class MirrorMakerThread(connector: ZookeeperConsumerConnector,
-                          stream: KafkaStream[Array[Byte], Array[Byte]],
+                          filterSpec: TopicFilter,
                           val threadId: Int) extends Thread with Logging with KafkaMetricsGroup
{
     private val threadName = "mirrormaker-thread-" + threadId
     private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
@@ -313,8 +297,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     override def run() {
       info("Starting mirror maker thread " + threadName)
-      val iter = stream.iterator()
       try {
+        // Creating one stream per each connector instance
+        val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(),
new DefaultDecoder())
+        require(streams.size == 1)
+        val stream = streams(0)
+        val iter = stream.iterator()
+
         // TODO: Need to be changed after KAFKA-1660 is available.
         while (!exitingOnSendFailure && !shuttingDown) {
           try {
@@ -333,10 +322,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         }
       } catch {
         case t: Throwable =>
-          fatal("Producer thread failure due to ", t)
+          fatal("Mirror maker thread failure due to ", t)
       } finally {
         shutdownLatch.countDown()
-        info("Producer thread stopped")
+        info("Mirror maker thread stopped")
         // if it exits accidentally, stop the entire mirror maker
         if (!isShuttingdown.get()) {
           fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
@@ -360,7 +349,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       }
       catch {
         case ie: InterruptedException =>
-          warn("Interrupt during shutdown of ProducerThread")
+          warn("Interrupt during shutdown of the mirror maker thread")
       }
     }
 
@@ -370,7 +359,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         info("Mirror maker thread shutdown complete")
       } catch {
         case ie: InterruptedException =>
-          warn("Shutdown of the producer thread interrupted")
+          warn("Shutdown of the mirror maker thread interrupted")
       }
     }
   }


Mime
View raw message