kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10166: always write checkpoint before closing an (initialized) task (#8926)
Date Fri, 26 Jun 2020 22:13:32 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new ef6a832  KAFKA-10166: always write checkpoint before closing an (initialized) task
(#8926)
ef6a832 is described below

commit ef6a832ff673e5e25a205a4915e271392719bcf0
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Fri Jun 26 15:11:56 2020 -0700

    KAFKA-10166: always write checkpoint before closing an (initialized) task (#8926)
    
    This should address at least some of the excessive TaskCorruptedExceptions we've been
seeing lately. Basically, at the moment we only commit tasks if commitNeeded is true -- this
seems obvious by definition. But the problem is we do some essential cleanup in postCommit
that should always be done before a task is closed:
    
    * clear the PartitionGroup
    * write the checkpoint
    
    The second is actually fine to skip when commitNeeded = false with ALOS, as we will have
already written a checkpoint during the last commit. But for EOS, we only write the checkpoint
before a close -- so even if there is no new pending data since the last commit, we have to
write the current offsets. If we don't, the task will be assumed dirty and we will run into
our friend the TaskCorruptedException during (re)initialization.
    
    To fix this, we should just always call prepareCommit and postCommit at the TaskManager
level. Within the task, it can decide whether or not to actually do something in those methods
based on commitNeeded.
    
    One subtle issue is that we still need to avoid checkpointing a task that was still in
CREATED, to avoid potentially overwriting an existing checkpoint with uninitialized empty
offsets. Unfortunately we always suspend a task before closing and committing, so we lose
the information about whether the task was in CREATED or RUNNING/RESTORING by the time we get
to the checkpoint. For this we introduce a special flag to keep track of whether a suspended
task should actually be checkpointed or not.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/processor/internals/StandbyTask.java   |  87 +++++++++++---
 .../streams/processor/internals/StreamTask.java    |  66 ++++++++---
 .../streams/processor/internals/TaskManager.java   | 105 ++++++++---------
 .../processor/internals/StandbyTaskTest.java       |  25 +++-
 .../processor/internals/StreamTaskTest.java        | 128 ++++++++++++++-------
 .../processor/internals/TaskManagerTest.java       |  23 ++--
 .../apache/kafka/streams/TopologyTestDriver.java   |   2 +
 7 files changed, 281 insertions(+), 155 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 1aa68bc..3826780 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -46,14 +46,15 @@ public class StandbyTask extends AbstractTask implements Task {
     private final InternalProcessorContext processorContext;
     private final StreamsMetricsImpl streamsMetrics;
 
-    private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit;
+    private boolean checkpointNeededForSuspended = false;
+    private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit = new HashMap<>();
 
     /**
      * @param id             the ID of this task
      * @param partitions     input topic partitions, used for thread metadata only
      * @param topology       the instance of {@link ProcessorTopology}
      * @param config         the {@link StreamsConfig} specified by the user
-     * @param streamsMetrics        the {@link StreamsMetrics} created by the thread
+     * @param streamsMetrics the {@link StreamsMetrics} created by the thread
      * @param stateMgr       the {@link ProcessorStateManager} for this task
      * @param stateDirectory the {@link StateDirectory} created by the thread
      */
@@ -93,6 +94,9 @@ public class StandbyTask extends AbstractTask implements Task {
         if (state() == State.CREATED) {
             StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory,
processorContext);
 
+            // initialize the snapshot with the current offsets as we don't need to commit
then until they change
+            offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
+
             // no topology needs initialized, we can transit to RUNNING
             // right after registered the stores
             transitionTo(State.RESTORING);
@@ -115,8 +119,15 @@ public class StandbyTask extends AbstractTask implements Task {
     public void suspend() {
         switch (state()) {
             case CREATED:
+                log.info("Suspended created");
+                checkpointNeededForSuspended = false;
+                transitionTo(State.SUSPENDED);
+
+                break;
+
             case RUNNING:
-                log.info("Suspended {}", state());
+                log.info("Suspended running");
+                checkpointNeededForSuspended = true;
                 transitionTo(State.SUSPENDED);
 
                 break;
@@ -144,18 +155,27 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     /**
-     * 1. flush store
-     * 2. write checkpoint file
+     * Flush stores before a commit
      *
      * @throws StreamsException fatal error, should close the thread
      */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
-        if (state() == State.RUNNING || state() == State.SUSPENDED) {
-            stateMgr.flush();
-            log.debug("Prepared task for committing");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while preparing
standby task " + id + " for committing ");
+        switch (state()) {
+            case CREATED:
+                log.debug("Skipped preparing created task for commit");
+
+                break;
+
+            case RUNNING:
+            case SUSPENDED:
+                stateMgr.flush();
+                log.debug("Prepared {} task for committing", state());
+
+                break;
+
+            default:
+                throw new IllegalStateException("Illegal state " + state() + " while preparing
standby task " + id + " for committing ");
         }
 
         return Collections.emptyMap();
@@ -163,14 +183,36 @@ public class StandbyTask extends AbstractTask implements Task {
 
     @Override
     public void postCommit() {
-        if (state() == State.RUNNING || state() == State.SUSPENDED) {
-            // since there's no written offsets we can checkpoint with empty map,
-            // and the state current offset would be used to checkpoint
-            stateMgr.checkpoint(Collections.emptyMap());
-            offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
-            log.debug("Finalized commit");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while post committing
standby task " + id);
+        switch (state()) {
+            case CREATED:
+                // We should never write a checkpoint for a CREATED task as we may overwrite
an existing checkpoint
+                // with empty uninitialized offsets
+                log.debug("Skipped writing checkpoint for created task");
+
+                break;
+
+            case RUNNING:
+                if (commitNeeded()) {
+                    writeCheckpoint();
+                }
+                log.debug("Finalized commit for running task");
+
+                break;
+
+            case SUSPENDED:
+                // don't overwrite the existing checkpoint file if we haven't actually initialized
the offsets yet
+                if (checkpointNeededForSuspended) {
+                    writeCheckpoint();
+                    log.debug("Finalized commit for suspended task");
+                    checkpointNeededForSuspended = false;
+                } else {
+                    log.debug("Skipped writing checkpoint for uninitialized suspended task");
+                }
+
+                break;
+
+            default:
+                throw new IllegalStateException("Illegal state " + state() + " while post
committing standby task " + id);
         }
     }
 
@@ -203,6 +245,13 @@ public class StandbyTask extends AbstractTask implements Task {
         log.info("Closed clean and recycled state");
     }
 
+    private void writeCheckpoint() {
+        // since there's no written offsets we can checkpoint with empty map,
+        // and the state's current offset would be used to checkpoint
+        stateMgr.checkpoint(Collections.emptyMap());
+        offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
+    }
+
     private void close(final boolean clean) {
         switch (state()) {
             case SUSPENDED:
@@ -243,7 +292,7 @@ public class StandbyTask extends AbstractTask implements Task {
     @Override
     public boolean commitNeeded() {
         // we can commit if the store's offset has changed since last commit
-        return offsetSnapshotSinceLastCommit == null || !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
+        return !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4b27436..abdd567 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -58,6 +58,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
@@ -106,6 +107,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private long idleStartTimeMs;
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
+    private boolean checkpointNeededForSuspended = false;
 
     public StreamTask(final TaskId id,
                       final Set<TopicPartition> partitions,
@@ -249,8 +251,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     public void suspend() {
         switch (state()) {
             case CREATED:
+                log.info("Suspended created");
+                checkpointNeededForSuspended = false;
+                transitionTo(State.SUSPENDED);
+
+                break;
+
             case RESTORING:
-                log.info("Suspended {}", state());
+                log.info("Suspended restoring");
+                checkpointNeededForSuspended = true;
                 transitionTo(State.SUSPENDED);
 
                 break;
@@ -259,6 +268,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 try {
                     // use try-catch to ensure state transition to SUSPENDED even if user
code throws in `Processor#close()`
                     closeTopology();
+                    checkpointNeededForSuspended = true;
                 } finally {
                     transitionTo(State.SUSPENDED);
                     log.info("Suspended running");
@@ -338,18 +348,28 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      */
     @Override
     public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
         switch (state()) {
-            case RUNNING:
+            case CREATED:
             case RESTORING:
+            case RUNNING:
             case SUSPENDED:
                 stateMgr.flush();
-                recordCollector.flush();
-
-                log.debug("Prepared task for committing");
+                // the commitNeeded flag just indicates whether we have reached RUNNING and
processed any new data,
+                // so it only indicates whether the record collector should be flushed or
not, whereas the state
+                // manager should always be flushed; either there is newly restored data
or the flush will be a no-op
+                if (commitNeeded) {
+                    recordCollector.flush();
+
+                    log.debug("Prepared {} task for committing", state());
+                    offsetsToCommit = committableOffsetsAndMetadata();
+                } else {
+                    log.debug("Skipped preparing {} task for commit since there is nothing
to commit", state());
+                    offsetsToCommit = Collections.emptyMap();
+                }
 
                 break;
 
-            case CREATED:
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while preparing
active task " + id + " for committing");
 
@@ -357,7 +377,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 throw new IllegalStateException("Unknown state " + state() + " while preparing
active task " + id + " for committing");
         }
 
-        return committableOffsetsAndMetadata();
+        return offsetsToCommit;
     }
 
     private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata()
{
@@ -399,7 +419,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 break;
 
             case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while getting
commitable offsets for active task " + id);
+                throw new IllegalStateException("Illegal state " + state() + " while getting
committable offsets for active task " + id);
 
             default:
                 throw new IllegalStateException("Unknown state " + state() + " while post
committing active task " + id);
@@ -414,11 +434,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     @Override
     public void postCommit() {
         commitRequested = false;
-        commitNeeded = false;
 
         switch (state()) {
+            case CREATED:
+                // We should never write a checkpoint for a CREATED task as we may overwrite
an existing checkpoint
+                // with empty uninitialized offsets
+                log.debug("Skipped writing checkpoint for created task");
+
+                break;
+
             case RESTORING:
                 writeCheckpoint();
+                log.debug("Finalized commit for restoring task");
 
                 break;
 
@@ -426,6 +453,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 if (!eosEnabled) {
                     writeCheckpoint();
                 }
+                log.debug("Finalized commit for running task");
 
                 break;
 
@@ -438,19 +466,24 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                  */
                 partitionGroup.clear();
 
-                writeCheckpoint();
+                if (checkpointNeededForSuspended) {
+                    // Make sure we don't overwrite the checkpoint if the suspended task
was still in CREATED
+                    // and had not yet initialized offsets
+                    writeCheckpoint();
+                    log.debug("Finalized commit for suspended task");
+                    checkpointNeededForSuspended = false;
+                } else {
+                    log.debug("Skipped writing checkpoint for uninitialized suspended task");
+                }
 
                 break;
 
-            case CREATED:
             case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while post
committing active task " + id);
 
             default:
                 throw new IllegalStateException("Unknown state " + state() + " while post
committing active task " + id);
         }
-
-        log.debug("Finalized commit");
     }
 
     private Map<TopicPartition, Long> extractPartitionTimes() {
@@ -511,10 +544,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
     private void writeCheckpoint() {
         if (commitNeeded) {
-            log.error("Tried to write a checkpoint with pending uncommitted data, should
complete the commit first.");
-            throw new IllegalStateException("A checkpoint should only be written if no commit
is needed.");
+            stateMgr.checkpoint(checkpointableOffsets());
+        } else {
+            stateMgr.checkpoint(emptyMap());
         }
-        stateMgr.checkpoint(checkpointableOffsets());
+        commitNeeded = false;
     }
 
     private void validateClean() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 025e9f4..173efff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -242,15 +242,12 @@ public class TaskManager {
 
         for (final Task task : tasksToClose) {
             try {
-                task.suspend(); // Should be a no-op for active tasks since they're suspended
in handleRevocation
-                if (task.commitNeeded()) {
-                    if (task.isActive()) {
-                        log.error("Active task {} was revoked and should have already been
committed", task.id());
-                        throw new IllegalStateException("Revoked active task was not committed
during handleRevocation");
-                    } else {
-                        task.prepareCommit();
-                        task.postCommit();
-                    }
+                if (!task.isActive()) {
+                    // Active tasks should have already been suspended and committed during
handleRevocation, but
+                    // standbys must be suspended/committed/closed all here
+                    task.suspend();
+                    task.prepareCommit();
+                    task.postCommit();
                 }
                 completeTaskCloseClean(task);
                 cleanUpTaskProducer(task, taskCloseExceptions);
@@ -437,17 +434,16 @@ public class TaskManager {
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);
 
-        final Set<Task> tasksToCommit = new HashSet<>();
+        final Set<Task> revokedTasks = new HashSet<>();
         final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask
= new HashMap<>();
 
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         for (final Task task : activeTaskIterable()) {
             if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
                 try {
                     task.suspend();
-                    if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
-                    }
+                    revokedTasks.add(task);
                 } catch (final RuntimeException e) {
                     log.error("Caught the following exception while trying to suspend revoked
task " + task.id(), e);
                     firstException.compareAndSet(null, new StreamsException("Failed to suspend
" + task.id(), e));
@@ -469,21 +465,20 @@ public class TaskManager {
             throw suspendException;
         }
 
+        prepareCommitAndAddOffsetsToMap(revokedTasks, consumedOffsetsAndMetadataPerTask);
+
         // If using eos-beta, if we must commit any task then we must commit all of them
         // TODO: when KAFKA-9450 is done this will be less expensive, and we can simplify
by always committing everything
-        if (processingMode ==  EXACTLY_ONCE_BETA && !tasksToCommit.isEmpty()) {
-            tasksToCommit.addAll(additionalTasksForCommitting);
-        }
+        final boolean shouldCommitAdditionalTasks =
+            processingMode == EXACTLY_ONCE_BETA && !consumedOffsetsAndMetadataPerTask.isEmpty();
 
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask
= new HashMap<>();
-        for (final Task task : tasksToCommit) {
-            final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-            consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+        if (shouldCommitAdditionalTasks) {
+            prepareCommitAndAddOffsetsToMap(additionalTasksForCommitting, consumedOffsetsAndMetadataPerTask);
         }
 
         commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
 
-        for (final Task task : tasksToCommit) {
+        for (final Task task : revokedTasks) {
             try {
                 task.postCommit();
             } catch (final RuntimeException e) {
@@ -492,9 +487,30 @@ public class TaskManager {
             }
         }
 
-        final RuntimeException commitException = firstException.get();
-        if (commitException != null) {
-            throw commitException;
+        if (shouldCommitAdditionalTasks) {
+            for (final Task task : additionalTasksForCommitting) {
+                try {
+                    task.postCommit();
+                } catch (final RuntimeException e) {
+                    log.error("Exception caught while post-committing task " + task.id(),
e);
+                    firstException.compareAndSet(null, e);
+                }
+            }
+        }
+
+        final RuntimeException postCommitException = firstException.get();
+        if (postCommitException != null) {
+            throw postCommitException;
+        }
+    }
+
+    private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
+                                                 final Map<TaskId, Map<TopicPartition,
OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
+        for (final Task task : tasksToPrepare) {
+            final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+            if (!committableOffsets.isEmpty()) {
+                consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+            }
         }
     }
 
@@ -723,17 +739,17 @@ public class TaskManager {
         if (!clean) {
             return activeTaskIterable();
         }
+        final Set<Task> tasksToCommit = new HashSet<>();
         final Set<Task> tasksToCloseDirty = new HashSet<>();
         final Set<Task> tasksToCloseClean = new HashSet<>();
-        final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask
= new HashMap<>();
 
         for (final Task task : activeTaskIterable()) {
             try {
                 task.suspend();
-                if (task.commitNeeded()) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets
= task.prepareCommit();
-                    tasksToCommit.add(task);
+                final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                tasksToCommit.add(task);
+                if (!committableOffsets.isEmpty()) {
                     consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                 }
                 tasksToCloseClean.add(task);
@@ -754,7 +770,7 @@ public class TaskManager {
             try {
                 commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
 
-                for (final Task task : tasksToCommit) {
+                for (final Task task : activeTaskIterable()) {
                     try {
                         task.postCommit();
                     } catch (final RuntimeException e) {
@@ -794,17 +810,13 @@ public class TaskManager {
             return standbyTaskIterable();
         }
         final Set<Task> tasksToCloseDirty = new HashSet<>();
-        final Set<Task> tasksToCloseClean = new HashSet<>();
-        final Set<Task> tasksToCommit = new HashSet<>();
 
         for (final Task task : standbyTaskIterable()) {
             try {
                 task.suspend();
-                if (task.commitNeeded()) {
-                    task.prepareCommit();
-                    tasksToCommit.add(task);
-                }
-                tasksToCloseClean.add(task);
+                task.prepareCommit();
+                task.postCommit();
+                completeTaskCloseClean(task);
             } catch (final TaskMigratedException e) {
                 // just ignore the exception as it doesn't matter during shutdown
                 tasksToCloseDirty.add(task);
@@ -813,27 +825,6 @@ public class TaskManager {
                 tasksToCloseDirty.add(task);
             }
         }
-
-        for (final Task task : tasksToCommit) {
-            try {
-                task.postCommit();
-            } catch (final RuntimeException e) {
-                log.error("Exception caught while post-committing standby task " + task.id(),
e);
-                firstException.compareAndSet(null, e);
-                tasksToCloseDirty.add(task);
-                tasksToCloseClean.remove(task);
-            }
-        }
-
-        for (final Task task : tasksToCloseClean) {
-            try {
-                completeTaskCloseClean(task);
-            } catch (final RuntimeException e) {
-                log.error("Exception caught while clean-closing standby task " + task.id(),
e);
-                firstException.compareAndSet(null, e);
-                tasksToCloseDirty.add(task);
-            }
-        }
         return tasksToCloseDirty;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index f98d630..ba07d55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -171,6 +171,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldTransitToRunningAfterInitialization() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.registerStateStores(EasyMock.anyObject(), EasyMock.anyObject());
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(stateManager);
@@ -195,12 +196,15 @@ public class StandbyTaskTest {
     public void shouldThrowIfCommittingOnIllegalState() {
         EasyMock.replay(stateManager);
         task = createStandbyTask();
+        task.suspend();
+        task.closeClean();
 
         assertThrows(IllegalStateException.class, task::prepareCommit);
     }
 
     @Test
     public void shouldFlushAndCheckpointStateManagerOnCommit() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.flush();
         EasyMock.expectLastCall();
         stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
@@ -230,6 +234,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldNotCommitAndThrowOnCloseDirty() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.close();
         EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
         stateManager.flush();
@@ -255,6 +260,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.close();
         EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
         EasyMock.replay(stateManager);
@@ -270,6 +276,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldSuspendAndCommitBeforeCloseClean() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.close();
         EasyMock.expectLastCall();
         stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
@@ -311,7 +318,7 @@ public class StandbyTaskTest {
         EasyMock.expect(stateManager.changelogOffsets())
             .andReturn(Collections.singletonMap(partition, 50L))
             .andReturn(Collections.singletonMap(partition, 50L))
-            .andReturn(Collections.singletonMap(partition, 60L));
+            .andReturn(Collections.singletonMap(partition, 60L)).anyTimes();
         stateManager.flush();
         EasyMock.expectLastCall();
         stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
@@ -321,22 +328,21 @@ public class StandbyTaskTest {
         task = createStandbyTask();
         task.initializeIfNeeded();
 
-        assertTrue(task.commitNeeded());
-
-        task.prepareCommit();
-        task.postCommit();
-
         // do not need to commit if there's no update
         assertFalse(task.commitNeeded());
 
         // could commit if the offset advanced
         assertTrue(task.commitNeeded());
 
+        task.prepareCommit();
+        task.postCommit();
+
         EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldThrowOnCloseCleanError() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.close();
         EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
@@ -360,6 +366,9 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldThrowOnCloseCleanCheckpointError() {
+        EasyMock.expect(stateManager.changelogOffsets())
+            .andReturn(Collections.emptyMap())
+            .andReturn(Collections.singletonMap(partition, 0L));
         stateManager.checkpoint(EasyMock.anyObject());
         EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
@@ -385,6 +394,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldUnregisterMetricsInCloseClean() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(stateManager);
 
@@ -400,6 +410,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldUnregisterMetricsInCloseDirty() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(stateManager);
 
@@ -498,6 +509,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldRecycleTask() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.recycle();
         EasyMock.replay(stateManager);
 
@@ -528,6 +540,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldAlwaysSuspendRunningTasks() {
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         EasyMock.replay(stateManager);
         task = createStandbyTask();
         task.initializeIfNeeded();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 9607470..facbf2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -82,7 +82,10 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
@@ -1203,32 +1206,6 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldCommitWhenSuspending() throws IOException {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition,
10L)).anyTimes();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition,
10L)));
-        EasyMock.expectLastCall();
-        EasyMock.replay(recordCollector, stateDirectory, stateManager);
-
-        task = createStatefulTask(createConfig(false, "100"), true);
-
-        task.initializeIfNeeded();
-        task.completeRestoration();
-
-        task.suspend();
-        task.prepareCommit();
-        task.postCommit();
-
-        assertEquals(Task.State.SUSPENDED, task.state());
-        assertTrue(source1.closed);
-        assertTrue(source2.closed);
-
-        EasyMock.verify(stateManager);
-    }
-
-    @Test
     public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() throws
IOException {
         stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
         EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true);
@@ -1469,7 +1446,6 @@ public class StreamTaskTest {
     @Test
     public void shouldThrowIfCommittingOnIllegalState() {
         task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST);
-        assertThrows(IllegalStateException.class, task::prepareCommit);
 
         task.transitionTo(Task.State.SUSPENDED);
         task.transitionTo(Task.State.CLOSED);
@@ -1479,7 +1455,6 @@ public class StreamTaskTest {
     @Test
     public void shouldThrowIfPostCommittingOnIllegalState() {
         task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST);
-        assertThrows(IllegalStateException.class, task::postCommit);
 
         task.transitionTo(Task.State.SUSPENDED);
         task.transitionTo(Task.State.CLOSED);
@@ -1487,6 +1462,61 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldSkipCheckpointingSuspendedCreatedTask() {
+        stateManager.checkpoint(EasyMock.anyObject());
+        EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
+        EasyMock.replay(stateManager);
+
+        task = createStatefulTask(createConfig(false, "100"), true);
+        task.suspend();
+        task.postCommit();
+    }
+
+    @Test
+    public void shouldCheckpointWithEmptyOffsetsForSuspendedRestoringTask() {
+        stateManager.checkpoint(emptyMap());
+        EasyMock.replay(stateManager);
+
+        task = createStatefulTask(createConfig(false, "100"), true);
+        task.initializeIfNeeded();
+        task.suspend();
+        task.postCommit();
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void shouldCheckpointWithEmptyOffsetsForSuspendedRunningTaskWithNoCommitNeeded()
{
+        stateManager.checkpoint(emptyMap());
+        EasyMock.replay(stateManager);
+
+        task = createStatefulTask(createConfig(false, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration();
+        task.suspend();
+        task.postCommit();
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void shouldCheckpointTheConsumedOffsetsForSuspendedRunningTaskWithCommitNeeded()
{
+        final Map<TopicPartition, Long> checkpointableOffsets = singletonMap(partition1,
0L);
+        stateManager.checkpoint(EasyMock.eq(checkpointableOffsets));
+        EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).anyTimes();
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(false, "0"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration();
+        task.addRecords(partition1, singleton(getConsumerRecord(partition1, 10)));
+        task.process(100L);
+        assertTrue(task.commitNeeded());
+
+        task.suspend();
+        task.postCommit();
+        EasyMock.verify(stateManager, recordCollector);
+    }
+
+    @Test
     public void shouldReturnStateManagerChangelogOffsets() {
         EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition1,
50L)).anyTimes();
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1)).anyTimes();
@@ -1560,7 +1590,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotCommitOnCloseRestoring() {
+    public void shouldCheckpointOnCloseRestoring() {
         stateManager.flush();
         EasyMock.expectLastCall();
         stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
@@ -1584,30 +1614,33 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldCommitOnCloseClean() {
+    public void shouldCheckpointOffsetsOnPostCommitIfCommitNeeded() {
         final long offset = 543L;
+        final long consumedOffset = 345L;
 
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition,
offset)).anyTimes();
-        stateManager.close();
-        EasyMock.expectLastCall();
-        stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition,
offset)));
         EasyMock.expectLastCall();
+        stateManager.checkpoint(EasyMock.eq(mkMap(
+            mkEntry(changelogPartition, offset),
+            mkEntry(partition1, consumedOffset)
+        )));
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(recordCollector, stateManager);
         final MetricName metricName = setupCloseTaskMetric();
 
-        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+        task = createOptimizedStatefulTask(createConfig(false, "0"), consumer);
         task.initializeIfNeeded();
         task.completeRestoration();
+
+        task.addRecords(partition1, singletonList(getConsumerRecord(partition1, consumedOffset)));
+        task.process(100L);
+        assertTrue(task.commitNeeded());
+
         task.suspend();
         task.prepareCommit();
         task.postCommit();
-        task.closeClean();
-
-        assertEquals(Task.State.CLOSED, task.state());
 
-        final double expectedCloseTaskMetric = 1.0;
-        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+        assertEquals(Task.State.SUSPENDED, task.state());
 
         EasyMock.verify(stateManager);
     }
@@ -1616,8 +1649,8 @@ public class StreamTaskTest {
     public void shouldSwallowExceptionOnCloseCleanError() {
         final long offset = 543L;
 
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition,
offset)).anyTimes();
-        stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition,
offset)));
+        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
+        stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(partition1, offset)));
         EasyMock.expectLastCall();
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)).anyTimes();
         stateManager.close();
@@ -1629,6 +1662,10 @@ public class StreamTaskTest {
         task.initializeIfNeeded();
         task.completeRestoration();
 
+        task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset)));
+        task.process(100L);
+        assertTrue(task.commitNeeded());
+
         task.suspend();
         task.prepareCommit();
         task.postCommit();
@@ -1679,9 +1716,8 @@ public class StreamTaskTest {
     @Test
     public void shouldThrowOnCloseCleanCheckpointError() {
         final long offset = 543L;
-
-        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition,
offset));
-        stateManager.checkpoint(Collections.singletonMap(changelogPartition, offset));
+        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap());
+        stateManager.checkpoint(Collections.singletonMap(partition1, offset));
         EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
         stateManager.close();
         EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
@@ -1692,6 +1728,10 @@ public class StreamTaskTest {
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
         task.initializeIfNeeded();
 
+        task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset)));
+        task.process(100L);
+        assertTrue(task.commitNeeded());
+
         task.suspend();
         task.prepareCommit();
         assertThrows(ProcessorStateException.class, () -> task.postCommit());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 23166d1..36730b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -79,6 +79,7 @@ import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.common.utils.Utils.union;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.eq;
@@ -414,11 +415,10 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldCloseDirtyActiveUnassignedTasksWhenErrorSuspendingTask() {
+    public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions,
true) {
             @Override
-            public void suspend() {
-                super.suspend();
+            public void closeClean() {
                 throw new RuntimeException("KABOOM!");
             }
         };
@@ -436,6 +436,7 @@ public class TaskManagerTest {
         replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
+        taskManager.handleRevocation(taskId00Partitions);
 
         final RuntimeException thrown = assertThrows(
             RuntimeException.class,
@@ -536,6 +537,8 @@ public class TaskManagerTest {
         assertThat(taskManager.tryToCompleteRestoration(), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
 
+        taskManager.handleRevocation(taskId00Partitions);
+
         final RuntimeException thrown = assertThrows(
             RuntimeException.class,
             () -> taskManager.handleAssignment(emptyMap(), emptyMap())
@@ -1399,7 +1402,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldCloseActiveTasksDirtyAndPropagateSuspendException() {
+    public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
         setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);
 
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
@@ -1418,21 +1421,15 @@ public class TaskManagerTest {
         taskManager.tasks().put(taskId01, task01);
         taskManager.tasks().put(taskId02, task02);
 
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId02);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01);
-
         replay(activeTaskCreator);
 
         final RuntimeException thrown = assertThrows(RuntimeException.class,
-            () -> taskManager.handleAssignment(mkMap(mkEntry(taskId00, taskId00Partitions)),
Collections.emptyMap()));
+            () -> taskManager.handleRevocation(union(HashSet::new, taskId01Partitions,
taskId02Partitions)));
         assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend boom!"));
 
         assertThat(task00.state(), is(Task.State.CREATED));
-        assertThat(task01.state(), is(Task.State.CLOSED));
-        assertThat(task02.state(), is(Task.State.CLOSED));
-
-        // All the tasks involving in the commit should already be removed.
-        assertThat(taskManager.tasks(), is(Collections.singletonMap(taskId00, task00)));
+        assertThat(task01.state(), is(Task.State.SUSPENDED));
+        assertThat(task02.state(), is(Task.State.SUSPENDED));
 
         verify(activeTaskCreator);
     }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7615612..31659bb 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1175,6 +1175,8 @@ public class TopologyTestDriver implements Closeable {
     public void close() {
         if (task != null) {
             task.suspend();
+            task.prepareCommit();
+            task.postCommit();
             task.closeClean();
         }
         if (globalStateTask != null) {


Mime
View raw message