kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2867: Fix missing WorkerSourceTask synchronization and handling of InterruptException.
Date Fri, 20 Nov 2015 18:04:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d0614f97b -> 7b3d1bf6a


KAFKA-2867: Fix missing WorkerSourceTask synchronization and handling of InterruptException.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #566 from ewencp/kafka-2867-fix-source-sync-and-interrupt


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b3d1bf6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b3d1bf6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b3d1bf6

Branch: refs/heads/trunk
Commit: 7b3d1bf6a398f5b1f454a7719d04f5ee9e630f96
Parents: d0614f9
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Fri Nov 20 10:04:40 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Nov 20 10:04:40 2015 -0800

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSourceTask.java       | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7b3d1bf6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 7178542..6c61d79 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -203,7 +203,7 @@ class WorkerSourceTask implements WorkerTask {
             removed = outstandingMessagesBacklog.remove(record);
         // But if neither one had it, something is very wrong
         if (removed == null) {
-            log.error("Saw callback for record that was not present in the outstanding message
set: "
+            log.error("CRITICAL Saw callback for record that was not present in the outstanding
message set: "
                     + "{}", record);
         } else if (flushing && outstandingMessages.isEmpty()) {
             // flush thread may be waiting on the outstanding messages to clear
@@ -231,19 +231,25 @@ class WorkerSourceTask implements WorkerTask {
             // to persistent storage
 
             // Next we need to wait for all outstanding messages to finish sending
+            log.debug("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
             while (!outstandingMessages.isEmpty()) {
                 try {
                     long timeoutMs = timeout - time.milliseconds();
                     if (timeoutMs <= 0) {
                         log.error(
                                 "Failed to flush {}, timed out while waiting for producer
to flush outstanding "
-                                        + "messages", this.toString());
+                                        + "messages, {} left ({})", this, outstandingMessages.size(),
outstandingMessages);
                         finishFailedFlush();
                         return false;
                     }
                     this.wait(timeoutMs);
                 } catch (InterruptedException e) {
-                    // ignore
+                    // We can get interrupted if we take too long committing when the work
thread shutdown is requested,
+                    // requiring a forcible shutdown. Give up since we can't safely commit
any offsets, but also need
+                    // to stop immediately
+                    log.error("{} Interrupted while flushing messages, offsets will not be
committed", this);
+                    finishFailedFlush();
+                    return false;
                 }
             }
 
@@ -309,7 +315,7 @@ class WorkerSourceTask implements WorkerTask {
         flushing = false;
     }
 
-    private void finishSuccessfulFlush() {
+    private synchronized void finishSuccessfulFlush() {
         // If we were successful, we can just swap instead of replacing items back into the
original map
         IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[],
byte[]>> temp = outstandingMessages;
         outstandingMessages = outstandingMessagesBacklog;


Mime
View raw message