kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5038; Throw correct exception if locking of state directory fails
Date Wed, 12 Apr 2017 15:34:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0be835dde -> 1e93c3b9a


KAFKA-5038; Throw correct exception if locking of state directory fails

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>,
Ismael Juma <ismael@juma.me.uk>

Closes #2848 from enothereska/KAFKA-5038-trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1e93c3b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1e93c3b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1e93c3b9

Branch: refs/heads/trunk
Commit: 1e93c3b9a3b8cbf069c57b46737fc02c818cb286
Parents: 0be835d
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Wed Apr 12 16:33:58 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Apr 12 16:34:07 2017 +0100

----------------------------------------------------------------------
 .../processor/internals/ProcessorStateManager.java | 17 +++++++++++++----
 .../processor/internals/StateDirectory.java        |  9 ++++++++-
 .../processor/internals/StateDirectoryTest.java    | 16 ++++++++++++++++
 3 files changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1e93c3b9/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 4b7bb1f..e555444 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -78,7 +78,8 @@ public class ProcessorStateManager implements StateManager {
         this.taskId = taskId;
         this.stateDirectory = stateDirectory;
         this.changelogReader = changelogReader;
-        this.baseDir  = stateDirectory.directoryForTask(taskId);
+        this.logPrefix = String.format("task [%s]", taskId);
+
         this.partitionForTopic = new HashMap<>();
         for (TopicPartition source : sources) {
             this.partitionForTopic.put(source.topic(), source);
@@ -91,10 +92,18 @@ public class ProcessorStateManager implements StateManager {
         this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>()
: null;
         this.storeToChangelogTopic = storeToChangelogTopic;
 
-        this.logPrefix = String.format("task [%s]", taskId);
-
         if (!stateDirectory.lock(taskId, 5)) {
-            throw new LockException(String.format("%s Failed to lock the state directory:
%s", logPrefix, baseDir.getCanonicalPath()));
+            throw new LockException(String.format("%s Failed to lock the state directory
for task %s",
+                logPrefix, taskId));
+        }
+        // get a handle on the parent/base directory of the task directory
+        // note that the parent directory could have been accidentally deleted here,
+        // so catch that exception if that is the case
+        try {
+            this.baseDir = stateDirectory.directoryForTask(taskId);
+        } catch (ProcessorStateException e) {
+            throw new LockException(String.format("%s Failed to get the directory for task
%s. Exception %s",
+                logPrefix, taskId, e));
         }
 
         // load the checkpoint information

http://git-wip-us.apache.org/repos/asf/kafka/blob/1e93c3b9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 85908e7..78e3b9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -103,12 +103,19 @@ public class StateDirectory {
      * @throws IOException
      */
     boolean lock(final TaskId taskId, int retry) throws IOException {
+        final File lockFile;
         // we already have the lock so bail out here
         if (locks.containsKey(taskId)) {
             log.trace("{} Found cached state dir lock for task {}", logPrefix, taskId);
             return true;
         }
-        final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
+        try {
+            lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
+        } catch (ProcessorStateException e) {
+            // directoryForTask could be throwing an exception if another thread
+            // has concurrently deleted the directory
+            return false;
+        }
 
         final FileChannel channel;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1e93c3b9/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 770ff25..f1b5efe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -106,8 +107,23 @@ public class StateDirectoryTest {
         }
     }
 
+    @Test(expected = ProcessorStateException.class)
+    public void shouldThrowProcessorStateException() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+
+        Utils.delete(stateDir);
+        directory.directoryForTask(taskId);
+    }
 
     @Test
+    public void shouldNotLockDeletedDirectory() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+
+        Utils.delete(stateDir);
+        assertFalse(directory.lock(taskId, 0));
+    }
+    
+    @Test
     public void shouldLockMulitpleTaskDirectories() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
         final File task1Dir = directory.directoryForTask(taskId);


Mime
View raw message