kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvcep...@apache.org
Subject [kafka] 01/02: MINOR: Fix log message when tasks directory is cleaned manually (#9262)
Date Mon, 05 Oct 2020 20:15:18 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7566deb19932509d5be82672e426dec6a75894c5
Author: Bruno Cadonna <bruno@confluent.io>
AuthorDate: Tue Sep 22 18:12:25 2020 +0200

    MINOR: Fix log message when tasks directory is cleaned manually (#9262)
    
    Currently when a task directory is cleaned manually the message
    for the state dir cleaner is logged instead of the message for
    the manual cleanup. This is because the code checks the elapsed
    time since the last update before it checks whether the cleanup
    is a manual call. This commit changes the order of the checks.
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>,
Matthias J. Sax <mjsax@apache.org>, Walker Carlson <wcarlson@confluent.io>, John
Roesler <vvcephei@apache.org>
---
 .../processor/internals/StateDirectory.java        | 94 ++++++++++++++--------
 .../processor/internals/StateDirectoryTest.java    | 37 +++++++++
 2 files changed, 97 insertions(+), 34 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 30dd2ca..de9fbea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -280,9 +280,8 @@ public class StateDirectory {
     public synchronized void clean() {
         // remove task dirs
         try {
-            cleanRemovedTasks(0, true);
+            cleanRemovedTasksCalledByUser();
         } catch (final Exception e) {
-            // this is already logged within cleanRemovedTasks
             throw new StreamsException(e);
         }
         // remove global dir
@@ -290,10 +289,13 @@ public class StateDirectory {
             if (stateDir.exists()) {
                 Utils.delete(globalStateDir().getAbsoluteFile());
             }
-        } catch (final IOException e) {
-            log.error("{} Failed to delete global state directory of {} due to an unexpected
exception",
-                appId, logPrefix(), e);
-            throw new StreamsException(e);
+        } catch (final IOException exception) {
+            log.error(
+                String.format("%s Failed to delete global state directory of %s due to an
unexpected exception",
+                    logPrefix(), appId),
+                exception
+            );
+            throw new StreamsException(exception);
         }
     }
 
@@ -306,24 +308,17 @@ public class StateDirectory {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception
{
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
@@ -331,34 +326,65 @@ public class StateDirectory {
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {}
as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir,
LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user
calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir,
LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn(
+                        String.format("%s Swallowed the following exception during deletion
of obsolete state directory %s for task %s:",
+                            logPrefix(), dirName, id),
+                        exception
+                    );
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is
safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn(
+                            String.format("%s Swallowed the following exception during unlocking
after deletion of obsolete " +
+                                "state directory %s for task %s:", logPrefix(), dirName,
id),
+                            exception
+                        );
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling
cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir,
LOCK_FILE_NAME)));
+                    } else {
+                        log.warn("{} Could not get lock for state directory {} for task {}
as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                    }
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.error(
+                        String.format("%s Failed to delete state directory %s for task %s
with exception:",
+                            logPrefix(), dirName, id),
+                        exception
+                    );
                     throw exception;
+                } finally {
+                    try {
+                        unlock(id);
+                        // for manual user call, stream threads are not running so it is
safe to delete
+                        // the whole directory
+                        Utils.delete(taskDir);
+                    } catch (final IOException exception) {
+                        log.error(
+                            String.format("%s Failed to release lock on state directory %s
for task %s with exception:",
+                                logPrefix(), dirName, id),
+                            exception
+                        );
+                        throw exception;
+                    }
                 }
             }
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 1645ea8..5c109dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -49,6 +50,10 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_FILE_NAME;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -533,6 +538,38 @@ public class StateDirectoryTest {
         assertTrue(runner.taskDirectory.isDirectory());
     }
 
+    @Test
+    public void shouldLogManualUserCallMessage() {
+        final TaskId taskId = new TaskId(0, 0);
+        final File taskDirectory = directory.directoryForTask(taskId);
+        final File testFile = new File(taskDirectory, "testFile");
+        assertThat(testFile.mkdir(), is(true));
+        assertThat(directory.directoryForTaskIsEmpty(taskId), is(false));
+
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class))
{
+            directory.clean();
+            assertThat(
+                appender.getMessages(),
+                hasItem(endsWith("as user calling cleanup."))
+            );
+        }
+    }
+
+    @Test
+    public void shouldLogStateDirCleanerMessage() {
+        final TaskId taskId = new TaskId(0, 0);
+        final File taskDirectory = directory.directoryForTask(taskId);
+        final File testFile = new File(taskDirectory, "testFile");
+        assertThat(testFile.mkdir(), is(true));
+        assertThat(directory.directoryForTaskIsEmpty(taskId), is(false));
+
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class))
{
+            final long cleanupDelayMs = 0;
+            directory.cleanRemovedTasks(cleanupDelayMs);
+            assertThat(appender.getMessages(), hasItem(endsWith("ms has elapsed (cleanup
delay is " +  cleanupDelayMs + "ms).")));
+        }
+    }
+
     private static class CreateTaskDirRunner implements Runnable {
         private final StateDirectory directory;
         private final TaskId taskId;


Mime
View raw message