kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6647: Do not delete the lock file while holding the lock (#8267)
Date Sat, 14 Mar 2020 20:49:35 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 605d55d  KAFKA-6647: Do not delete the lock file while holding the lock (#8267)
605d55d is described below

commit 605d55dc173156471fb05c40d715ab318ce74952
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Sat Mar 14 13:49:08 2020 -0700

    KAFKA-6647: Do not delete the lock file while holding the lock (#8267)
    
    1. Inside StateDirectory#cleanRemovedTasks, skip deleting the lock file (and hence the
parent directory) until the lock has been released. After the lock is released, only go ahead and
delete the parent directory if manualUserCall == true, i.e. when the cleanup is triggered from KafkaStreams#cleanUp;
in that case users are responsible for making sure that the Streams instance is not started, and hence there
are no other threads trying to grab that lock.
    
    2. As a result, during scheduled cleanup the corresponding task.dir would not be empty
but would be left with only the lock file, so effectively we still achieve the goal of releasing
disk space. For callers of listTaskDirectories like KIP-441 (cc @ableegoldman to take a look)
I've introduced a new listNonEmptyTaskDirectories which excludes such dummy task.dirs with
only the lock file left.
    
    3. Also fixed KAFKA-8999 along the way to expose the exception while traversing the directory.
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <vvcephei@apache.org>
---
 .../java/org/apache/kafka/common/utils/Utils.java  |  41 ++++--
 .../org/apache/kafka/streams/KafkaStreams.java     |   4 +-
 .../processor/internals/StateDirectory.java        | 139 ++++++++++++---------
 .../streams/processor/internals/TaskManager.java   |  12 +-
 .../processor/internals/StateDirectoryTest.java    |  43 +++++--
 .../processor/internals/TaskManagerTest.java       |   6 +-
 6 files changed, 160 insertions(+), 85 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 0d16c0a..e9d4cc4 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -736,29 +736,56 @@ public final class Utils {
     /**
      * Recursively delete the given file/directory and any subfiles (if any exist)
      *
-     * @param file The root file at which to begin deleting
+     * @param rootFile The root file at which to begin deleting
      */
-    public static void delete(final File file) throws IOException {
-        if (file == null)
+    public static void delete(final File rootFile) throws IOException {
+        delete(rootFile, Collections.emptyList());
+    }
+
+    /**
+     * Recursively delete the subfiles (if any exist) of the passed in root file that are
not included
+     * in the list to keep
+     *
+     * @param rootFile The root file at which to begin deleting
+     * @param filesToKeep The subfiles to keep (note that if a subfile is to be kept, so
are all its parent
+     *                    files in its path); if empty we would also delete the root file
+     */
+    public static void delete(final File rootFile, final List<File> filesToKeep) throws
IOException {
+        if (rootFile == null)
             return;
-        Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
+        Files.walkFileTree(rootFile.toPath(), new SimpleFileVisitor<Path>() {
             @Override
             public FileVisitResult visitFileFailed(Path path, IOException exc) throws IOException
{
                 // If the root path did not exist, ignore the error; otherwise throw it.
-                if (exc instanceof NoSuchFileException && path.toFile().equals(file))
+                if (exc instanceof NoSuchFileException && path.toFile().equals(rootFile))
                     return FileVisitResult.TERMINATE;
                 throw exc;
             }
 
             @Override
             public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws
IOException {
-                Files.delete(path);
+                if (!filesToKeep.contains(path.toFile())) {
+                    Files.delete(path);
+                }
                 return FileVisitResult.CONTINUE;
             }
 
             @Override
             public FileVisitResult postVisitDirectory(Path path, IOException exc) throws
IOException {
-                Files.delete(path);
+                // KAFKA-8999: if there's an exception thrown previously already, we should
throw it
+                if (exc != null) {
+                    throw exc;
+                }
+
+                if (rootFile.toPath().equals(path)) {
+                    // only delete the parent directory if there's nothing to keep
+                    if (filesToKeep.isEmpty()) {
+                        Files.delete(path);
+                    }
+                } else if (!filesToKeep.contains(path.toFile())) {
+                    Files.delete(path);
+                }
+
                 return FileVisitResult.CONTINUE;
             }
         });
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 45d6acb..2a901ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -716,11 +716,11 @@ public class KafkaStreams implements AutoCloseable {
         }
         final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
         final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology
== null ? 0 : 1));
-        final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
+        final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
                 (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
 
         try {
-            stateDirectory = new StateDirectory(config, time, createStateDirectory);
+            stateDirectory = new StateDirectory(config, time, hasPersistentStores);
         } catch (final ProcessorStateException fatal) {
             throw new StreamsException(fatal);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 71b8b95..41c7c4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -33,6 +33,7 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.regex.Pattern;
 
@@ -50,11 +51,12 @@ public class StateDirectory {
     static final String LOCK_FILE_NAME = ".lock";
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
 
+    private final Time time;
+    private final String appId;
     private final File stateDir;
-    private final boolean createStateDirectory;
+    private final boolean hasPersistentStores;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
-    private final Time time;
 
     private FileChannel globalStateChannel;
     private FileLock globalStateLock;
@@ -72,22 +74,27 @@ public class StateDirectory {
     /**
      * Ensures that the state base directory as well as the application's sub-directory are
created.
      *
+     * @param config              streams application configuration to read the root state
directory path
+     * @param time                system timer used to execute periodic cleanup procedure
+     * @param hasPersistentStores only when the application's topology does have stores persisted
on local file
+     *                            system, we would go ahead and auto-create the corresponding
application / task / store
+     *                            directories whenever necessary; otherwise no directories
would be created.
+     *
      * @throws ProcessorStateException if the base state directory or application state directory
does not exist
-     *                                 and could not be created when createStateDirectory
is enabled.
+     *                                 and could not be created when hasPersistentStores
is enabled.
      */
-    public StateDirectory(final StreamsConfig config,
-                          final Time time,
-                          final boolean createStateDirectory) {
+    public StateDirectory(final StreamsConfig config, final Time time, final boolean hasPersistentStores)
{
         this.time = time;
-        this.createStateDirectory = createStateDirectory;
+        this.hasPersistentStores = hasPersistentStores;
+        this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
-        if (this.createStateDirectory && !baseDir.exists() && !baseDir.mkdirs())
{
+        if (this.hasPersistentStores && !baseDir.exists() && !baseDir.mkdirs())
{
             throw new ProcessorStateException(
                 String.format("base state directory [%s] doesn't exist and couldn't be created",
stateDirName));
         }
-        stateDir = new File(baseDir, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
-        if (this.createStateDirectory && !stateDir.exists() && !stateDir.mkdir())
{
+        stateDir = new File(baseDir, appId);
+        if (this.hasPersistentStores && !stateDir.exists() && !stateDir.mkdir())
{
             throw new ProcessorStateException(
                 String.format("state directory [%s] doesn't exist and couldn't be created",
stateDir.getPath()));
         }
@@ -100,7 +107,7 @@ public class StateDirectory {
      */
     public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
-        if (createStateDirectory && !taskDir.exists() && !taskDir.mkdir())
{
+        if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir())
{
             throw new ProcessorStateException(
                 String.format("task directory [%s] doesn't exist and couldn't be created",
taskDir.getPath()));
         }
@@ -120,9 +127,13 @@ public class StateDirectory {
     boolean directoryForTaskIsEmpty(final TaskId taskId) {
         final File taskDir = directoryForTask(taskId);
 
+        return taskDirEmpty(taskDir);
+    }
+
+    private boolean taskDirEmpty(final File taskDir) {
         final File[] storeDirs = taskDir.listFiles(pathname ->
             !pathname.getName().equals(LOCK_FILE_NAME) &&
-            !pathname.getName().equals(CHECKPOINT_FILE_NAME));
+                !pathname.getName().equals(CHECKPOINT_FILE_NAME));
 
         // if the task is stateless, storeDirs would be null
         return storeDirs == null || storeDirs.length == 0;
@@ -135,7 +146,7 @@ public class StateDirectory {
      */
     File globalStateDir() {
         final File dir = new File(stateDir, "global");
-        if (createStateDirectory && !dir.exists() && !dir.mkdir()) {
+        if (hasPersistentStores && !dir.exists() && !dir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("global state directory [%s] doesn't exist and couldn't be
created", dir.getPath()));
         }
@@ -148,12 +159,12 @@ public class StateDirectory {
 
     /**
      * Get the lock for the {@link TaskId}s directory if it is available
-     * @param taskId
+     * @param taskId task id
      * @return true if successful
-     * @throws IOException
+     * @throws IOException if the file cannot be created or file handle cannot be grabbed,
should be considered as fatal
      */
     synchronized boolean lock(final TaskId taskId) throws IOException {
-        if (!createStateDirectory) {
+        if (!hasPersistentStores) {
             return true;
         }
 
@@ -197,7 +208,7 @@ public class StateDirectory {
     }
 
     synchronized boolean lockGlobalState() throws IOException {
-        if (!createStateDirectory) {
+        if (!hasPersistentStores) {
             return true;
         }
 
@@ -259,18 +270,21 @@ public class StateDirectory {
     }
 
     public synchronized void clean() {
+        // remove task dirs
         try {
             cleanRemovedTasks(0, true);
         } catch (final Exception e) {
             // this is already logged within cleanRemovedTasks
             throw new StreamsException(e);
         }
+        // remove global dir
         try {
             if (stateDir.exists()) {
                 Utils.delete(globalStateDir().getAbsoluteFile());
             }
         } catch (final IOException e) {
-            log.error("{} Failed to delete global state directory due to an unexpected exception",
logPrefix(), e);
+            log.error("{} Failed to delete global state directory of {} due to an unexpected
exception",
+                appId, logPrefix(), e);
             throw new StreamsException(e);
         }
     }
@@ -292,71 +306,82 @@ public class StateDirectory {
 
     private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
                                                 final boolean manualUserCall) throws Exception
{
-        for (final File taskDir : listTaskDirectories()) {
+        final File[] taskDirs = listAllTaskDirectories();
+        if (taskDirs == null || taskDirs.length == 0) {
+            return; // nothing to do
+        }
+
+        for (final File taskDir : taskDirs) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
+                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
-                        if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) {
-                            if (!manualUserCall) {
-                                log.info(
-                                    "{} Deleting obsolete state directory {} for task {}
as {}ms has elapsed (cleanup delay is {}ms).",
-                                    logPrefix(),
-                                    dirName,
-                                    id,
-                                    now - lastModifiedMs,
-                                    cleanupDelayMs);
-                            } else {
-                                log.info(
-                                        "{} Deleting state directory {} for task {} as user
calling cleanup.",
-                                        logPrefix(),
-                                        dirName,
-                                        id);
-                            }
-                            Utils.delete(taskDir);
+                        if (now > lastModifiedMs + cleanupDelayMs) {
+                            log.info("{} Deleting obsolete state directory {} for task {}
as {}ms has elapsed (cleanup delay is {}ms).",
+                                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
+
+                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir,
LOCK_FILE_NAME)));
+                        } else if (manualUserCall) {
+                            log.info("{} Deleting state directory {} for task {} as user
calling cleanup.",
+                                logPrefix(), dirName, id);
+
+                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir,
LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException e) {
-                    // locked by another thread
-                    if (manualUserCall) {
-                        log.error("{} Failed to get the state directory lock.", logPrefix(),
e);
-                        throw e;
-                    }
-                } catch (final IOException e) {
-                    log.error("{} Failed to delete the state directory.", logPrefix(), e);
-                    if (manualUserCall) {
-                        throw e;
-                    }
+                } catch (final OverlappingFileLockException | IOException e) {
+                    exception = e;
                 } finally {
                     try {
                         unlock(id);
-                    } catch (final IOException e) {
-                        log.error("{} Failed to release the state directory lock.", logPrefix());
+
+                        // for manual user call, stream threads are not running so it is
safe to delete
+                        // the whole directory
                         if (manualUserCall) {
-                            throw e;
+                            Utils.delete(taskDir);
                         }
+                    } catch (final IOException e) {
+                        exception = e;
                     }
                 }
+
+                if (exception != null && manualUserCall) {
+                    log.error("{} Failed to release the state directory lock.", logPrefix());
+                    throw exception;
+                }
             }
         }
     }
 
     /**
+     * List all of the task directories that are non-empty
+     * @return The list of all the non-empty local directories for stream tasks
+     */
+    File[] listNonEmptyTaskDirectories() {
+        final File[] taskDirectories = !stateDir.exists() ? new File[0] :
+            stateDir.listFiles(pathname -> {
+                if (!pathname.isDirectory() || !PATH_NAME.matcher(pathname.getName()).matches())
{
+                    return false;
+                } else {
+                    return !taskDirEmpty(pathname);
+                }
+            });
+
+        return taskDirectories == null ? new File[0] : taskDirectories;
+    }
+
+    /**
      * List all of the task directories
      * @return The list of all the existing local directories for stream tasks
      */
-    File[] listTaskDirectories() {
-        final File[] taskDirectories =
+    File[] listAllTaskDirectories() {
+        final File[] taskDirectories = !stateDir.exists() ? new File[0] :
             stateDir.listFiles(pathname -> pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches());
 
-        if (!stateDir.exists() || taskDirectories == null) {
-            return new File[0];
-        } else {
-            return taskDirectories;
-        }
+        return taskDirectories == null ? new File[0] : taskDirectories;
     }
 
     private FileChannel getOrCreateFileChannel(final TaskId taskId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 2c80c34..b804a4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -127,7 +127,7 @@ public class TaskManager {
     void handleRebalanceStart(final Set<String> subscribedTopics) {
         builder.addSubscribedTopicsFromMetadata(subscribedTopics, logPrefix);
 
-        tryToLockAllTaskDirectories();
+        tryToLockAllNonEmptyTaskDirectories();
 
         rebalanceInProgress = true;
     }
@@ -379,7 +379,7 @@ public class TaskManager {
 
     /**
      * Compute the offset total summed across all stores in a task. Includes offset sum for
any tasks we own the
-     * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllTaskDirectories()}
+     * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}
      *
      * @return Map from task id to its total offset summed across all state stores
      */
@@ -410,13 +410,13 @@ public class TaskManager {
     }
 
     /**
-     * Makes a weak attempt to lock all task directories in the state dir. We are responsible
for computing and
+     * Makes a weak attempt to lock all non-empty task directories in the state dir. We are
responsible for computing and
      * reporting the offset sum for any unassigned tasks we obtain the lock for in the upcoming
rebalance. Tasks
      * that we locked but didn't own will be released at the end of the rebalance (unless
of course we were
      * assigned the task as a result of the rebalance). This method should be idempotent.
      */
-    private void tryToLockAllTaskDirectories() {
-        for (final File dir : stateDirectory.listTaskDirectories()) {
+    private void tryToLockAllNonEmptyTaskDirectories() {
+        for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) {
             try {
                 final TaskId id = TaskId.parse(dir.getName());
                 try {
@@ -438,7 +438,7 @@ public class TaskManager {
 
     /**
      * We must release the lock for any unassigned tasks that we temporarily locked in preparation
for a
-     * rebalance in {@link #tryToLockAllTaskDirectories()}.
+     * rebalance in {@link #tryToLockAllNonEmptyTaskDirectories()}.
      */
     private void releaseLockedUnassignedTaskDirectories() {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index e1ca9188..827557a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -249,18 +249,29 @@ public class StateDirectoryTest {
     public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception
{
         final TaskId task0 = new TaskId(0, 0);
         final TaskId task1 = new TaskId(1, 0);
+        final TaskId task2 = new TaskId(2, 0);
         try {
+            assertTrue(new File(directory.directoryForTask(task0), "store").mkdir());
+            assertTrue(new File(directory.directoryForTask(task1), "store").mkdir());
+            assertTrue(new File(directory.directoryForTask(task2), "store").mkdir());
+
             directory.lock(task0);
             directory.lock(task1);
-            directory.directoryForTask(new TaskId(2, 0));
 
-            List<File> files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
+            List<File> files = Arrays.asList(Objects.requireNonNull(directory.listAllTaskDirectories()));
+            assertEquals(3, files.size());
+
+
+            files = Arrays.asList(Objects.requireNonNull(directory.listNonEmptyTaskDirectories()));
             assertEquals(3, files.size());
 
-            time.sleep(1000);
+            time.sleep(5000);
             directory.cleanRemovedTasks(0);
 
-            files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
+            files = Arrays.asList(Objects.requireNonNull(directory.listAllTaskDirectories()));
+            assertEquals(3, files.size());
+
+            files = Arrays.asList(Objects.requireNonNull(directory.listNonEmptyTaskDirectories()));
             assertEquals(2, files.size());
             assertTrue(files.contains(new File(appDir, task0.toString())));
             assertTrue(files.contains(new File(appDir, task1.toString())));
@@ -273,13 +284,19 @@ public class StateDirectoryTest {
     @Test
     public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay()
{
         final File dir = directory.directoryForTask(new TaskId(2, 0));
+        assertTrue(new File(dir, "store").mkdir());
+
         final int cleanupDelayMs = 60000;
         directory.cleanRemovedTasks(cleanupDelayMs);
         assertTrue(dir.exists());
+        assertEquals(1, directory.listAllTaskDirectories().length);
+        assertEquals(1, directory.listNonEmptyTaskDirectories().length);
 
         time.sleep(cleanupDelayMs + 1000);
         directory.cleanRemovedTasks(cleanupDelayMs);
-        assertFalse(dir.exists());
+        assertTrue(dir.exists());
+        assertEquals(1, directory.listAllTaskDirectories().length);
+        assertEquals(0, directory.listNonEmptyTaskDirectories().length);
     }
 
     @Test
@@ -290,15 +307,21 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldListAllTaskDirectories() {
+    public void shouldOnlyListNonEmptyTaskDirectories() {
         TestUtils.tempDirectory(stateDir.toPath(), "foo");
         final File taskDir1 = directory.directoryForTask(new TaskId(0, 0));
         final File taskDir2 = directory.directoryForTask(new TaskId(0, 1));
 
-        final List<File> dirs = Arrays.asList(directory.listTaskDirectories());
-        assertEquals(2, dirs.size());
-        assertTrue(dirs.contains(taskDir1));
-        assertTrue(dirs.contains(taskDir2));
+        final File storeDir = new File(taskDir1, "store");
+        assertTrue(storeDir.mkdir());
+
+        assertEquals(Arrays.asList(taskDir1, taskDir2), Arrays.asList(directory.listAllTaskDirectories()));
+        assertEquals(Collections.singletonList(taskDir1), Arrays.asList(directory.listNonEmptyTaskDirectories()));
+
+        directory.cleanRemovedTasks(0L);
+
+        assertEquals(Arrays.asList(taskDir1, taskDir2), Arrays.asList(directory.listAllTaskDirectories()));
+        assertEquals(Collections.emptyList(), Arrays.asList(directory.listNonEmptyTaskDirectories()));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 62043e3..2f7681d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -168,7 +168,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldNotLockAnythingIfStateDirIsEmpty() {
-        expect(stateDirectory.listTaskDirectories()).andReturn(new File[0]).once();
+        expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(new File[0]).once();
 
         replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
@@ -999,7 +999,7 @@ public class TaskManagerTest {
         expect(consumer.assignment()).andReturn(assignment);
         consumer.pause(assignment);
         expectLastCall();
-        expect(stateDirectory.listTaskDirectories()).andReturn(new File[0]);
+        expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(new File[0]);
         replay(consumer, stateDirectory);
         assertThat(taskManager.isRebalanceInProgress(), is(false));
         taskManager.handleRebalanceStart(emptySet());
@@ -1665,7 +1665,7 @@ public class TaskManagerTest {
         for (int i = 0; i < names.length; ++i) {
             taskFolders[i] = testFolder.newFolder(names[i]);
         }
-        expect(stateDirectory.listTaskDirectories()).andReturn(taskFolders).once();
+        expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(taskFolders).once();
     }
 
     private void writeCheckpointFile(final TaskId task, final Map<TopicPartition, Long>
offsets) throws IOException {


Mime
View raw message