kafka-commits mailing list archives

From: damian...@apache.org
Subject: kafka git commit: KAFKA-5562; execute state dir cleanup on single thread
Date: Wed, 26 Jul 2017 09:11:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 89faed8d3 -> 9f3f8b4de


KAFKA-5562; execute state dir cleanup on single thread

Use a single `StateDirectory` per streams instance.
Use threadId to determine which thread owns the lock.
Only allow the owning thread to unlock.
Execute cleanup on a scheduled thread in `KafkaStreams`.
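
A minimal standalone sketch of the lock-ownership idea (not the actual `StateDirectory` code, which also keeps a `FileChannel` and `FileLock` per task): the acquiring thread's name is recorded alongside the lock, the owner may re-enter it, and only the owner may release it. Class and method names below are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class OwnedLockRegistry {

    private static class LockAndOwner {
        final String owningThread;

        LockAndOwner(final String owningThread) {
            this.owningThread = owningThread;
        }
    }

    private final Map<String, LockAndOwner> locks = new HashMap<>();

    // Acquire (or re-enter) the lock for taskId; returns false if another thread owns it.
    public synchronized boolean lock(final String taskId) {
        final LockAndOwner existing = locks.get(taskId);
        if (existing != null) {
            return existing.owningThread.equals(Thread.currentThread().getName());
        }
        locks.put(taskId, new LockAndOwner(Thread.currentThread().getName()));
        return true;
    }

    // Release the lock for taskId; a call from a thread that does not own it is a no-op.
    public synchronized void unlock(final String taskId) {
        final LockAndOwner existing = locks.get(taskId);
        if (existing != null && existing.owningThread.equals(Thread.currentThread().getName())) {
            locks.remove(taskId);
        }
    }
}
```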

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3516 from dguy/kafka-5562
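
The cleanup scheduling added to `KafkaStreams` in the diff below has roughly the following shape; this is a simplified sketch, and `CleanupTarget` is a hypothetical stand-in for `StateDirectory#cleanRemovedTasks`, not part of the Kafka Streams API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class CleanupScheduler {

    // Hypothetical stand-in for StateDirectory#cleanRemovedTasks.
    public interface CleanupTarget {
        void cleanRemovedTasks(long cleanupDelayMs);
    }

    private final ScheduledExecutorService cleaner;

    public CleanupScheduler(final String clientId) {
        // Single daemon thread so cleanup never runs on a stream thread or keeps the JVM alive.
        cleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable r) {
                final Thread thread = new Thread(r, clientId + "-CleanupThread");
                thread.setDaemon(true);
                return thread;
            }
        });
    }

    // Run cleanup every cleanupDelayMs; task directories idle for at least that long are removed.
    public void start(final CleanupTarget target, final long cleanupDelayMs) {
        cleaner.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // The real KafkaStreams code additionally checks, under its state lock,
                // that the instance is still in the RUNNING state before cleaning.
                target.cleanRemovedTasks(cleanupDelayMs);
            }
        }, cleanupDelayMs, cleanupDelayMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        cleaner.shutdownNow();
    }
}
```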


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f3f8b4d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f3f8b4d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f3f8b4d

Branch: refs/heads/trunk
Commit: 9f3f8b4de64501902a4843064b1cf1feb21c690a
Parents: 89faed8
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Jul 26 10:10:50 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Wed Jul 26 10:10:50 2017 +0100

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../org/apache/kafka/streams/KafkaStreams.java  |  45 ++-
 .../processor/internals/StateDirectory.java     |  66 ++--
 .../processor/internals/StreamThread.java       |  16 +-
 .../apache/kafka/streams/KafkaStreamsTest.java  |  53 +++
 .../integration/RegexSourceIntegrationTest.java |   3 +-
 .../processor/internals/StateDirectoryTest.java |  57 ++++
 .../internals/StreamPartitionAssignorTest.java  |  53 +--
 .../processor/internals/StreamThreadTest.java   | 331 ++++++-------------
 .../StreamThreadStateStoreProviderTest.java     |  20 +-
 10 files changed, 331 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 2921372..0c0b382 100644
--- a/build.gradle
+++ b/build.gradle
@@ -857,6 +857,7 @@ project(':streams') {
     testCompile project(':core')
     testCompile project(':core').sourceSets.test.output
     testCompile libs.junit
+    testCompile libs.easymock
 
     testRuntime libs.slf4jlog4j
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f06a7e9..ae5301a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -57,7 +57,6 @@ import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -70,6 +69,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
@@ -128,6 +130,7 @@ public class KafkaStreams {
     private static final int DEFAULT_CLOSE_TIMEOUT = 0;
     private GlobalStreamThread globalStreamThread;
 
+    private final ScheduledExecutorService stateDirCleaner;
     private final StreamThread[] threads;
     private final Metrics metrics;
     private final QueryableStoreProvider queryableStoreProvider;
@@ -140,6 +143,7 @@ public class KafkaStreams {
     private final String logPrefix;
     private final StreamsMetadataState streamsMetadataState;
     private final StreamsConfig config;
+    private final StateDirectory stateDirectory;
 
     // container states
     /**
@@ -471,12 +475,13 @@ public class KafkaStreams {
         final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
                 (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
 
+        stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
         if (globalTaskTopology != null) {
             final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
                                                         clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
-                                                        new StateDirectory(applicationId, globalThreadId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
+                                                        stateDirectory,
                                                         metrics,
                                                         time,
                                                         globalThreadId);
@@ -493,7 +498,8 @@ public class KafkaStreams {
                                           metrics,
                                           time,
                                           streamsMetadataState,
-                                          cacheSizeBytes);
+                                          cacheSizeBytes,
+                                          stateDirectory);
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
@@ -507,6 +513,15 @@ public class KafkaStreams {
 
         final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
+        final String cleanupThreadName = clientId + "-CleanupThread";
+        stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread thread = new Thread(r, cleanupThreadName);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
     }
 
     private static HostInfo parseHostInfo(final String endPoint) {
@@ -577,6 +592,18 @@ public class KafkaStreams {
             thread.start();
         }
 
+        final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
+        stateDirCleaner.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                synchronized (stateLock) {
+                    if (state == State.RUNNING) {
+                        stateDirectory.cleanRemovedTasks(cleanupDelay);
+                    }
+                }
+            }
+        }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
+
         log.info("{} Started Kafka Stream process", logPrefix);
     }
 
@@ -631,6 +658,7 @@ public class KafkaStreams {
             return true;
         }
 
+        stateDirCleaner.shutdownNow();
         // save the current thread so that if it is a stream thread
         // we don't attempt to join it and cause a deadlock
         final Thread shutdown = new Thread(new Runnable() {
@@ -724,17 +752,6 @@ public class KafkaStreams {
         if (isRunning()) {
             throw new IllegalStateException("Cannot clean up while running.");
         }
-
-        final String appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG);
-
-        final String localApplicationDir = stateDir + File.separator + appId;
-        log.debug("{} Removing local Kafka Streams application data in {} for application {}.",
-            logPrefix,
-            localApplicationDir,
-            appId);
-
-        final StateDirectory stateDirectory = new StateDirectory(appId, "cleanup", stateDir, Time.SYSTEM);
         stateDirectory.cleanRemovedTasks(0);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 3e547eb..c4262bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -45,21 +45,25 @@ public class StateDirectory {
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
 
     private final File stateDir;
-    private final String logPrefix;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
-    private final HashMap<TaskId, FileLock> locks = new HashMap<>();
+    private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
     private final Time time;
 
     private FileChannel globalStateChannel;
     private FileLock globalStateLock;
 
-    public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) {
-        this(applicationId, "", stateDirConfig, time);
+    private static class LockAndOwner {
+        final FileLock lock;
+        final String owningThread;
+
+        LockAndOwner(final String owningThread, final FileLock lock) {
+            this.owningThread = owningThread;
+            this.lock = lock;
+        }
     }
 
-    public StateDirectory(final String applicationId, final String threadId, final String stateDirConfig, final Time time) {
+    public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) {
         this.time = time;
-        this.logPrefix = String.format("stream-thread [%s]", threadId);
         final File baseDir = new File(stateDirConfig);
         if (!baseDir.exists() && !baseDir.mkdirs()) {
             throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
@@ -95,6 +99,9 @@ public class StateDirectory {
         return dir;
     }
 
+    private String logPrefix() {
+        return String.format("stream-thread [%s]", Thread.currentThread().getName());
+    }
     /**
      * Get the lock for the {@link TaskId}s directory if it is available
      * @param taskId
@@ -102,13 +109,19 @@ public class StateDirectory {
      * @return true if successful
      * @throws IOException
      */
-    boolean lock(final TaskId taskId, int retry) throws IOException {
+    synchronized boolean lock(final TaskId taskId, int retry) throws IOException {
+
         final File lockFile;
         // we already have the lock so bail out here
-        if (locks.containsKey(taskId)) {
-            log.trace("{} Found cached state dir lock for task {}", logPrefix, taskId);
+        final LockAndOwner lockAndOwner = locks.get(taskId);
+        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+            log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
             return true;
+        } else if (lockAndOwner != null) {
+            // another thread owns the lock
+            return false;
         }
+
         try {
             lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
         } catch (ProcessorStateException e) {
@@ -130,16 +143,16 @@ public class StateDirectory {
 
         final FileLock lock = tryLock(retry, channel);
         if (lock != null) {
-            locks.put(taskId, lock);
+            locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));
 
-            log.debug("{} Acquired state dir lock for task {}", logPrefix, taskId);
+            log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId);
         }
         return lock != null;
     }
 
-    boolean lockGlobalState(final int retry) throws IOException {
+    synchronized boolean lockGlobalState(final int retry) throws IOException {
         if (globalStateLock != null) {
-            log.trace("{} Found cached state dir lock for the global task", logPrefix);
+            log.trace("{} Found cached state dir lock for the global task", logPrefix());
             return true;
         }
 
@@ -161,12 +174,12 @@ public class StateDirectory {
         globalStateChannel = channel;
         globalStateLock = fileLock;
 
-        log.debug("{} Acquired global state dir lock", logPrefix);
+        log.debug("{} Acquired global state dir lock", logPrefix());
 
         return true;
     }
 
-    void unlockGlobalState() throws IOException {
+    synchronized void unlockGlobalState() throws IOException {
         if (globalStateLock == null) {
             return;
         }
@@ -175,7 +188,7 @@ public class StateDirectory {
         globalStateLock = null;
         globalStateChannel = null;
 
-        log.debug("{} Released global state dir lock", logPrefix);
+        log.debug("{} Released global state dir lock", logPrefix());
     }
 
     /**
@@ -183,12 +196,12 @@ public class StateDirectory {
      * @param taskId
      * @throws IOException
      */
-    void unlock(final TaskId taskId) throws IOException {
-        final FileLock lock = locks.remove(taskId);
-        if (lock != null) {
-            lock.release();
-
-            log.debug("{} Released state dir lock for task {}", logPrefix, taskId);
+    synchronized void unlock(final TaskId taskId) throws IOException {
+        final LockAndOwner lockAndOwner = locks.get(taskId);
+        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+            locks.remove(taskId);
+            lockAndOwner.lock.release();
+            log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);
 
             final FileChannel fileChannel = channels.remove(taskId);
             if (fileChannel != null) {
@@ -204,12 +217,11 @@ public class StateDirectory {
      * @param cleanupDelayMs only remove directories if they haven't been modified for at least
      *                       this amount of time (milliseconds)
      */
-    public void cleanRemovedTasks(final long cleanupDelayMs) {
+    public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         final File[] taskDirs = listTaskDirectories();
         if (taskDirs == null || taskDirs.length == 0) {
             return; // nothing to do
         }
-
         for (File taskDir : taskDirs) {
             final String dirName = taskDir.getName();
             TaskId id = TaskId.parse(dirName);
@@ -219,19 +231,19 @@ public class StateDirectory {
                         long now = time.milliseconds();
                         long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
-                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix, dirName, id, now - lastModifiedMs, cleanupDelayMs);
+                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
                             Utils.delete(taskDir);
                         }
                     }
                 } catch (OverlappingFileLockException e) {
                     // locked by another thread
                 } catch (IOException e) {
-                    log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix, e);
+                    log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix(), e);
                 } finally {
                     try {
                         unlock(id);
                     } catch (IOException e) {
-                        log.error("{} Failed to release the state directory lock", logPrefix);
+                        log.error("{} Failed to release the state directory lock", logPrefix());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f10bf41..3bafcd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -449,7 +449,8 @@ public class StreamThread extends Thread {
                         final Metrics metrics,
                         final Time time,
                         final StreamsMetadataState streamsMetadataState,
-                        final long cacheSizeBytes) {
+                        final long cacheSizeBytes,
+                        final StateDirectory stateDirectory) {
         super(clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
         this.applicationId = applicationId;
         this.config = config;
@@ -499,7 +500,7 @@ public class StreamThread extends Thread {
         // standby KTables
         standbyRecords = new HashMap<>();
 
-        stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
+        this.stateDirectory = stateDirectory;
         final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
         rebalanceTimeoutMs =  (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
         pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
@@ -568,7 +569,6 @@ public class StreamThread extends Thread {
             maybePunctuateSystemTime();
             maybeCommit(timerStartedMs);
             maybeUpdateStandbyTasks(timerStartedMs);
-            maybeClean(timerStartedMs);
         }
         log.info("{} Shutting down at user request", logPrefix);
     }
@@ -912,16 +912,6 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Cleanup any states of the tasks that have been removed from this thread
-     */
-    protected void maybeClean(final long now) {
-        if (now > lastCleanMs + cleanTimeMs) {
-            stateDirectory.cleanRemovedTasks(cleanTimeMs);
-            lastCleanMs = now;
-        }
-    }
-
-    /**
      * Compute the latency based on the current marked timestamp, and update the marked timestamp
      * with the current system timestamp.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index e3443fb..3f60d69 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -40,6 +40,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -449,6 +450,58 @@ public class KafkaStreamsTest {
         Assert.assertNotNull("streamString contains non-null appId", appId);
     }
 
+    @Test
+    public void shouldCleanupOldStateDirs() throws InterruptedException {
+        final Properties props = new Properties();
+        final String appId = "cleanupOldStateDirs";
+        final String stateDir = TestUtils.tempDirectory().getPath();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+
+        final String topic = "topic";
+        CLUSTER.createTopic(topic);
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        builder.stream(Serdes.String(), Serdes.String(), topic);
+
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+        streams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    latch.countDown();
+                }
+            }
+        });
+        final String appDir = stateDir + File.separator + appId;
+        final File oldTaskDir = new File(appDir, "10_1");
+        assertTrue(oldTaskDir.mkdirs());
+        try {
+            streams.start();
+            latch.await(30, TimeUnit.SECONDS);
+            verifyCleanupStateDir(appDir, oldTaskDir);
+            assertTrue(oldTaskDir.mkdirs());
+            verifyCleanupStateDir(appDir, oldTaskDir);
+        } finally {
+            streams.close();
+        }
+    }
+
+    private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
+        final File taskDir = new File(appDir, "0_0");
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return !oldTaskDir.exists() && taskDir.exists();
+            }
+        }, 30000, "cleanup has not successfully run");
+        assertTrue(taskDir.exists());
+    }
+
     public static class StateListenerStub implements KafkaStreams.StateListener {
         int numChanges = 0;
         KafkaStreams.State oldState;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index ad70112..522bd3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -465,7 +466,7 @@ public class RegexSourceIntegrationTest {
 
         public TestStreamThread(final InternalTopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) {
             super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                  0);
+                  0, new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index f1b5efe..2b28090 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -32,9 +32,13 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -266,4 +270,57 @@ public class StateDirectoryTest {
         }
     }
 
+    @Test
+    public void shouldNotLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
+        final TaskId taskId = new TaskId(0, 0);
+        final AtomicReference<IOException> exceptionOnThread = new AtomicReference<>();
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    directory.lock(taskId, 1);
+                } catch (final IOException e) {
+                    exceptionOnThread.set(e);
+                }
+            }
+        });
+        thread.start();
+        thread.join(30000);
+        assertNull("should not have had an exception during locking on other thread", exceptionOnThread.get());
+        assertFalse(directory.lock(taskId, 1));
+    }
+
+    @Test
+    public void shouldNotUnLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
+        final TaskId taskId = new TaskId(0, 0);
+        final CountDownLatch lockLatch = new CountDownLatch(1);
+        final CountDownLatch unlockLatch = new CountDownLatch(1);
+        final AtomicReference<Exception> exceptionOnThread = new AtomicReference<>();
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    directory.lock(taskId, 1);
+                    lockLatch.countDown();
+                    unlockLatch.await();
+                    directory.unlock(taskId);
+                } catch (final Exception e) {
+                    exceptionOnThread.set(e);
+                }
+            }
+        });
+        thread.start();
+        lockLatch.await(5, TimeUnit.SECONDS);
+
+        assertNull("should not have had an exception on other thread", exceptionOnThread.get());
+        directory.unlock(taskId);
+        assertFalse(directory.lock(taskId, 1));
+
+        unlockLatch.countDown();
+        thread.join(30000);
+
+        assertNull("should not have had an exception on other thread", exceptionOnThread.get());
+        assertTrue(directory.lock(taskId, 1));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 0a08b7c..f173a65 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -109,11 +110,12 @@ public class StreamPartitionAssignorTest {
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
     private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
     private final StreamsConfig config = new StreamsConfig(configProps());
+    private final StateDirectory stateDirectory = new StateDirectory("appId", TestUtils.tempDirectory().getPath(), new MockTime());
     private final StreamThread mockStreamThread = new StreamThread(builder, config,
                                                                    mockClientSupplier, "appID",
                                                                    "clientId", UUID.randomUUID(),
                                                                    new Metrics(), new MockTime(),
-                                                                   null, 1L);
+                                                                   null, 1L, stateDirectory);
     private final Map<String, Object> configurationMap = new HashMap<>();
 
     private Properties configProps() {
@@ -159,7 +161,8 @@ public class StreamPartitionAssignorTest {
             Time.SYSTEM,
             new StreamsMetadataState(builder,
                 StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             public Set<TaskId> prevActiveTasks() {
@@ -215,8 +218,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
-
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -290,7 +293,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -339,7 +343,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
@@ -407,7 +412,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -484,7 +490,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -577,7 +584,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -646,7 +654,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
 
@@ -682,7 +691,6 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         String client1 = "client1";
 
-
         StreamThread thread10 = new StreamThread(
             builder,
             config,
@@ -693,7 +701,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
         MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
@@ -731,7 +740,7 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
         StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+                                                 0, stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
         MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
@@ -764,7 +773,7 @@ public class StreamPartitionAssignorTest {
         final String client1 = "client1";
 
         final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                           0);
+                                                           0, stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
         final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
@@ -799,7 +808,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
@@ -840,7 +850,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer));
 
@@ -863,7 +874,6 @@ public class StreamPartitionAssignorTest {
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
 
-
         final StreamThread streamThread = new StreamThread(
             builder,
             config,
@@ -874,7 +884,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         try {
             partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
@@ -996,7 +1007,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
         final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
@@ -1087,7 +1099,8 @@ public class StreamPartitionAssignorTest {
             new Metrics(),
             Time.SYSTEM,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9b9d6cd..e808eb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.Stores;
@@ -46,6 +47,7 @@ import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -91,7 +93,8 @@ public class StreamThreadTest {
     private UUID processId = UUID.randomUUID();
     final KStreamBuilder builder = new KStreamBuilder();
     private final StreamsConfig config = new StreamsConfig(configProps(false));
-
+    private final String stateDir = TestUtils.tempDirectory().getPath();
+    private final StateDirectory stateDirectory  = new StateDirectory("applicationId", stateDir, mockTime);
 
 
     @Before
@@ -450,8 +453,10 @@ public class StreamThreadTest {
             processId,
             metrics,
             Time.SYSTEM,
+
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
@@ -503,7 +508,8 @@ public class StreamThreadTest {
             metrics,
             Time.SYSTEM,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
         final StreamThread thread2 = new StreamThread(
             builder.internalTopologyBuilder,
             config,
@@ -514,7 +520,8 @@ public class StreamThreadTest {
             metrics,
             Time.SYSTEM,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
         final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment);
@@ -617,16 +624,16 @@ public class StreamThreadTest {
     @Test
     public void testMetrics() {
         final StreamThread thread = new StreamThread(
-            builder.internalTopologyBuilder,
-            config,
-            clientSupplier,
-            applicationId,
-            clientId,
-            processId,
-            metrics,
-            mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+                builder.internalTopologyBuilder,
+                config,
+                clientSupplier,
+                applicationId,
+                clientId,
+                processId,
+                metrics,
+                mockTime,
+                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+                0, stateDirectory);
         final String defaultGroupName = "stream-metrics";
         final String defaultPrefix = "thread." + thread.threadClientId();
         final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
@@ -658,150 +665,6 @@ public class StreamThreadTest {
 
 
     @Test
-    public void testMaybeClean() throws IOException, InterruptedException, java.nio.file.NoSuchFileException {
-        final File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            final long cleanupDelay = 1000L;
-            final Properties props = configProps(false);
-            props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
-            props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-            final StreamsConfig config = new StreamsConfig(props);
-            final File applicationDir = new File(baseDir, applicationId);
-            applicationDir.mkdir();
-            final File stateDir1 = new File(applicationDir, task1.toString());
-            final File stateDir2 = new File(applicationDir, task2.toString());
-            final File stateDir3 = new File(applicationDir, task3.toString());
-            final File extraDir = new File(applicationDir, applicationId);
-            stateDir1.mkdir();
-            stateDir2.mkdir();
-            stateDir3.mkdir();
-            extraDir.mkdir();
-            builder.addSource("source1", "topic1");
-            final StreamThread thread = new StreamThread(
-                builder.internalTopologyBuilder,
-                config,
-                clientSupplier,
-                applicationId,
-                clientId,
-                processId,
-                metrics,
-                mockTime,
-                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-                0) {
-                @Override
-                public void maybeClean(final long now) {
-                    super.maybeClean(now);
-                }
-                @Override
-                protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
-                    final ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(
-                        id,
-                        applicationId,
-                        partitionsForTask,
-                        topology,
-                        consumer,
-                        clientSupplier.getProducer(new HashMap<String, Object>()),
-                        restoreConsumer,
-                        config,
-                        new MockStreamsMetrics(new Metrics()),
-                        stateDirectory);
-                }
-            };
-
-            initPartitionGrouper(config, thread, clientSupplier);
-            assertTrue(thread.tasks().isEmpty());
-            mockTime.sleep(cleanupDelay);
-
-            // all directories exist since an assignment didn't happen
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertTrue(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            List<TopicPartition> revokedPartitions;
-            List<TopicPartition> assignedPartitions;
-            Map<TaskId, StreamTask> prevTasks;
-
-            // Assign t1p1 and t1p2. This should create task1 & task2
-            final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-            activeTasks.put(task1, Collections.singleton(t1p1));
-            activeTasks.put(task2, Collections.singleton(t1p2));
-            thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
-
-            revokedPartitions = Collections.emptyList();
-            assignedPartitions = Arrays.asList(t1p1, t1p2);
-            prevTasks = new HashMap<>(thread.tasks());
-
-            final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-            thread.setState(StreamThread.State.RUNNING);
-            rebalanceListener.onPartitionsRevoked(revokedPartitions);
-            rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-            // there shouldn't be any previous task
-            assertTrue(prevTasks.isEmpty());
-
-            // task 1 & 2 are created
-            assertEquals(2, thread.tasks().size());
-
-            // all directories should still exit before the cleanup delay time
-            mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean(mockTime.milliseconds());
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertTrue(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            // all state directories except for task task2 & task3 will be removed. the extra directory should still exists
-            mockTime.sleep(11L);
-            thread.maybeClean(mockTime.milliseconds());
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertFalse(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            // Revoke t1p1 and t1p2. This should remove task1 & task2
-            activeTasks.clear();
-            revokedPartitions = assignedPartitions;
-            assignedPartitions = Collections.emptyList();
-            prevTasks = new HashMap<>(thread.tasks());
-
-            rebalanceListener.onPartitionsRevoked(revokedPartitions);
-            rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-            // previous tasks should be committed
-            assertEquals(2, prevTasks.size());
-            for (final StreamTask task : prevTasks.values()) {
-                assertTrue(((TestStreamTask) task).committed);
-                ((TestStreamTask) task).committed = false;
-            }
-
-            // no task
-            assertTrue(thread.tasks().isEmpty());
-
-            // all state directories for task task1 & task2 still exist before the cleanup delay time
-            mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean(mockTime.milliseconds());
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertFalse(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            // all state directories for task task1 & task2 are removed
-            mockTime.sleep(11L);
-            thread.maybeClean(mockTime.milliseconds());
-            assertFalse(stateDir1.exists());
-            assertFalse(stateDir2.exists());
-            assertFalse(stateDir3.exists());
-            assertTrue(extraDir.exists());
-        } finally {
-            // note that this call can throw a java.nio.file.NoSuchFileException
-            // since some of the subdirs might be deleted already during cleanup
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
     public void testMaybeCommit() throws IOException, InterruptedException {
         final File baseDir = Files.createTempDirectory("test").toFile();
         try {
@@ -815,16 +678,16 @@ public class StreamThreadTest {
             builder.addSource("source1", "topic1");
 
             final StreamThread thread = new StreamThread(
-                builder.internalTopologyBuilder,
-                config,
-                clientSupplier,
-                applicationId,
-                clientId,
-                processId,
-                metrics,
-                mockTime,
-                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-                0) {
+                    builder.internalTopologyBuilder,
+                    config,
+                    clientSupplier,
+                    applicationId,
+                    clientId,
+                    processId,
+                    metrics,
+                    mockTime,
+                    new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+                    0, stateDirectory) {
 
                 @Override
                 public void maybeCommit(final long now) {
@@ -916,7 +779,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
@@ -952,7 +816,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
@@ -991,7 +856,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
@@ -1024,7 +890,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
@@ -1055,7 +922,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         thread.setPartitionAssignor(new StreamPartitionAssignor() {
             @Override
@@ -1095,7 +963,8 @@ public class StreamThreadTest {
                 metrics,
                 mockTime,
                 new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-                0) {
+                0,
+                stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
@@ -1146,7 +1015,8 @@ public class StreamThreadTest {
                 metrics,
                 mockTime,
                 new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-                0);
+                0,
+                stateDirectory);
 
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
@@ -1203,7 +1073,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
                                          Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
@@ -1275,7 +1146,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1355,7 +1227,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final MockConsumer consumer = clientSupplier.consumer;
         consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null)));
@@ -1451,7 +1324,8 @@ public class StreamThreadTest {
             new Metrics(),
             new MockTime(),
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         activeTasks.put(task1, task0Assignment);
@@ -1505,7 +1379,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1559,7 +1434,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1625,7 +1501,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1683,7 +1560,7 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0, stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1727,7 +1604,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         final Map<String, Object> configurationMap = new HashMap<>();
@@ -1800,53 +1678,40 @@ public class StreamThreadTest {
     public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
-        final StreamThread thread = setupTest(taskId);
-        final StateDirectory testStateDir = new StateDirectory(
-            applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
+        final StreamThread thread = setupTest(taskId, stateDirMock);
 
-        assertFalse(testStateDir.lock(taskId, 0));
         try {
             thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             fail("Should have thrown exception");
         } catch (final Exception e) {
-            assertTrue(testStateDir.lock(taskId, 0));
+            // expected
         } finally {
             thread.close();
-            testStateDir.unlock(taskId);
         }
+
+        EasyMock.verify(stateDirMock);
     }
 
     @Test
     public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
-        final StreamThread thread = setupTest(taskId);
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
 
-        final StateDirectory testStateDir = new StateDirectory(
-            applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
-
-        assertFalse(testStateDir.lock(taskId, 0));
-        try {
-            thread.close();
-            thread.join();
-            assertTrue(testStateDir.lock(taskId, 0));
-        } finally {
-            testStateDir.unlock(taskId);
-        }
+        final StreamThread thread = setupTest(taskId, stateDirMock);
+        thread.close();
+        thread.join();
+        EasyMock.verify(stateDirMock);
     }
 
-    private StreamThread setupTest(final TaskId taskId) throws InterruptedException {
+
+    private StreamThread setupTest(final TaskId taskId, final StateDirectory stateDirectory) throws InterruptedException {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(applicationId);
         builder.addSource("source", "topic");
 
         final MockClientSupplier clientSupplier = new MockClientSupplier();
-        final StateDirectory stateDirectory = new StateDirectory(
-            applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
 
         final TestStreamTask testStreamTask = new TestStreamTask(taskId,
             applicationId,
@@ -1865,6 +1730,7 @@ public class StreamThreadTest {
             }
         };
 
+
         final StreamThread thread = new StreamThread(
             builder.internalTopologyBuilder,
             config,
@@ -1875,7 +1741,7 @@ public class StreamThreadTest {
             new Metrics(),
             new MockTime(),
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0, stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1904,22 +1770,20 @@ public class StreamThreadTest {
     public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
-        final StreamThread thread = setupStandbyTest(taskId);
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
+        final StreamThread thread = setupStandbyTest(taskId, stateDirMock);
+
         startThreadAndRebalance(thread);
-        final StateDirectory testStateDir = new StateDirectory(applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
 
-        assertFalse(testStateDir.lock(taskId, 0));
         try {
             thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             fail("Should have thrown exception");
         } catch (final Exception e) {
-            assertTrue(testStateDir.lock(taskId, 0));
+            // ok
         } finally {
             thread.close();
-            testStateDir.unlock(taskId);
         }
+        EasyMock.verify(stateDirMock);
     }
 
     private void startThreadAndRebalance(final StreamThread thread) throws InterruptedException {
@@ -1938,24 +1802,31 @@ public class StreamThreadTest {
     public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
-        final StreamThread thread = setupStandbyTest(taskId);
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
+        final StreamThread thread = setupStandbyTest(taskId, stateDirMock);
         startThreadAndRebalance(thread);
-        final StateDirectory testStateDir = new StateDirectory(applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
 
-        assertFalse(testStateDir.lock(taskId, 0));
+
         try {
             thread.close();
             thread.join();
-            assertTrue(testStateDir.lock(taskId, 0));
         } finally {
             thread.close();
-            testStateDir.unlock(taskId);
         }
+        EasyMock.verify(stateDirMock);
     }
 
-    private StreamThread setupStandbyTest(final TaskId taskId) {
+    private StateDirectory mockStateDirInteractions(final TaskId taskId) throws IOException {
+        final StateDirectory stateDirMock = EasyMock.createNiceMock(StateDirectory.class);
+        EasyMock.expect(stateDirMock.lock(EasyMock.eq(taskId), EasyMock.anyInt())).andReturn(true);
+        EasyMock.expect(stateDirMock.directoryForTask(taskId)).andReturn(new File(stateDir));
+        stateDirMock.unlock(taskId);
+        EasyMock.expectLastCall();
+        EasyMock.replay(stateDirMock);
+        return stateDirMock;
+    }
+
+    private StreamThread setupStandbyTest(final TaskId taskId, final StateDirectory stateDirectory) {
         final String storeName = "store";
         final String changelogTopic = applicationId + "-" + storeName + "-changelog";
 
@@ -1985,7 +1856,8 @@ public class StreamThreadTest {
             new Metrics(),
             new MockTime(),
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -2066,7 +1938,8 @@ public class StreamThreadTest {
             metrics,
             Time.SYSTEM,
             new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
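
The tests above replace the real on-disk StateDirectory with an EasyMock nice mock: the expected lock/directoryForTask/unlock interactions are recorded in mockStateDirInteractions, and EasyMock.verify() fails the test if the thread never released the lock. Below is a minimal, self-contained sketch of that record/replay/verify pattern; the class name, the /tmp path and the placeholder for the code under test are illustrative only, the real expectations are the ones recorded above.

import java.io.File;
import java.io.IOException;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.easymock.EasyMock;

public class StateDirectoryMockSketch {

    public static void main(final String[] args) throws IOException {
        final TaskId taskId = new TaskId(0, 0);

        // A "nice" mock returns defaults for any call that was not explicitly recorded,
        // so the test only spells out the interactions it actually cares about.
        final StateDirectory stateDirMock = EasyMock.createNiceMock(StateDirectory.class);

        // Record the expected interactions: the task directory is locked once,
        // resolved on disk, and unlocked again.
        EasyMock.expect(stateDirMock.lock(EasyMock.eq(taskId), EasyMock.anyInt())).andReturn(true);
        EasyMock.expect(stateDirMock.directoryForTask(taskId)).andReturn(new File("/tmp/state"));
        stateDirMock.unlock(taskId);   // void method: call it first ...
        EasyMock.expectLastCall();     // ... then record the expectation explicitly

        // Switch the mock from record mode to replay mode before handing it to the code under test.
        EasyMock.replay(stateDirMock);

        // ... exercise the code under test here, for example a StreamThread constructed with stateDirMock ...

        // Fails if any recorded expectation (in particular unlock(taskId)) was never satisfied.
        EasyMock.verify(stateDirMock);
    }
}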

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f3f8b4d/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index d2f236f..50d1ebf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -113,16 +113,16 @@ public class StreamThreadStateStoreProviderTest {
         storesAvailable = true;
         provider = new StreamThreadStateStoreProvider(
             new StreamThread(
-                builder.internalTopologyBuilder,
-                streamsConfig,
-                clientSupplier,
-                applicationId,
-                "clientId",
-                UUID.randomUUID(),
-                new Metrics(),
-                Time.SYSTEM,
-                new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-                0) {
+                    builder.internalTopologyBuilder,
+                    streamsConfig,
+                    clientSupplier,
+                    applicationId,
+                    "clientId",
+                    UUID.randomUUID(),
+                    new Metrics(),
+                    Time.SYSTEM,
+                    new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+                    0, stateDirectory) {
 
                 @Override
                 public Map<TaskId, StreamTask> tasks() {
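
The provider test picks up the same signature change seen throughout StreamThreadTest: StreamThread no longer creates its own StateDirectory but takes one as its final constructor argument, which lets several threads share a single instance. The fragment below is a rough sketch of how such a shared directory might be wired into two threads in a test; the StateDirectory constructor arguments are copied from the lines removed in setupTest above, and the config values, client supplier and temp-directory helper are abbreviated, illustrative choices rather than the exact fixture used in these tests.

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.TestUtils;

public class SharedStateDirectorySketch {

    public static void main(final String[] args) {
        final String applicationId = "shared-state-dir-sketch";

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        final StreamsConfig config = new StreamsConfig(props);

        final TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId(applicationId);
        builder.addSource("source", "topic");

        final MockTime time = new MockTime();

        // One StateDirectory, created outside the threads and shared by all of them.
        final StateDirectory stateDirectory = new StateDirectory(
            applicationId,
            config.getString(StreamsConfig.STATE_DIR_CONFIG),
            time);

        final StreamsMetadataState metadataState =
            new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);

        // Both threads receive the same StateDirectory as the final constructor argument.
        final StreamThread thread1 = new StreamThread(
            builder.internalTopologyBuilder, config, new MockClientSupplier(), applicationId,
            "clientId-1", UUID.randomUUID(), new Metrics(), time, metadataState, 0, stateDirectory);
        final StreamThread thread2 = new StreamThread(
            builder.internalTopologyBuilder, config, new MockClientSupplier(), applicationId,
            "clientId-2", UUID.randomUUID(), new Metrics(), time, metadataState, 0, stateDirectory);

        thread1.close();
        thread2.close();
    }
}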

