kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2784: swallow exceptions when mirror maker exits.
Date Thu, 03 Mar 2016 02:22:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fe1fd703e -> 9cd65c46a


KAFKA-2784: swallow exceptions when mirror maker exits.

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Gwen Shapira

Closes #478 from becketqin/KAFKA-2784


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9cd65c46
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9cd65c46
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9cd65c46

Branch: refs/heads/trunk
Commit: 9cd65c46a71cb07276f23838aa55747efc32674c
Parents: fe1fd70
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Wed Mar 2 18:22:28 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Wed Mar 2 18:22:28 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9cd65c46/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 4bc38d9..26f4826 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -425,14 +425,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         info("Flushing producer.")
         producer.flush()
         info("Committing consumer offsets.")
-        try {
-          commitOffsets(mirrorMakerConsumer)
-        } catch {
-          case e: WakeupException => // just ignore
-        }
+        CoreUtils.swallow(commitOffsets(mirrorMakerConsumer))
         info("Shutting down consumer connectors.")
-        // we do not need to call consumer.close() since the consumer has already been interrupted
-        mirrorMakerConsumer.cleanup()
+        CoreUtils.swallow(mirrorMakerConsumer.stop())
+        CoreUtils.swallow(mirrorMakerConsumer.cleanup())
         shutdownLatch.countDown()
         info("Mirror maker thread stopped")
         // if it exits accidentally, stop the entire mirror maker
@@ -572,7 +568,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     override def cleanup() {
-      ClientUtils.swallow(consumer.close())
+      consumer.close()
     }
 
     override def commit() {


Mime
View raw message