kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-842 Mirror maker can lose some messages during shutdown; reviewed by Jun Rao
Date Wed, 03 Apr 2013 20:45:27 GMT
Updated Branches:
  refs/heads/0.8 3c27988ca -> bd262ac70


KAFKA-842 Mirror maker can lose some messages during shutdown; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd262ac7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd262ac7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd262ac7

Branch: refs/heads/0.8
Commit: bd262ac708062e502406e8d775f4c9432a5364e7
Parents: 3c27988
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Wed Apr 3 13:43:50 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Apr 3 13:43:50 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/KafkaMigrationTool.java |    5 ++++-
 1 files changed, 4 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bd262ac7/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index a15b350..3c18286 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -385,8 +385,10 @@ public class KafkaMigrationTool {
       try{
         while(true) {
           KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
-          if(!data.equals(shutdownMessage))
+          if(!data.equals(shutdownMessage)) {
             producer.send(data);
+            if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message())));
+          }
           else
             break;
         }
@@ -410,6 +412,7 @@ public class KafkaMigrationTool {
     public void awaitShutdown() {
       try {
         shutdownComplete.await();
+        producer.close();
         logger.info("Producer thread " + threadName + " shutdown complete");
       } catch(InterruptedException ie) {
         logger.warn("Interrupt during shutdown of ProducerThread", ie);


Mime
View raw message