kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: HOTFIX: poll even when all partitions are paused. handle concurrent cleanup
Date Wed, 10 Feb 2016 22:02:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1faab034b -> c1f8f689a


HOTFIX: poll even when all partitions are paused. handle concurrent cleanup

* We need to poll periodically even when all partitions are paused in order to respond to
a possible rebalance promptly.
* There is a race condition when two (or more) threads try to clean up the same state directory.
One of the thread fails with FileNotFoundException. Thus the new code simply catches it and
ignore.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Gwen Shapira

Closes #893 from ymatsuda/hotfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1f8f689
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1f8f689
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1f8f689

Branch: refs/heads/trunk
Commit: c1f8f689af43f5ce5a95dad86537db4615449694
Parents: 1faab03
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Wed Feb 10 15:02:27 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Wed Feb 10 15:02:27 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 43 +++++++++++++-------
 1 file changed, 28 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c1f8f689/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d51974a..18dc0ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -51,6 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.FileLock;
 import java.util.ArrayList;
@@ -300,6 +301,7 @@ public class StreamThread extends Thread {
 
     private void runLoop() {
         int totalNumBuffered = 0;
+        long lastPoll = 0L;
         boolean requiresPoll = true;
 
         ensureCopartitioning(builder.copartitionGroups());
@@ -314,6 +316,7 @@ public class StreamThread extends Thread {
                 long startPoll = time.milliseconds();
 
                 ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered
== 0 ? this.pollTimeMs : 0);
+                lastPoll = time.milliseconds();
 
                 if (!records.isEmpty()) {
                     for (TopicPartition partition : records.partitions()) {
@@ -340,6 +343,12 @@ public class StreamThread extends Thread {
                 }
 
                 maybePunctuate();
+
+                // if pollTimeMs has passed since the last poll, we poll to respond to a
possible rebalance
+                // even when we paused all partitions.
+                if (lastPoll + this.pollTimeMs < time.milliseconds())
+                    requiresPoll = true;
+
             } else {
                 // even when no task is assigned, we must poll to get a task.
                 requiresPoll = true;
@@ -489,21 +498,25 @@ public class StreamThread extends Thread {
                         TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-")
+ 1));
 
                         // try to acquire the exclusive lock on the state directory
-                        FileLock directoryLock = null;
-                        try {
-                            directoryLock = ProcessorStateManager.lockStateDirectory(dir);
-                            if (directoryLock != null) {
-                                log.info("Deleting obsolete state directory {} for task {}
after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs);
-                                Utils.delete(dir);
-                            }
-                        } catch (IOException e) {
-                            log.error("Failed to lock the state directory due to an unexpected
exception", e);
-                        } finally {
-                            if (directoryLock != null) {
-                                try {
-                                    directoryLock.release();
-                                } catch (IOException e) {
-                                    log.error("Failed to release the state directory lock");
+                        if (dir.exists()) {
+                            FileLock directoryLock = null;
+                            try {
+                                directoryLock = ProcessorStateManager.lockStateDirectory(dir);
+                                if (directoryLock != null) {
+                                    log.info("Deleting obsolete state directory {} for task
{} after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs);
+                                    Utils.delete(dir);
+                                }
+                            } catch (FileNotFoundException e) {
+                                // the state directory may be deleted by another thread
+                            } catch (IOException e) {
+                                log.error("Failed to lock the state directory due to an unexpected
exception", e);
+                            } finally {
+                                if (directoryLock != null) {
+                                    try {
+                                        directoryLock.release();
+                                    } catch (IOException e) {
+                                        log.error("Failed to release the state directory
lock");
+                                    }
                                 }
                             }
                         }


Mime
View raw message