kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4724: Clean up of state directories can possibly remove stores that are about to be used by another thread
Date Mon, 06 Feb 2017 19:03:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d0932b028 -> 7de22453b


KAFKA-4724: Clean up of state directories can possibly remove stores that are about to be
used by another thread

Delay the cleanup of state directories that are not locked and not owned by the current thread
such that we only remove a directory if its last-modified time is earlier than now - cleanupDelayMs.
This also helps to avoid a race between threads on the same instance, where during rebalance,
one thread releases the lock on the state directory, and before another thread can take the
lock, the cleanup runs and removes the data.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2486 from dguy/KAFKA-4724


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7de22453
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7de22453
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7de22453

Branch: refs/heads/trunk
Commit: 7de22453bbd05a7ac629543e09626ac6987ce306
Parents: d0932b0
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Feb 6 11:03:26 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Feb 6 11:03:26 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  6 ++--
 .../org/apache/kafka/streams/StreamsConfig.java |  4 +--
 .../processor/internals/StateDirectory.java     | 29 ++++++++++++--------
 .../processor/internals/StreamThread.java       |  4 +--
 .../processor/internals/AbstractTaskTest.java   |  3 +-
 .../internals/GlobalStateManagerImplTest.java   | 24 ++++++++++------
 .../internals/GlobalStreamThreadTest.java       |  5 ++--
 .../internals/ProcessorStateManagerTest.java    |  3 +-
 .../processor/internals/StandbyTaskTest.java    |  3 +-
 .../processor/internals/StateDirectoryTest.java | 22 ++++++++++++---
 .../processor/internals/StreamTaskTest.java     |  2 +-
 .../processor/internals/StreamThreadTest.java   |  9 +++---
 .../StreamThreadStateStoreProviderTest.java     |  2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |  3 +-
 14 files changed, 76 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c57804e..6974b29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -339,7 +339,7 @@ public class KafkaStreams {
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
                                                         clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId
+ "-global")),
-                                                        new StateDirectory(applicationId,
config.getString(StreamsConfig.STATE_DIR_CONFIG)),
+                                                        new StateDirectory(applicationId,
config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
                                                         metrics,
                                                         time,
                                                         clientId);
@@ -559,8 +559,8 @@ public class KafkaStreams {
             localApplicationDir,
             appId);
 
-        final StateDirectory stateDirectory = new StateDirectory(appId, stateDir);
-        stateDirectory.cleanRemovedTasks();
+        final StateDirectory stateDirectory = new StateDirectory(appId, stateDir, Time.SYSTEM);
+        stateDirectory.cleanRemovedTasks(0);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 57db027..4af8109 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -129,7 +129,7 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code state.cleanup.delay} */
     public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
-    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds
to wait before deleting state when a partition has migrated.";
+    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds
to wait before deleting state when a partition has migrated. Only state directories that have
not been modified for at least state.cleanup.delay.ms will be removed";
 
     /** {@code timestamp.extractor} */
     public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
