kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-3812: fix state store directory locking in Kafka Streams
Date Tue, 19 Jul 2016 23:00:52 GMT
KAFKA-3812: fix state store directory locking in Kafka Streams

Move all state directory creation/locking/unlocking/cleaning into a single class. Don't release the channel until the lock is released. Refactor the code to make use of the new class.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Ismael Juma, Guozhang Wang

Closes #1628 from dguy/kafka-3812
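
The root cause is a JVM/OS interaction: file locks taken via FileChannel.tryLock() are held per-process on POSIX systems, so closing any channel open on the lock file can drop a lock that another thread in the same process still believes it holds (see the TODO removed from ProcessorStateManager below). The following is a minimal standalone sketch of that hazard and of the release-then-close ordering the patch adopts; the class name and temp-directory path are illustrative, not part of the patch:

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockLifecycleSketch {
    public static void main(String[] args) throws IOException {
        Path lockFile = Files.createTempDirectory("state-demo").resolve(".lock");

        // "Thread 1": acquires the lock through its own channel.
        FileChannel owner = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock held = owner.tryLock();

        // "Thread 2": fails to acquire the lock through a second channel.
        FileChannel contender = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock second = contender.tryLock(); // overlaps within this JVM
            if (second != null) second.release();
        } catch (OverlappingFileLockException e) {
            // expected: the lock is already held via `owner`
        }
        // The pre-patch code closed `contender` here. With per-process
        // fcntl-style locks that close() can silently invalidate `held`
        // as well, which is the KAFKA-3812 bug.
        // contender.close();

        held.release();     // the fixed ordering: release the lock first...
        owner.close();      // ...then close the channel that owns it
        contender.close();
    }
}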


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14934157
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14934157
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14934157

Branch: refs/heads/trunk
Commit: 14934157df7aaf5e9c37a302ef9fd9317b95efa4
Parents: cfebfdf
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Jul 19 16:00:48 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 19 16:00:48 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       |  11 +-
 .../internals/ProcessorStateManager.java        |  69 +--
 .../processor/internals/StandbyTask.java        |   6 +-
 .../processor/internals/StateDirectory.java     | 184 ++++++++
 .../streams/processor/internals/StreamTask.java |   7 +-
 .../processor/internals/StreamThread.java       |  67 +--
 .../internals/ProcessorStateManagerTest.java    | 454 +++++++++----------
 .../processor/internals/StandbyTaskTest.java    | 258 +++++------
 .../processor/internals/StateDirectoryTest.java | 169 +++++++
 .../processor/internals/StreamTaskTest.java     | 268 ++++++-----
 .../processor/internals/StreamThreadTest.java   |  11 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   3 +-
 12 files changed, 850 insertions(+), 657 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index fb22c8a..488fc0f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -20,14 +20,12 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -53,8 +51,8 @@ public abstract class AbstractTask {
                            ProcessorTopology topology,
                            Consumer<byte[], byte[]> consumer,
                            Consumer<byte[], byte[]> restoreConsumer,
-                           StreamsConfig config,
-                           boolean isStandby) {
+                           boolean isStandby,
+                           StateDirectory stateDirectory) {
         this.id = id;
         this.applicationId = applicationId;
         this.partitions = new HashSet<>(partitions);
@@ -63,10 +61,7 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            File applicationStateDir = StreamThread.makeStateDir(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
-            File stateFile = new File(applicationStateDir.getCanonicalPath(), id.toString());
-            // if partitions is null, this is a standby task
-            this.stateMgr = new ProcessorStateManager(applicationId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
+            this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory);
         } catch (IOException e) {
             throw new ProcessorStateException("Error while creating the state manager", e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 92b1069..883959e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -24,16 +24,13 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -51,13 +48,11 @@ public class ProcessorStateManager {
 
     public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
     public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
-    public static final String LOCK_FILE_NAME = ".lock";
 
     private final String applicationId;
     private final int defaultPartition;
     private final Map<String, TopicPartition> partitionForTopic;
     private final File baseDir;
-    private final FileLock directoryLock;
     private final Map<String, StateStore> stores;
     private final Set<String> loggingEnabled;
     private final Consumer<byte[], byte[]> restoreConsumer;
@@ -66,18 +61,21 @@ public class ProcessorStateManager {
     private final Map<TopicPartition, Long> offsetLimits;
     private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
+    private final TaskId taskId;
+    private final StateDirectory stateDirectory;
 
     /**
      * @throws IOException if any error happens while creating or locking the state directory
      */
-    public ProcessorStateManager(String applicationId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+    public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby, StateDirectory stateDirectory) throws IOException {
         this.applicationId = applicationId;
-        this.defaultPartition = defaultPartition;
+        this.defaultPartition = taskId.partition;
+        this.taskId = taskId;
+        this.stateDirectory = stateDirectory;
         this.partitionForTopic = new HashMap<>();
         for (TopicPartition source : sources) {
             this.partitionForTopic.put(source.topic(), source);
         }
-        this.baseDir = baseDir;
         this.stores = new HashMap<>();
         this.loggingEnabled = new HashSet<>();
         this.restoreConsumer = restoreConsumer;
@@ -85,13 +83,9 @@ public class ProcessorStateManager {
         this.isStandby = isStandby;
         this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
         this.offsetLimits = new HashMap<>();
+        this.baseDir  = stateDirectory.directoryForTask(taskId);
 
-        // create the state directory for this task if missing (we won't create the parent directory)
-        createStateDirectory(baseDir);
-
-        // try to acquire the exclusive lock on the state directory
-        directoryLock = lockStateDirectory(baseDir, 5);
-        if (directoryLock == null) {
+        if (!stateDirectory.lock(taskId, 5)) {
             throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
         }
 
@@ -103,53 +97,11 @@ public class ProcessorStateManager {
         checkpoint.delete();
     }
 
-    private static void createStateDirectory(File stateDir) throws IOException {
-        if (!stateDir.exists()) {
-            stateDir.mkdir();
-        }
-    }
 
     public static String storeChangelogTopic(String applicationId, String storeName) {
         return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
     }
 
-    /**
-     * @throws IOException if any error happens when locking the state directory
-     */
-    public static FileLock lockStateDirectory(File stateDir) throws IOException {
-        return lockStateDirectory(stateDir, 0);
-    }
-
-    private static FileLock lockStateDirectory(File stateDir, int retry) throws IOException {
-        File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
-        FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
-
-        FileLock lock = lockStateDirectory(channel);
-        while (lock == null && retry > 0) {
-            try {
-                Thread.sleep(200);
-            } catch (Exception ex) {
-                // do nothing
-            }
-            retry--;
-            lock = lockStateDirectory(channel);
-        }
-        // TODO: closing the channel here risks releasing all locks on the file
-        // see {@link https://issues.apache.org/jira/browse/KAFKA-3812}
-        if (lock == null) {
-            channel.close();
-        }
-        return lock;
-    }
-
-    private static FileLock lockStateDirectory(FileChannel channel) throws IOException {
-        try {
-            return channel.tryLock();
-        } catch (OverlappingFileLockException e) {
-            return null;
-        }
-    }
-
     public File baseDir() {
         return this.baseDir;
     }
@@ -378,8 +330,7 @@ public class ProcessorStateManager {
             }
         } finally {
             // release the state directory directoryLock
-            directoryLock.release();
-            directoryLock.channel().close();
+            stateDirectory.unlock(taskId);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index f19d5a3..830fab6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -38,7 +38,6 @@ public class StandbyTask extends AbstractTask {
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
-     *
      * @param id                    the ID of this task
      * @param applicationId         the ID of the stream processing application
      * @param partitions            the collection of assigned {@link TopicPartition}
@@ -47,6 +46,7 @@ public class StandbyTask extends AbstractTask {
      * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
      * @param config                the {@link StreamsConfig} specified by the user
      * @param metrics               the {@link StreamsMetrics} created by the thread
+     * @param stateDirectory        the {@link StateDirectory} created by the thread
      */
     public StandbyTask(TaskId id,
                        String applicationId,
@@ -55,8 +55,8 @@ public class StandbyTask extends AbstractTask {
                        Consumer<byte[], byte[]> consumer,
                        Consumer<byte[], byte[]> restoreConsumer,
                        StreamsConfig config,
-                       StreamsMetrics metrics) {
-        super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, true);
+                       StreamsMetrics metrics, final StateDirectory stateDirectory) {
+        super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory);
 
         // initialize the topology with its own context
         this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
new file mode 100644
index 0000000..d81a91c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+
+/**
+ * Manages the directories where the state of Tasks owned by a {@link StreamThread} is
+ * stored. Handles creation/locking/unlocking/cleaning of the Task Directories. This class is not
+ * thread-safe.
+ */
+public class StateDirectory {
+
+    static final String LOCK_FILE_NAME = ".lock";
+    private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
+
+    private final File stateDir;
+    private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
+    private final HashMap<TaskId, FileLock> locks = new HashMap<>();
+
+    public StateDirectory(final String applicationId, final String stateDirConfig) {
+        final File baseDir = new File(stateDirConfig);
+        if (!baseDir.exists()) {
+            baseDir.mkdir();
+        }
+        stateDir = new File(baseDir, applicationId);
+        if (!stateDir.exists()) {
+            stateDir.mkdir();
+        }
+
+    }
+
+    /**
+     * Get or create the directory for the {@link TaskId}
+     * @param taskId the task whose state directory to get or create
+     * @return the directory for the {@link TaskId}
+     */
+    public File directoryForTask(final TaskId taskId) {
+        final File taskDir = new File(stateDir, taskId.toString());
+        if (!taskDir.exists()) {
+            taskDir.mkdir();
+        }
+        return taskDir;
+    }
+
+    /**
+     * Get the lock for the {@link TaskId}'s directory if it is available
+     * @param taskId the task whose directory to lock
+     * @param retry  how many times to retry a failed attempt, 200 ms apart
+     * @return true if the lock was acquired
+     * @throws IOException if the lock file cannot be created or opened
+     */
+    public boolean lock(final TaskId taskId, int retry) throws IOException {
+        // we already have the lock so bail out here
+        if (locks.containsKey(taskId)) {
+            return true;
+        }
+        final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
+        final FileChannel channel = getOrCreateFileChannel(taskId, lockFile.toPath());
+
+        FileLock lock = tryAcquireLock(channel);
+        while (lock == null && retry > 0) {
+            try {
+                Thread.sleep(200);
+            } catch (Exception ex) {
+                // do nothing
+            }
+            retry--;
+            lock = tryAcquireLock(channel);
+        }
+        if (lock != null) {
+            locks.put(taskId, lock);
+        }
+        return lock != null;
+    }
+
+    /**
+     * Unlock the state directory for the given {@link TaskId}
+     * @param taskId the task whose directory to unlock
+     * @throws IOException if releasing the lock or closing its channel fails
+     */
+    public void unlock(final TaskId taskId) throws IOException {
+        final FileLock lock = locks.remove(taskId);
+        if (lock != null) {
+            lock.release();
+            final FileChannel fileChannel = channels.remove(taskId);
+            if (fileChannel != null) {
+                fileChannel.close();
+            }
+        }
+    }
+
+    /**
+     * Remove the directories for any {@link TaskId}s that are no longer
+     * owned by this {@link StreamThread} and aren't locked by either
+     * another process or another {@link StreamThread}
+     */
+    public void cleanRemovedTasks() {
+        final File[] taskDirs = listTaskDirectories();
+        if (taskDirs == null || taskDirs.length == 0) {
+            return; // nothing to do
+        }
+
+        for (File taskDir : taskDirs) {
+            final String dirName = taskDir.getName();
+            TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id, 0)) {
+                        log.info("Deleting obsolete state directory {} for task {}", dirName, id);
+                        Utils.delete(taskDir);
+                    }
+                } catch (OverlappingFileLockException e) {
+                    // locked by another thread
+                } catch (IOException e) {
+                    log.error("Failed to lock the state directory due to an unexpected exception", e);
+                } finally {
+                    try {
+                        unlock(id);
+                    } catch (IOException e) {
+                        log.error("Failed to release the state directory lock");
+                    }
+                }
+            }
+        }
+
+    }
+
+    /**
+     * List all of the task directories
+     * @return the task directories under the application state directory
+     */
+    public File[] listTaskDirectories() {
+        return stateDir.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(final File pathname) {
+                final String name = pathname.getName();
+                return pathname.isDirectory() && name.matches("\\d+_\\d+");
+            }
+        });
+    }
+
+    private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath) throws IOException {
+        if (!channels.containsKey(taskId)) {
+            channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE));
+        }
+        return channels.get(taskId);
+    }
+
+    private FileLock tryAcquireLock(final FileChannel channel) throws IOException {
+        try {
+            return channel.tryLock();
+        } catch (OverlappingFileLockException e) {
+            return null;
+        }
+    }
+}
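
A minimal usage sketch of the new class, mirroring how StreamThread and the tasks wire it up in this patch; the application id, state path, and class name are illustrative, and error handling is elided:

import java.io.File;
import java.io.IOException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;

public class StateDirectoryUsageSketch {
    public static void main(String[] args) throws IOException {
        // application id and base path are illustrative only
        StateDirectory stateDirectory = new StateDirectory("my-app", "/tmp/kafka-streams");
        TaskId taskId = new TaskId(0, 1);

        if (stateDirectory.lock(taskId, 5)) {   // retry up to 5 times, 200 ms apart
            try {
                File taskDir = stateDirectory.directoryForTask(taskId); // created on demand
                // ... open state stores under taskDir ...
            } finally {
                stateDirectory.unlock(taskId);  // release the lock, then close its channel
            }
        }
        // delete task directories that no thread or process currently holds
        stateDirectory.cleanRemovedTasks();
    }
}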

http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e7e24fb..402c8fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -63,7 +63,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     /**
      * Create {@link StreamTask} with its assigned partitions
-     *
      * @param id                    the ID of this task
      * @param applicationId         the ID of the stream processing application
      * @param partitions            the collection of assigned {@link TopicPartition}
@@ -73,6 +72,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
      * @param config                the {@link StreamsConfig} specified by the user
      * @param metrics               the {@link StreamsMetrics} created by the thread
+     * @param stateDirectory        the {@link StateDirectory} created by the thread
      */
     public StreamTask(TaskId id,
                       String applicationId,
@@ -82,8 +82,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       Producer<byte[], byte[]> producer,
                       Consumer<byte[], byte[]> restoreConsumer,
                       StreamsConfig config,
-                      StreamsMetrics metrics) {
-        super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, false);
+                      StreamsMetrics metrics,
+                      StateDirectory stateDirectory) {
+        super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory);
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c18e1cf..f982efa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -34,7 +34,6 @@ import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
@@ -47,9 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.channels.FileLock;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -92,11 +88,11 @@ public class StreamThread extends Thread {
     private final Map<TopicPartition, StandbyTask> standbyTasksByPartition;
     private final Set<TaskId> prevTasks;
     private final Time time;
-    private final File stateDir;
     private final long pollTimeMs;
     private final long cleanTimeMs;
     private final long commitTimeMs;
     private final StreamsMetricsImpl sensors;
+    final StateDirectory stateDirectory;
 
     private StreamPartitionAssignor partitionAssignor = null;
 
@@ -107,18 +103,6 @@ public class StreamThread extends Thread {
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
 
-    static File makeStateDir(String applicationId, String baseDirName) {
-        File baseDir = new File(baseDirName);
-        if (!baseDir.exists())
-            baseDir.mkdir();
-
-        File stateDir = new File(baseDir, applicationId);
-        if (!stateDir.exists())
-            stateDir.mkdir();
-
-        return stateDir;
-    }
-
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -192,8 +176,7 @@ public class StreamThread extends Thread {
         // standby ktables
         this.standbyRecords = new HashMap<>();
 
-        // read in task specific config values
-        this.stateDir = makeStateDir(this.applicationId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
+        this.stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -204,6 +187,7 @@ public class StreamThread extends Thread {
 
         this.sensors = new StreamsMetricsImpl(metrics);
 
+
         this.running = new AtomicBoolean(true);
     }
 
@@ -474,44 +458,7 @@ public class StreamThread extends Thread {
         long now = time.milliseconds();
 
         if (now > lastClean + cleanTimeMs) {
-            File[] stateDirs = stateDir.listFiles();
-            if (stateDirs != null) {
-                for (File dir : stateDirs) {
-                    try {
-                        String dirName = dir.getName();
-                        TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-") + 1));
-
-                        // try to acquire the exclusive lock on the state directory
-                        if (dir.exists()) {
-                            FileLock directoryLock = null;
-                            try {
-                                directoryLock = ProcessorStateManager.lockStateDirectory(dir);
-                                if (directoryLock != null) {
-                                    log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs);
-                                    Utils.delete(dir);
-                                }
-                            } catch (FileNotFoundException e) {
-                                // the state directory may be deleted by another thread
-                            } catch (IOException e) {
-                                log.error("Failed to lock the state directory due to an unexpected exception", e);
-                            } finally {
-                                if (directoryLock != null) {
-                                    try {
-                                        directoryLock.release();
-                                        directoryLock.channel().close();
-                                    } catch (IOException e) {
-                                        log.error("Failed to release the state directory lock");
-                                    }
-                                }
-                            }
-                        }
-                    } catch (TaskIdFormatException e) {
-                        // there may be some unknown files that sits in the same directory,
-                        // we should ignore these files instead trying to delete them as well
-                    }
-                }
-            }
-
+            stateDirectory.cleanRemovedTasks();
             lastClean = now;
         }
     }
@@ -534,7 +481,7 @@ public class StreamThread extends Thread {
 
         HashSet<TaskId> tasks = new HashSet<>();
 
-        File[] stateDirs = stateDir.listFiles();
+        File[] stateDirs = stateDirectory.listTaskDirectories();
         if (stateDirs != null) {
             for (File dir : stateDirs) {
                 try {
@@ -558,7 +505,7 @@ public class StreamThread extends Thread {
 
         ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
 
-        return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
+        return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors, stateDirectory);
     }
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -629,7 +576,7 @@ public class StreamThread extends Thread {
         ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
-            return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors);
+            return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors, stateDirectory);
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 890af0f..fbc92d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -29,14 +29,16 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.channels.FileLock;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -45,13 +47,15 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
 public class ProcessorStateManagerTest {
 
+    private File baseDir;
+    private StateDirectory stateDirectory;
+
     public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
@@ -190,316 +194,270 @@ public class ProcessorStateManagerTest {
     private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
     private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
 
-    @Test
-    public void testLockStateDirectory() throws IOException {
-        File baseDir = Files.createTempDirectory(stateDir).toFile();
-        try {
-            FileLock lock;
-
-            // the state manager locks the directory
-            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
-
-            try {
-                // this should not get the lock
-                lock = ProcessorStateManager.lockStateDirectory(baseDir);
-                assertNull(lock);
-            } finally {
-                // by closing the state manager, release the lock
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
+    @Before
+    public void setup() {
+        baseDir = TestUtils.tempDirectory();
+        stateDirectory = new StateDirectory(applicationId, baseDir.getPath());
+    }
 
-            // now, this should get the lock
-            lock = ProcessorStateManager.lockStateDirectory(baseDir);
-            try {
-                assertNotNull(lock);
-            } finally {
-                if (lock != null) {
-                    lock.release();
-                    lock.channel().close();
-                }
-            }
-        } finally {
-            Utils.delete(baseDir);
-        }
+    @After
+    public void cleanup() {
+        Utils.delete(baseDir);
     }
 
     @Test(expected = StreamsException.class)
     public void testNoTopic() throws IOException {
-        File baseDir = Files.createTempDirectory(stateDir).toFile();
-        try {
-            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+        MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
-            try {
-                stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
-            } finally {
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory);
+        try {
+            stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
         } finally {
-            Utils.delete(baseDir);
+            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
         }
     }
 
     @Test
     public void testRegisterPersistentStore() throws IOException {
-        File baseDir = Files.createTempDirectory(stateDir).toFile();
-        try {
-            long lastCheckpointedOffset = 10L;
-
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
-
-            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+        final TaskId taskId = new TaskId(0, 2);
+        long lastCheckpointedOffset = 10L;
 
-            restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
-                    new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
-                    new PartitionInfo(persistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])
-            ));
+        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
 
-            TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
-            restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
+        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
 
-            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
+        restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo(persistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])
+        ));
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
-            try {
-                restoreConsumer.reset();
+        TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
 
-                ArrayList<Integer> expectedKeys = new ArrayList<>();
-                long offset;
-                for (int i = 1; i <= 3; i++) {
-                    offset = (long) i;
-                    int key = i * 10;
-                    expectedKeys.add(key);
-                    restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0)
-                    );
-                }
+        MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
-                stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory);
+        try {
+            restoreConsumer.reset();
+
+            ArrayList<Integer> expectedKeys = new ArrayList<>();
+            long offset;
+            for (int i = 1; i <= 3; i++) {
+                offset = (long) i;
+                int key = i * 10;
+                expectedKeys.add(key);
+                restoreConsumer.bufferRecord(
+                        new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0)
+                );
+            }
 
-                assertEquals(new TopicPartition(persistentStoreTopicName, 2), restoreConsumer.assignedPartition);
-                assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset);
-                assertFalse(restoreConsumer.seekToBeginingCalled);
-                assertTrue(restoreConsumer.seekToEndCalled);
-                assertEquals(expectedKeys, persistentStore.keys);
+            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
-            } finally {
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
+            assertEquals(new TopicPartition(persistentStoreTopicName, 2), restoreConsumer.assignedPartition);
+            assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset);
+            assertFalse(restoreConsumer.seekToBeginingCalled);
+            assertTrue(restoreConsumer.seekToEndCalled);
+            assertEquals(expectedKeys, persistentStore.keys);
 
         } finally {
-            Utils.delete(baseDir);
+            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
         }
+
+
     }
 
     @Test
     public void testRegisterNonPersistentStore() throws IOException {
-        File baseDir = Files.createTempDirectory(stateDir).toFile();
-        try {
-            long lastCheckpointedOffset = 10L;
-
-            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+        long lastCheckpointedOffset = 10L;
 
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
+        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
 
-            restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
-                    new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
-                    new PartitionInfo(nonPersistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])
-            ));
+        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset));
 
-            TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
-            restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
+        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo(nonPersistentStoreTopicName, 2, Node.noNode(), new Node[0], new Node[0])
+        ));
 
-            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
+        TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2);
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
-            try {
-                restoreConsumer.reset();
+        MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
-                ArrayList<Integer> expectedKeys = new ArrayList<>();
-                long offset = -1L;
-                for (int i = 1; i <= 3; i++) {
-                    offset = (long) (i + 100);
-                    int key = i;
-                    expectedKeys.add(i);
-                    restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0)
-                    );
-                }
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory);
+        try {
+            restoreConsumer.reset();
+
+            ArrayList<Integer> expectedKeys = new ArrayList<>();
+            long offset = -1L;
+            for (int i = 1; i <= 3; i++) {
+                offset = (long) (i + 100);
+                int key = i;
+                expectedKeys.add(i);
+                restoreConsumer.bufferRecord(
+                        new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, 0L, 0, 0, key, 0)
+                );
+            }
 
-                stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
+            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
 
-                assertEquals(new TopicPartition(nonPersistentStoreTopicName, 2), restoreConsumer.assignedPartition);
-                assertEquals(0L, restoreConsumer.seekOffset);
-                assertTrue(restoreConsumer.seekToBeginingCalled);
-                assertTrue(restoreConsumer.seekToEndCalled);
-                assertEquals(expectedKeys, nonPersistentStore.keys);
+            assertEquals(new TopicPartition(nonPersistentStoreTopicName, 2), restoreConsumer.assignedPartition);
+            assertEquals(0L, restoreConsumer.seekOffset);
+            assertTrue(restoreConsumer.seekToBeginingCalled);
+            assertTrue(restoreConsumer.seekToEndCalled);
+            assertEquals(expectedKeys, nonPersistentStore.keys);
 
-            } finally {
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
         } finally {
-            Utils.delete(baseDir);
+            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
         }
+
     }
 
     @Test
     public void testChangeLogOffsets() throws IOException {
-        File baseDir = Files.createTempDirectory(stateDir).toFile();
+        final TaskId taskId = new TaskId(0, 0);
+        long lastCheckpointedOffset = 10L;
+        String storeName1 = "store1";
+        String storeName2 = "store2";
+        String storeName3 = "store3";
+
+        String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+        String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
+        String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
+
+        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
+
+        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+
+        restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
+                new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])
+        ));
+        restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList(
+                new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0])
+        ));
+        restoreConsumer.updatePartitions(storeTopicName3, Utils.mkList(
+                new PartitionInfo(storeTopicName3, 0, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo(storeTopicName3, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
+
+        TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
+        TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
+        TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(partition1, 13L);
+        endOffsets.put(partition2, 17L);
+        restoreConsumer.updateEndOffsets(endOffsets);
+
+        MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
+        MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
+        MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
+
+        // if there is a source partition, inherit the partition id
+        Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
+
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory); // standby
         try {
-            long lastCheckpointedOffset = 10L;
-            String storeName1 = "store1";
-            String storeName2 = "store2";
-            String storeName3 = "store3";
-
-            String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
-            String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
-            String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
-
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
-
-            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
-            restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList(
-                    new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0])
-            ));
-            restoreConsumer.updatePartitions(storeTopicName2, Utils.mkList(
-                    new PartitionInfo(storeTopicName2, 0, Node.noNode(), new Node[0], new Node[0])
-            ));
-            restoreConsumer.updatePartitions(storeTopicName3, Utils.mkList(
-                    new PartitionInfo(storeTopicName3, 0, Node.noNode(), new Node[0], new Node[0]),
-                    new PartitionInfo(storeTopicName3, 1, Node.noNode(), new Node[0], new Node[0])
-            ));
-
-            TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
-            TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
-            TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
-
-            Map<TopicPartition, Long> endOffsets = new HashMap<>();
-            endOffsets.put(partition1, 13L);
-            endOffsets.put(partition2, 17L);
-            restoreConsumer.updateEndOffsets(endOffsets);
-
-            MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
-            MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
-            MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
-
-            // if there is an source partition, inherit the partition id
-            Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
-            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
-            try {
-                restoreConsumer.reset();
-
-                stateMgr.register(store1, true, store1.stateRestoreCallback);
-                stateMgr.register(store2, true, store2.stateRestoreCallback);
-                stateMgr.register(store3, true, store3.stateRestoreCallback);
-
-                Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
-
-                assertEquals(3, changeLogOffsets.size());
-                assertTrue(changeLogOffsets.containsKey(partition1));
-                assertTrue(changeLogOffsets.containsKey(partition2));
-                assertTrue(changeLogOffsets.containsKey(partition3));
-                assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1));
-                assertEquals(-1L, (long) changeLogOffsets.get(partition2));
-                assertEquals(-1L, (long) changeLogOffsets.get(partition3));
-
-            } finally {
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
+            restoreConsumer.reset();
+
+            stateMgr.register(store1, true, store1.stateRestoreCallback);
+            stateMgr.register(store2, true, store2.stateRestoreCallback);
+            stateMgr.register(store3, true, store3.stateRestoreCallback);
+
+            Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
+
+            assertEquals(3, changeLogOffsets.size());
+            assertTrue(changeLogOffsets.containsKey(partition1));
+            assertTrue(changeLogOffsets.containsKey(partition2));
+            assertTrue(changeLogOffsets.containsKey(partition3));
+            assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1));
+            assertEquals(-1L, (long) changeLogOffsets.get(partition2));
+            assertEquals(-1L, (long) changeLogOffsets.get(partition3));
 
         } finally {
-            Utils.delete(baseDir);
+            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
         }
+
     }
 
     @Test
     public void testGetStore() throws IOException {
-        File baseDir = Files.createTempDirectory(stateDir).toFile();
-        try {
-            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
 
-            restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
-                    new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-            ));
+        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
 
-            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+        MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
-            try {
-                stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory);
+        try {
+            stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
-                assertNull(stateMgr.getStore("noSuchStore"));
-                assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
+            assertNull(stateMgr.getStore("noSuchStore"));
+            assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
 
-            } finally {
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
         } finally {
-            Utils.delete(baseDir);
+            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
         }
     }
 
     @Test
     public void testClose() throws IOException {
-        File baseDir = Files.createTempDirectory(stateDir).toFile();
-        File checkpointFile = new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        final TaskId taskId = new TaskId(0, 1);
+        File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        // write an empty checkpoint file
+        OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
+        oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
+
+        MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+
+        restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
+        restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
+                new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
+        ));
+
+        // set up ack'ed offsets
+        HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
+        ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
+        ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
+        ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
+
+        MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
+        MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory);
         try {
-            // write an empty checkpoint file
-            OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
-            oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
-
-            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
-            restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
-                    new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-            ));
-            restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList(
-                    new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0])
-            ));
-
-            // set up ack'ed offsets
-            HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
-            ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
-            ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
-            ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
-
-            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-
-            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
-            try {
-                // make sure the checkpoint file is deleted
-                assertFalse(checkpointFile.exists());
-
-                restoreConsumer.reset();
-                stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
-
-                restoreConsumer.reset();
-                stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
-            } finally {
-                // close the state manager with the ack'ed offsets
-                stateMgr.close(ackedOffsets);
-            }
+            // make sure the checkpoint file is deleted
+            assertFalse(checkpointFile.exists());
 
-            // make sure all stores are closed, and the checkpoint file is written.
-            assertTrue(persistentStore.flushed);
-            assertTrue(persistentStore.closed);
-            assertTrue(nonPersistentStore.flushed);
-            assertTrue(nonPersistentStore.closed);
-            assertTrue(checkpointFile.exists());
-
-            // the checkpoint file should contain an offset from the persistent store only.
-            OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
-            Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
-            assertEquals(1, checkpointedOffsets.size());
-            assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1)));
+            restoreConsumer.reset();
+            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+
+            restoreConsumer.reset();
+            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
         } finally {
-            Utils.delete(baseDir);
+            // close the state manager with the ack'ed offsets
+            stateMgr.close(ackedOffsets);
         }
+
+        // make sure all stores are closed, and the checkpoint file is written.
+        assertTrue(persistentStore.flushed);
+        assertTrue(persistentStore.closed);
+        assertTrue(nonPersistentStore.flushed);
+        assertTrue(nonPersistentStore.closed);
+        assertTrue(checkpointFile.exists());
+
+        // the checkpoint file should contain an offset from the persistent store only.
+        OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
+        Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
+        assertEquals(1, checkpointedOffsets.size());
+        assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1)));
     }
 
 }

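For orientation, the shape of the new wiring that all of the rewritten ProcessorStateManagerTest cases share: a TaskId plus a StateDirectory replace the bare partition id and baseDir File. A minimal sketch, assuming the fixture fields (applicationId, noPartitions, restoreConsumer, stateDirectory) defined earlier in the test class; it is not compilable outside the Streams test tree:

    // a TaskId replaces the bare partition id in the constructor
    final TaskId taskId = new TaskId(0, 1);
    // the StateDirectory, not the test, now owns directory creation and locking
    final ProcessorStateManager stateMgr = new ProcessorStateManager(
            applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory);
    try {
        // register stores and run assertions, as in the tests above
    } finally {
        stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
    }
    // checkpoint files are now resolved through the per-task directory
    final File checkpointFile = new File(stateDirectory.directoryForTask(taskId),
            ProcessorStateManager.CHECKPOINT_FILE_NAME);
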
http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 9e15e1c..39cb0a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -34,11 +34,12 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -87,6 +88,8 @@ public class StandbyTaskTest {
                     new MockStateStoreSupplier(ktable.topic(), true, false)
             )
     );
+    private File baseDir;
+    private StateDirectory stateDirectory;
 
     private StreamsConfig createConfig(final File baseDir) throws Exception {
         return new StreamsConfig(new Properties() {
@@ -120,189 +123,176 @@ public class StandbyTaskTest {
                 new PartitionInfo(storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]),
                 new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
         ));
+        baseDir = TestUtils.tempDirectory();
+        stateDirectory = new StateDirectory(applicationId, baseDir.getPath());
+    }
+
+    @After
+    public void cleanup() {
+        Utils.delete(baseDir);
     }
 
     @Test
     public void testStorePartitions() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+        StreamsConfig config = createConfig(baseDir);
+        StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory);
 
-            assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
+        assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
 
-        } finally {
-            Utils.delete(baseDir);
-        }
     }
 
     @SuppressWarnings("unchecked")
     @Test(expected = Exception.class)
     public void testUpdateNonPersistentStore() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+        StreamsConfig config = createConfig(baseDir);
+        StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory);
 
-            restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+        restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
-            task.update(partition1,
-                    records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
-            );
+        task.update(partition1,
+                records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
+        );
 
-        } finally {
-            Utils.delete(baseDir);
-        }
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testUpdate() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
-
-            restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
-
-            for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
-                restoreStateConsumer.bufferRecord(record);
-            }
+        StreamsConfig config = createConfig(baseDir);
+        StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory);
 
-            for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
-                TopicPartition partition = entry.getKey();
-                long offset = entry.getValue();
-                if (offset >= 0) {
-                    restoreStateConsumer.seek(partition, offset);
-                } else {
-                    restoreStateConsumer.seekToBeginning(singleton(partition));
-                }
+        restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+
+        for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
+            restoreStateConsumer.bufferRecord(record);
+        }
+
+        for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
+            TopicPartition partition = entry.getKey();
+            long offset = entry.getValue();
+            if (offset >= 0) {
+                restoreStateConsumer.seek(partition, offset);
+            } else {
+                restoreStateConsumer.seekToBeginning(singleton(partition));
             }
+        }
 
-            task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+        task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
 
-            StandbyContextImpl context = (StandbyContextImpl) task.context();
-            MockStateStoreSupplier.MockStateStore store1 =
-                    (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName1);
-            MockStateStoreSupplier.MockStateStore store2 =
-                    (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName2);
+        StandbyContextImpl context = (StandbyContextImpl) task.context();
+        MockStateStoreSupplier.MockStateStore store1 =
+                (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName1);
+        MockStateStoreSupplier.MockStateStore store2 =
+                (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName2);
 
-            assertEquals(Collections.emptyList(), store1.keys);
-            assertEquals(Utils.mkList(1, 2, 3), store2.keys);
+        assertEquals(Collections.emptyList(), store1.keys);
+        assertEquals(Utils.mkList(1, 2, 3), store2.keys);
 
-            task.close();
+        task.close();
 
-            File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            Map<TopicPartition, Long> offsets = checkpoint.read();
+        File taskDir = stateDirectory.directoryForTask(taskId);
+        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        Map<TopicPartition, Long> offsets = checkpoint.read();
 
-            assertEquals(1, offsets.size());
-            assertEquals(new Long(30L + 1L), offsets.get(partition2));
+        assertEquals(1, offsets.size());
+        assertEquals(new Long(30L + 1L), offsets.get(partition2));
 
-        } finally {
-            Utils.delete(baseDir);
-        }
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testUpdateKTable() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            consumer.assign(Utils.mkList(ktable));
-            Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(0L));
-            consumer.commitSync(committedOffsets);
-
-            restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
-                    new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]),
-                    new PartitionInfo("ktable1", 1, Node.noNode(), new Node[0], new Node[0]),
-                    new PartitionInfo("ktable1", 2, Node.noNode(), new Node[0], new Node[0])
-            ));
-
-            StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
-
-            restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
-
-            for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 4, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 5, 100))) {
-                restoreStateConsumer.bufferRecord(record);
-            }
+        consumer.assign(Utils.mkList(ktable));
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(0L));
+        consumer.commitSync(committedOffsets);
+
+        restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
+                new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo("ktable1", 1, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo("ktable1", 2, Node.noNode(), new Node[0], new Node[0])
+        ));
+
+        StreamsConfig config = createConfig(baseDir);
+        StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null, stateDirectory);
+
+        restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
-            for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
-                TopicPartition partition = entry.getKey();
-                long offset = entry.getValue();
-                if (offset >= 0) {
-                    restoreStateConsumer.seek(partition, offset);
-                } else {
-                    restoreStateConsumer.seekToBeginning(singleton(partition));
-                }
+        for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
+                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
+                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
+                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100),
+                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 4, 100),
+                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 5, 100))) {
+            restoreStateConsumer.bufferRecord(record);
+        }
+
+        for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
+            TopicPartition partition = entry.getKey();
+            long offset = entry.getValue();
+            if (offset >= 0) {
+                restoreStateConsumer.seek(partition, offset);
+            } else {
+                restoreStateConsumer.seekToBeginning(singleton(partition));
             }
+        }
 
-            // The commit offset is at 0L. Records should not be processed
-            List<ConsumerRecord<byte[], byte[]>> remaining = task.update(ktable, restoreStateConsumer.poll(100).records(ktable));
-            assertEquals(5, remaining.size());
+        // The commit offset is at 0L. Records should not be processed
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(ktable, restoreStateConsumer.poll(100).records(ktable));
+        assertEquals(5, remaining.size());
 
-            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(10L));
-            consumer.commitSync(committedOffsets);
-            task.commit(); // update offset limits
+        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(10L));
+        consumer.commitSync(committedOffsets);
+        task.commit(); // update offset limits
 
-            // The commit offset has not reached, yet.
-            remaining = task.update(ktable, remaining);
-            assertEquals(5, remaining.size());
+        // The committed offset has not been reached yet.
+        remaining = task.update(ktable, remaining);
+        assertEquals(5, remaining.size());
 
-            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(11L));
-            consumer.commitSync(committedOffsets);
-            task.commit(); // update offset limits
+        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(11L));
+        consumer.commitSync(committedOffsets);
+        task.commit(); // update offset limits
 
-            // one record should be processed.
-            remaining = task.update(ktable, remaining);
-            assertEquals(4, remaining.size());
+        // one record should be processed.
+        remaining = task.update(ktable, remaining);
+        assertEquals(4, remaining.size());
 
-            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(45L));
-            consumer.commitSync(committedOffsets);
-            task.commit(); // update offset limits
+        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(45L));
+        consumer.commitSync(committedOffsets);
+        task.commit(); // update offset limits
 
-            // The commit offset is now 45. All record except for the last one should be processed.
-            remaining = task.update(ktable, remaining);
-            assertEquals(1, remaining.size());
+        // The commit offset is now 45. All records except the last one should be processed.
+        remaining = task.update(ktable, remaining);
+        assertEquals(1, remaining.size());
 
-            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(50L));
-            consumer.commitSync(committedOffsets);
-            task.commit(); // update offset limits
+        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(50L));
+        consumer.commitSync(committedOffsets);
+        task.commit(); // update offset limits
 
-            // The commit offset is now 50. Still the last record remains.
-            remaining = task.update(ktable, remaining);
-            assertEquals(1, remaining.size());
+        // The commit offset is now 50. The last record still remains.
+        remaining = task.update(ktable, remaining);
+        assertEquals(1, remaining.size());
 
-            committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(60L));
-            consumer.commitSync(committedOffsets);
-            task.commit(); // update offset limits
+        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(60L));
+        consumer.commitSync(committedOffsets);
+        task.commit(); // update offset limits
 
-            // The commit offset is now 60. No record should be left.
-            remaining = task.update(ktable, remaining);
-            assertNull(remaining);
+        // The commit offset is now 60. No record should be left.
+        remaining = task.update(ktable, remaining);
+        assertNull(remaining);
 
-            task.close();
+        task.close();
 
-            File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            Map<TopicPartition, Long> offsets = checkpoint.read();
+        File taskDir = stateDirectory.directoryForTask(taskId);
+        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        Map<TopicPartition, Long> offsets = checkpoint.read();
 
-            assertEquals(1, offsets.size());
-            assertEquals(new Long(51L), offsets.get(ktable));
+        assertEquals(1, offsets.size());
+        assertEquals(new Long(51L), offsets.get(ktable));
 
-        } finally {
-            Utils.delete(baseDir);
-        }
     }
 
     private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {

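The same lifecycle change runs through StandbyTaskTest: the per-test try/finally blocks that created and deleted a temp directory give way to a shared JUnit 4 fixture, so every task in a test is built against one StateDirectory. A minimal sketch of the pattern (class name and applicationId value are illustrative; TestUtils, Utils, and StateDirectory are the types used in the diff):

    import java.io.File;

    import org.apache.kafka.common.utils.Utils;
    import org.apache.kafka.streams.processor.internals.StateDirectory;
    import org.apache.kafka.test.TestUtils;
    import org.junit.After;
    import org.junit.Before;

    public class StateDirectoryFixtureSketch {
        private final String applicationId = "test-application"; // illustrative value
        private File baseDir;
        private StateDirectory stateDirectory;

        @Before
        public void setup() {
            // one temp directory per test; every task created in the test shares it
            baseDir = TestUtils.tempDirectory();
            stateDirectory = new StateDirectory(applicationId, baseDir.getPath());
        }

        @After
        public void cleanup() {
            // replaces the repeated finally { Utils.delete(baseDir); } blocks
            Utils.delete(baseDir);
        }
    }
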
http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
new file mode 100644
index 0000000..c17e7bc
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StateDirectoryTest {
+
+    private File stateDir;
+    private String applicationId = "applicationId";
+    private StateDirectory directory;
+    private File appDir;
+
+    @Before
+    public void before() {
+        stateDir = new File(TestUtils.IO_TMP_DIR, TestUtils.randomString(5));
+        directory = new StateDirectory(applicationId, stateDir.getPath());
+        appDir = new File(stateDir, applicationId);
+    }
+
+    @After
+    public void cleanup() {
+        if (stateDir.exists()) {
+            Utils.delete(stateDir);
+        }
+    }
+
+    @Test
+    public void shouldCreateBaseDirectory() throws Exception {
+        assertTrue(stateDir.exists());
+        assertTrue(stateDir.isDirectory());
+        assertTrue(appDir.exists());
+        assertTrue(appDir.isDirectory());
+    }
+
+    @Test
+    public void shouldCreateTaskStateDirectory() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+        final File taskDirectory = directory.directoryForTask(taskId);
+        assertTrue(taskDirectory.exists());
+        assertTrue(taskDirectory.isDirectory());
+    }
+
+    @Test
+    public void shouldLockTaskStateDirectory() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+        final File taskDirectory = directory.directoryForTask(taskId);
+
+        directory.lock(taskId, 0);
+
+        final FileChannel channel = FileChannel.open(new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        try {
+            channel.tryLock();
+            fail("shouldn't be able to lock already locked directory");
+        } catch (OverlappingFileLockException e) {
+            // pass
+        }
+
+    }
+
+    @Test
+    public void shouldBeTrueIfAlreadyHoldsLock() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+        directory.directoryForTask(taskId);
+        directory.lock(taskId, 0);
+        assertTrue(directory.lock(taskId, 0));
+    }
+
+    @Test
+    public void shouldLockMultipleTaskDirectories() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+        final File task1Dir = directory.directoryForTask(taskId);
+        final TaskId taskId2 = new TaskId(1, 0);
+        final File task2Dir = directory.directoryForTask(taskId2);
+
+        directory.lock(taskId, 0);
+        directory.lock(taskId2, 0);
+
+        final FileChannel channel1 = FileChannel.open(new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        // check each channel in its own try block so both locks are verified;
+        // a single block would stop at the first OverlappingFileLockException
+        try {
+            channel1.tryLock();
+            fail("shouldn't be able to lock already locked directory");
+        } catch (OverlappingFileLockException e) {
+            // pass
+        }
+        try {
+            channel2.tryLock();
+            fail("shouldn't be able to lock already locked directory");
+        } catch (OverlappingFileLockException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldReleaseTaskStateDirectoryLock() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+        final File taskDirectory = directory.directoryForTask(taskId);
+
+        directory.lock(taskId, 1);
+        directory.unlock(taskId);
+
+        final FileChannel channel = FileChannel.open(new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        channel.tryLock();
+    }
+
+    @Test
+    public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception {
+        final TaskId task0 = new TaskId(0, 0);
+        final TaskId task1 = new TaskId(1, 0);
+        directory.lock(task0, 0);
+        directory.lock(task1, 0);
+        directory.directoryForTask(new TaskId(2, 0));
+
+        directory.cleanRemovedTasks();
+        final List<File> files = Arrays.asList(appDir.listFiles());
+        assertEquals(2, files.size());
+        assertTrue(files.contains(new File(appDir, task0.toString())));
+        assertTrue(files.contains(new File(appDir, task1.toString())));
+
+    }
+
+    @Test
+    public void shouldNotRemoveNonTaskDirectoriesAndFiles() throws Exception {
+        final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo");
+        directory.cleanRemovedTasks();
+        assertTrue(otherDir.exists());
+    }
+
+    @Test
+    public void shouldListAllTaskDirectories() throws Exception {
+        TestUtils.tempDirectory(stateDir.toPath(), "foo");
+        final File taskDir1 = directory.directoryForTask(new TaskId(0, 0));
+        final File taskDir2 = directory.directoryForTask(new TaskId(0, 1));
+
+        final List<File> dirs = Arrays.asList(directory.listTaskDirectories());
+        assertEquals(2, dirs.size());
+        assertTrue(dirs.contains(taskDir1));
+        assertTrue(dirs.contains(taskDir2));
+    }
+
+}
\ No newline at end of file

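The lock tests above pin down the java.nio semantics StateDirectory relies on: within a single JVM, tryLock() on a file that is already locked through another channel throws OverlappingFileLockException rather than returning null or blocking. A self-contained sketch of just that behaviour (class name and temp-file path are illustrative, not part of the commit):

    import java.io.File;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.nio.channels.OverlappingFileLockException;
    import java.nio.file.StandardOpenOption;

    public class FileLockSemanticsSketch {
        public static void main(String[] args) throws Exception {
            final File lockFile = File.createTempFile("state-dir", ".lock");

            final FileChannel first = FileChannel.open(lockFile.toPath(),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            final FileLock lock = first.tryLock(); // acquires the exclusive lock

            final FileChannel second = FileChannel.open(lockFile.toPath(),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            try {
                second.tryLock(); // same JVM: throws rather than blocking
            } catch (OverlappingFileLockException expected) {
                // the behaviour shouldLockTaskStateDirectory asserts
            }

            lock.release(); // once released, a fresh tryLock succeeds,
            final FileLock reacquired = second.tryLock(); // as shouldReleaseTaskStateDirectoryLock checks

            reacquired.release();
            first.close();
            second.close();
            lockFile.delete();
        }
    }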
