kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ableegold...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted (#9247)
Date Tue, 02 Feb 2021 20:07:16 GMT
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new ba0f04b  KAFKA-10362: When resuming Streams active task with EOS, the checkpoint
file is deleted (#9247)
ba0f04b is described below

commit ba0f04b569cae036ba2c79c91a32af2bba5397e9
Author: Sharath Bhat <sharathbhat55@gmail.com>
AuthorDate: Wed Oct 7 23:50:06 2020 +0530

    KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted
(#9247)
    
    Deleted the checkpoint file before the transition from SUSPENDED state to RESTORING state
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../processor/internals/ProcessorStateManager.java |  6 +++++
 .../streams/processor/internals/StreamTask.java    |  9 +++++++
 .../internals/ProcessorStateManagerTest.java       | 30 ++++++++++++++++++++++
 3 files changed, 45 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1de599d..deadc68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -598,4 +598,10 @@ public class ProcessorStateManager implements StateManager {
     private Long changelogOffsetFromCheckpointedOffset(final long offset) {
         return offset != OFFSET_UNKNOWN ? offset : null;
     }
+
+    public void deleteCheckPointFileIfEOSEnabled() throws IOException {
+        if (eosEnabled) {
+            checkpointFile.delete();
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5ed6e52..61a94c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -329,6 +329,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             case SUSPENDED:
                 // just transit the state without any logical changes: suspended and restoring
states
                 // are not actually any different for inner modules
+
+                // Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
+                try {
+                    stateMgr.deleteCheckPointFileIfEOSEnabled();
+                    log.debug("Deleted check point file upon resuming with EOS enabled");
+                } catch (final IOException ioe) {
+                    log.error("Encountered error while deleting the checkpoint file due to
this exception", ioe);
+                }
+
                 transitionTo(State.RESTORING);
                 log.info("Resumed to restoring state");
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 8d4e35b..a3e5a75 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -891,6 +891,36 @@ public class ProcessorStateManagerTest {
         stateMgr.close();
     }
 
+    @Test
+    public void shouldDeleteCheckPointFileIfEosEnabled() throws IOException {
+        final long checkpointOffset = 10L;
+        final Map<TopicPartition, Long> offsets = mkMap(
+                mkEntry(persistentStorePartition, checkpointOffset),
+                mkEntry(nonPersistentStorePartition, checkpointOffset),
+                mkEntry(irrelevantPartition, 999L)
+        );
+        checkpoint.write(offsets);
+        final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE, true);
+        stateMgr.deleteCheckPointFileIfEOSEnabled();
+        stateMgr.close();
+        assertFalse(checkpointFile.exists());
+    }
+
+    @Test
+    public void shouldNotDeleteCheckPointFileIfEosNotEnabled() throws IOException {
+        final long checkpointOffset = 10L;
+        final Map<TopicPartition, Long> offsets = mkMap(
+                mkEntry(persistentStorePartition, checkpointOffset),
+                mkEntry(nonPersistentStorePartition, checkpointOffset),
+                mkEntry(irrelevantPartition, 999L)
+        );
+        checkpoint.write(offsets);
+        final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE, false);
+        stateMgr.deleteCheckPointFileIfEOSEnabled();
+        stateMgr.close();
+        assertTrue(checkpointFile.exists());
+    }
+
     private ProcessorStateManager getStateManager(final Task.TaskType taskType, final boolean
eosEnabled) {
         return new ProcessorStateManager(
             taskId,


Mime
View raw message