@@ -291,7 +291,7 @@ public class StreamsConfig extends AbstractConfig {
                     BUFFERED_RECORDS_PER_PARTITION_DOC)
             .define(STATE_CLEANUP_DELAY_MS_CONFIG,
                     Type.LONG,
-                    60000,
+                    10 * 60 * 1000,
                     Importance.LOW,
                     STATE_CLEANUP_DELAY_MS_DOC)
             .define(METRIC_REPORTER_CLASSES_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index d264b26..2d3e2fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -46,11 +47,13 @@ public class StateDirectory {
     private final File stateDir;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, FileLock> locks = new HashMap<>();
+    private final Time time;
 
     private FileChannel globalStateChannel;
     private FileLock globalStateLock;
 
-    public StateDirectory(final String applicationId, final String stateDirConfig) {
+    public StateDirectory(final String applicationId, final String stateDirConfig, final
Time time) {
+        this.time = time;
         final File baseDir = new File(stateDirConfig);
         if (!baseDir.exists() && !baseDir.mkdirs()) {
             throw new ProcessorStateException(String.format("state directory [%s] doesn't
exist and couldn't be created",
@@ -69,7 +72,7 @@ public class StateDirectory {
      * @param taskId
      * @return directory for the {@link TaskId}
      */
-    public File directoryForTask(final TaskId taskId) {
+    File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
         if (!taskDir.exists() && !taskDir.mkdir()) {
             throw new ProcessorStateException(String.format("task directory [%s] doesn't
exist and couldn't be created",
@@ -78,7 +81,7 @@ public class StateDirectory {
         return taskDir;
     }
 
-    public File globalStateDir() {
+    File globalStateDir() {
         final File dir = new File(stateDir, "global");
         if (!dir.exists() && !dir.mkdir()) {
             throw new ProcessorStateException(String.format("global state directory [%s]
doesn't exist and couldn't be created",
@@ -94,7 +97,7 @@ public class StateDirectory {
      * @return true if successful
      * @throws IOException
      */
-    public boolean lock(final TaskId taskId, int retry) throws IOException {
+    boolean lock(final TaskId taskId, int retry) throws IOException {
         // we already have the lock so bail out here
         if (locks.containsKey(taskId)) {
             return true;
@@ -119,7 +122,7 @@ public class StateDirectory {
         return lock != null;
     }
 
-    public boolean lockGlobalState(final int retry) throws IOException {
+    boolean lockGlobalState(final int retry) throws IOException {
         if (globalStateLock != null) {
             return true;
         }
@@ -144,7 +147,7 @@ public class StateDirectory {
         return true;
     }
 
-    public void unlockGlobalState() throws IOException {
+    void unlockGlobalState() throws IOException {
         if (globalStateLock == null) {
             return;
         }
@@ -175,7 +178,7 @@ public class StateDirectory {
      * @param taskId
      * @throws IOException
      */
-    public void unlock(final TaskId taskId) throws IOException {
+    void unlock(final TaskId taskId) throws IOException {
         final FileLock lock = locks.remove(taskId);
         if (lock != null) {
             lock.release();
@@ -190,8 +193,10 @@ public class StateDirectory {
      * Remove the directories for any {@link TaskId}s that are no-longer
      * owned by this {@link StreamThread} and aren't locked by either
      * another process or another {@link StreamThread}
+     * @param cleanupDelayMs only remove directories if they haven't been modified for at
least
+     *                       this amount of time (milliseconds)
      */
-    public void cleanRemovedTasks() {
+    public void cleanRemovedTasks(final long cleanupDelayMs) {
         final File[] taskDirs = listTaskDirectories();
         if (taskDirs == null || taskDirs.length == 0) {
             return; // nothing to do
@@ -203,8 +208,10 @@ public class StateDirectory {
             if (!locks.containsKey(id)) {
                 try {
                     if (lock(id, 0)) {
-                        log.info("Deleting obsolete state directory {} for task {}", dirName,
id);
-                        Utils.delete(taskDir);
+                        if (time.milliseconds() > taskDir.lastModified() + cleanupDelayMs)
{
+                            log.info("Deleting obsolete state directory {} for task {} as
cleanup delay of {} ms has passed", dirName, id, cleanupDelayMs);
+                            Utils.delete(taskDir);
+                        }
                     }
                 } catch (OverlappingFileLockException e) {
                     // locked by another thread
@@ -226,7 +233,7 @@ public class StateDirectory {
      * List all of the task directories
      * @return The list of all the existing local directories for stream tasks
      */
-    public File[] listTaskDirectories() {
+    File[] listTaskDirectories() {
         return stateDir.listFiles(new FileFilter() {
             @Override
             public boolean accept(final File pathname) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9bc268f..fba3db5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -338,7 +338,7 @@ public class StreamThread extends Thread {
         // standby ktables
         this.standbyRecords = new HashMap<>();
 
-        this.stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+        this.stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG),
time);
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -750,7 +750,7 @@ public class StreamThread extends Thread {
         long now = time.milliseconds();
 
         if (now > lastCleanMs + cleanTimeMs) {
-            stateDirectory.cleanRemovedTasks();
+            stateDirectory.cleanRemovedTasks(cleanTimeMs);
             lastCleanMs = now;
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 16967bc..fc0953b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -71,7 +72,7 @@ public class AbstractTaskTest {
                                 consumer,
                                 consumer,
                                 false,
-                                new StateDirectory("app", TestUtils.tempDirectory().getPath()),
+                                new StateDirectory("app", TestUtils.tempDirectory().getPath(),
new MockTime()),
                                 new ThreadCache("testCache", 0, new MockStreamsMetrics(new
Metrics()))) {
             @Override
             public void commit() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 8d89690..168b300 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.LockException;
@@ -57,6 +58,8 @@ import static org.junit.Assert.fail;
 public class GlobalStateManagerImplTest {
 
 
+    private final MockTime time = new MockTime();
+    private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
     private final TopicPartition t1 = new TopicPartition("t1", 1);
     private final TopicPartition t2 = new TopicPartition("t2", 1);
     private GlobalStateManagerImpl stateManager;
@@ -67,7 +70,6 @@ public class GlobalStateManagerImplTest {
     private NoOpReadOnlyStore store2;
     private MockConsumer<byte[], byte[]> consumer;
     private File checkpointFile;
-    private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
     private ProcessorTopology topology;
 
     @Before
@@ -90,7 +92,7 @@ public class GlobalStateManagerImplTest {
 
         context = new NoOpProcessorContext();
         stateDirPath = TestUtils.tempDirectory().getPath();
-        stateDirectory = new StateDirectory("appId", stateDirPath);
+        stateDirectory = new StateDirectory("appId", stateDirPath, time);
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -109,7 +111,7 @@ public class GlobalStateManagerImplTest {
 
     @Test(expected = LockException.class)
     public void shouldThrowLockExceptionIfCantGetLock() throws Exception {
-        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath);
+        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, time);
         try {
             stateDir.lockGlobalState(1);
             stateManager.initialize(context);
@@ -304,7 +306,7 @@ public class GlobalStateManagerImplTest {
     public void shouldUnlockGlobalStateDirectoryOnClose() throws Exception {
         stateManager.initialize(context);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
-        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath);
+        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, new MockTime());
         try {
             // should be able to get the lock now as it should've been released in close
             assertTrue(stateDir.lockGlobalState(1));
@@ -365,7 +367,7 @@ public class GlobalStateManagerImplTest {
         } catch (StreamsException e) {
             // expected
         }
-        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath);
+        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, new MockTime());
         try {
             // should be able to get the lock now as it should've been released
             assertTrue(stateDir.lockGlobalState(1));
@@ -374,17 +376,21 @@ public class GlobalStateManagerImplTest {
         }
     }
 
-
-    @Test(expected = LockException.class)
+    @Test
     public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws
Exception {
-        stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId",
stateDirPath) {
+        stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId",
stateDirPath, time) {
             @Override
             public boolean lockGlobalState(final int retry) throws IOException {
                 throw new IOException("KABOOM!");
             }
         });
 
-        stateManager.initialize(context);
+        try {
+            stateManager.initialize(context);
+            fail("Should have thrown LockException");
+        } catch (final LockException e) {
+            // pass
+        }
     }
 
     private void writeCorruptCheckpoint() throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index e0c4882..cfceaf3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -46,6 +46,7 @@ import static org.junit.Assert.fail;
 public class GlobalStreamThreadTest {
     private final KStreamBuilder builder = new KStreamBuilder();
     private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final MockTime time = new MockTime();
     private GlobalStreamThread globalStreamThread;
     private StreamsConfig config;
 
@@ -59,7 +60,7 @@ public class GlobalStreamThreadTest {
         globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
-                                                    new StateDirectory("appId", TestUtils.tempDirectory().getPath()),
+                                                    new StateDirectory("appId", TestUtils.tempDirectory().getPath(),
time),
                                                     new Metrics(),
                                                     new MockTime(),
                                                     "client");
@@ -90,7 +91,7 @@ public class GlobalStreamThreadTest {
         globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
-                                                    new StateDirectory("appId", TestUtils.tempDirectory().getPath()),
+                                                    new StateDirectory("appId", TestUtils.tempDirectory().getPath(),
time),
                                                     new Metrics(),
                                                     new MockTime(),
                                                     "client");

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ea9e870..b8d51ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -215,7 +216,7 @@ public class ProcessorStateManagerTest {
     @Before
     public void setup() {
         baseDir = TestUtils.tempDirectory();
-        stateDirectory = new StateDirectory(applicationId, baseDir.getPath());
+        stateDirectory = new StateDirectory(applicationId, baseDir.getPath(), new MockTime());
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 2d32e78..629e521 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -139,7 +140,7 @@ public class StandbyTaskTest {
                 new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0],
new Node[0])
         ));
         baseDir = TestUtils.tempDirectory();
-        stateDirectory = new StateDirectory(applicationId, baseDir.getPath());
+        stateDirectory = new StateDirectory(applicationId, baseDir.getPath(), new MockTime());
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index fb55796..7b8afa3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.TestUtils;
@@ -32,11 +33,13 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class StateDirectoryTest {
 
+    private final MockTime time = new MockTime();
     private File stateDir;
     private String applicationId = "applicationId";
     private StateDirectory directory;
@@ -45,7 +48,7 @@ public class StateDirectoryTest {
     @Before
     public void before() {
         stateDir = new File(TestUtils.IO_TMP_DIR, TestUtils.randomString(5));
-        directory = new StateDirectory(applicationId, stateDir.getPath());
+        directory = new StateDirectory(applicationId, stateDir.getPath(), time);
         appDir = new File(stateDir, applicationId);
     }
 
@@ -139,18 +142,29 @@ public class StateDirectoryTest {
         directory.lock(task1, 0);
         directory.directoryForTask(new TaskId(2, 0));
 
-        directory.cleanRemovedTasks();
+        directory.cleanRemovedTasks(0);
         final List<File> files = Arrays.asList(appDir.listFiles());
         assertEquals(2, files.size());
         assertTrue(files.contains(new File(appDir, task0.toString())));
         assertTrue(files.contains(new File(appDir, task1.toString())));
+    }
 
+    @Test
+    public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay()
throws Exception {
+        final File dir = directory.directoryForTask(new TaskId(2, 0));
+        final int cleanupDelayMs = 60000;
+        directory.cleanRemovedTasks(cleanupDelayMs);
+        assertTrue(dir.exists());
+
+        time.sleep(cleanupDelayMs + 1);
+        directory.cleanRemovedTasks(cleanupDelayMs);
+        assertFalse(dir.exists());
     }
 
     @Test
     public void shouldNotRemoveNonTaskDirectoriesAndFiles() throws Exception {
         final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo");
-        directory.cleanRemovedTasks();
+        directory.cleanRemovedTasks(0);
         assertTrue(otherDir.exists());
     }
 
@@ -170,7 +184,7 @@ public class StateDirectoryTest {
     public void shouldCreateDirectoriesIfParentDoesntExist() throws Exception {
         final File tempDir = TestUtils.tempDirectory();
         final File stateDir = new File(new File(tempDir, "foo"), "state-dir");
-        final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath());
+        final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath(),
time);
         final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0));
         assertTrue(stateDir.exists());
         assertTrue(taskDir.exists());

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0479b9d..f2ad3a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -122,7 +122,7 @@ public class StreamTaskTest {
         source1.addChild(processor);
         source2.addChild(processor);
         baseDir = TestUtils.tempDirectory();
-        stateDirectory = new StateDirectory("applicationId", baseDir.getPath());
+        stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime());
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 250abc1..ef8dc92 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -76,6 +76,7 @@ public class StreamThreadTest {
 
     private final String clientId = "clientId";
     private final String applicationId = "stream-thread-test";
+    private final MockTime time = new MockTime();
     private UUID processId = UUID.randomUUID();
 
     @Before
@@ -913,7 +914,7 @@ public class StreamThreadTest {
                                                                  clientSupplier.restoreConsumer,
                                                                  config,
                                                                  new MockStreamsMetrics(new
Metrics()),
-                                                                 new StateDirectory(applicationId,
config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
+                                                                 new StateDirectory(applicationId,
config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) {
             @Override
             public void close() {
                 throw new RuntimeException("KABOOM!");
@@ -965,7 +966,7 @@ public class StreamThreadTest {
                                                                  clientSupplier.restoreConsumer,
                                                                  config,
                                                                  new MockStreamsMetrics(new Metrics()),
-                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
+                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) {
             @Override
             public void flushState() {
                 throw new RuntimeException("KABOOM!");
@@ -1017,7 +1018,7 @@ public class StreamThreadTest {
                                                                  clientSupplier.restoreConsumer,
                                                                  config,
                                                                  new MockStreamsMetrics(new Metrics()),
-                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
+                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) {
             @Override
             public void closeTopology() {
                 throw new RuntimeException("KABOOM!");
@@ -1068,7 +1069,7 @@ public class StreamThreadTest {
                                                                  clientSupplier.restoreConsumer,
                                                                  config,
                                                                  new MockStreamsMetrics(new Metrics()),
-                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
+                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) {
             @Override
             public void flushState() {
                 throw new RuntimeException("KABOOM!");

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index ffc4485..dfe8d8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -98,7 +98,7 @@ public class StreamThreadStateStoreProviderTest {
         builder.setApplicationId(applicationId);
         final ProcessorTopology topology = builder.build(null);
         final Map<TaskId, StreamTask> tasks = new HashMap<>();
-        stateDirectory = new StateDirectory(applicationId, stateConfigDir);
+        stateDirectory = new StateDirectory(applicationId, stateConfigDir, new MockTime());
         taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 0));
         tasks.put(new TaskId(0, 0),

http://git-wip-us.apache.org/repos/asf/kafka/blob/7de22453/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 89ca0df..277f5f5 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.StateStore;
@@ -186,7 +187,7 @@ public class ProcessorTopologyTestDriver {
 
         consumer.assign(offsetsByTopicPartition.keySet());
 
-        final StateDirectory stateDirectory = new StateDirectory(applicationId, TestUtils.tempDirectory().getPath());
+        final StateDirectory stateDirectory = new StateDirectory(applicationId, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics);
 


Mime
View raw message