kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4233: StateDirectory fails to create directory if any parent directory does not exist
Date Fri, 30 Sep 2016 18:55:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 18a081261 -> 5f3746d13


KAFKA-4233: StateDirectory fails to create directory if any parent directory does not exist

Change the creation of the directories, in the StateDirectory constructor, to use mkdirs so
any parents get created. Throw an exception if the directory doesn't exist and couldn't be
created

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Michael G. Noll, Eno Thereska, Guozhang Wang

Closes #1942 from dguy/kafka-4233


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5f3746d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5f3746d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5f3746d1

Branch: refs/heads/trunk
Commit: 5f3746d135697f364aaacf877ce288267d00b9a2
Parents: 18a0812
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Sep 30 11:55:32 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Sep 30 11:55:32 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/StateDirectory.java | 16 ++++++++++------
 .../processor/internals/StateDirectoryTest.java     | 10 ++++++++++
 2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5f3746d1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 02abdeb..3048fba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,12 +48,14 @@ public class StateDirectory {
 
     public StateDirectory(final String applicationId, final String stateDirConfig) {
         final File baseDir = new File(stateDirConfig);
-        if (!baseDir.exists()) {
-            baseDir.mkdir();
+        if (!baseDir.exists() && !baseDir.mkdirs()) {
+            throw new ProcessorStateException(String.format("state directory [%s] doesn't
exist and couldn't be created",
+                                                            stateDirConfig));
         }
         stateDir = new File(baseDir, applicationId);
-        if (!stateDir.exists()) {
-            stateDir.mkdir();
+        if (!stateDir.exists() && !stateDir.mkdir()) {
+            throw new ProcessorStateException(String.format("state directory [%s] doesn't
exist and couldn't be created",
+                                                            stateDir.getPath()));
         }
 
     }
@@ -64,8 +67,9 @@ public class StateDirectory {
      */
     public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
-        if (!taskDir.exists()) {
-            taskDir.mkdir();
+        if (!taskDir.exists() && !taskDir.mkdir()) {
+            throw new ProcessorStateException(String.format("task directory [%s] doesn't
exist and couldn't be created",
+                                                            taskDir.getPath()));
         }
         return taskDir;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f3746d1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index c17e7bc..6fc855c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -166,4 +166,14 @@ public class StateDirectoryTest {
         assertTrue(dirs.contains(taskDir2));
     }
 
+    @Test
+    public void shouldCreateDirectoriesIfParentDoesntExist() throws Exception {
+        final File tempDir = TestUtils.tempDirectory();
+        final File stateDir = new File(new File(tempDir, "foo"), "state-dir");
+        final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath());
+        final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0));
+        assertTrue(stateDir.exists());
+        assertTrue(taskDir.exists());
+    }
+
 }
\ No newline at end of file


Mime
View raw message