kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject [1/2] kafka git commit: KAFKA-2190; Flush mirror maker before commiting offsets; abort the mirror-maker producer with close(0) on send error; reviewed by Joel Koshy
Date Thu, 21 May 2015 18:55:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk bb133c63b -> 23ff851f3


KAFKA-2190; Flush mirror maker before commiting offsets; abort the
mirror-maker producer with close(0) on send error; reviewed by Joel
Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4f2391f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4f2391f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4f2391f

Branch: refs/heads/trunk
Commit: b4f2391f9e6d7b8cd29a7d79365517c19799a9e7
Parents: bb133c6
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Wed May 20 18:10:45 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed May 20 18:11:52 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 24 +++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b4f2391f/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 9548521..459aaec 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -18,7 +18,7 @@
 package kafka.tools
 
 import java.util
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{TimeUnit, CountDownLatch}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.{Collections, Properties}
 
@@ -222,7 +222,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         connector.setConsumerRebalanceListener(consumerRebalanceListener)
     }
 
-    // create Kafka streams
+    // create filters
     val filterSpec = if (options.has(whitelistOpt))
       new Whitelist(options.valueOf(whitelistOpt))
     else
@@ -271,10 +271,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       }
       info("Closing producer.")
       producer.close()
-      connectors.foreach(commitOffsets)
-      // Connector should only be shutdown after offsets are committed.
-      info("Shutting down consumer connectors.")
-      connectors.foreach(_.shutdown())
       info("Kafka mirror maker shutdown successfully")
     }
   }
@@ -306,7 +302,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         val stream = streams(0)
         val iter = stream.iterator()
 
-        // TODO: Need to be changed after KAFKA-1660 is available.
         while (!exitingOnSendFailure && !shuttingDown) {
           try {
             while (!exitingOnSendFailure && !shuttingDown && iter.hasNext())
{
@@ -326,6 +321,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         case t: Throwable =>
           fatal("Mirror maker thread failure due to ", t)
       } finally {
+        info("Flushing producer.")
+        producer.flush()
+        info("Committing consumer offsets.")
+        commitOffsets(connector)
+        info("Shutting down consumer connectors.")
+        connector.shutdown()
         shutdownLatch.countDown()
         info("Mirror maker thread stopped")
         // if it exits accidentally, stop the entire mirror maker
@@ -388,6 +389,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     def close() {
       this.producer.close()
     }
+
+    def close(timeout: Long) {
+      this.producer.close(timeout, TimeUnit.MILLISECONDS)
+    }
   }
 
   private class MirrorMakerProducerCallback (topic: String, key: Array[Byte], value: Array[Byte])
@@ -399,8 +404,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         // still could not be sent.
         super.onCompletion(metadata, exception)
         // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped
message and move on.
-        if (abortOnSendFailure)
+        if (abortOnSendFailure) {
+          info("Closing producer due to send failure.")
           exitingOnSendFailure = true
+          producer.close(0)
+        }
         numDroppedMessages.incrementAndGet()
       }
     }


Mime
View raw message