kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9441: Improve Kafka Streams task management (#8776)
Date Fri, 05 Jun 2020 17:51:15 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 6356e6f  KAFKA-9441: Improve Kafka Streams task management (#8776)
6356e6f is described below

commit 6356e6f2cf296ff7c45e778a493060acf136cd40
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Fri Jun 5 10:40:59 2020 -0700

    KAFKA-9441: Improve Kafka Streams task management (#8776)
    
     - make task manager agnostic to task state
     - make tasks state transitions idempotent
    
    Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../streams/processor/internals/StandbyTask.java   |  85 +++++----
 .../streams/processor/internals/StreamTask.java    | 210 ++++++++++++++-------
 .../kafka/streams/processor/internals/Task.java    |  12 +-
 .../streams/processor/internals/TaskManager.java   |  55 +++---
 .../processor/internals/StandbyTaskTest.java       |   2 +-
 .../processor/internals/StreamTaskTest.java        |  10 +-
 .../processor/internals/TaskManagerTest.java       |  17 +-
 7 files changed, 248 insertions(+), 143 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index cc922ab..8cba911 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -176,17 +176,25 @@ public class StandbyTask extends AbstractTask implements Task {
      * @throws StreamsException fatal error, should close the thread
      */
     private void prepareClose(final boolean clean) {
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, do nothing
-            return;
-        }
+        switch (state()) {
+            case CREATED:
+            case CLOSED:
+                log.trace("Skip prepare closing since state is {}", state());
+                return;
 
-        if (state() == State.RUNNING) {
-            if (clean) {
-                stateMgr.flush();
-            }
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
+            case RUNNING:
+                if (clean) {
+                    stateMgr.flush();
+                }
+
+                break;
+
+            case RESTORING:
+            case SUSPENDED:
+                throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while closing standby task " + id);
         }
     }
 
@@ -226,29 +234,42 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     private void close(final boolean clean) {
-        if (state() == State.CREATED || state() == State.RUNNING) {
-            if (clean) {
-                // since there's no written offsets we can checkpoint with empty map,
-                // and the state current offset would be used to checkpoint
-                stateMgr.checkpoint(Collections.emptyMap());
-                offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
-            }
-            executeAndMaybeSwallow(
-                clean,
-                () -> StateManagerUtil.closeStateManager(
-                    log,
-                    logPrefix,
+        switch (state()) {
+            case CREATED:
+            case RUNNING:
+                if (clean) {
+                    // since there's no written offsets we can checkpoint with empty map,
+                    // and the state current offset would be used to checkpoint
+                    stateMgr.checkpoint(Collections.emptyMap());
+                    offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
+                }
+                executeAndMaybeSwallow(
                     clean,
-                    eosEnabled,
-                    stateMgr,
-                    stateDirectory,
-                    TaskType.STANDBY
-                ),
-                "state manager close",
-                log
-            );
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
+                    () -> StateManagerUtil.closeStateManager(
+                        log,
+                        logPrefix,
+                        clean,
+                        eosEnabled,
+                        stateMgr,
+                        stateDirectory,
+                        TaskType.STANDBY
+                    ),
+                    "state manager close",
+                    log
+                );
+
+                break;
+
+            case CLOSED:
+                log.trace("Skip closing since state is {}", state());
+                return;
+
+            case RESTORING:
+            case SUSPENDED:
+                throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while closing standby task " + id);
         }
 
         closeTaskSensor.record();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 89be8a5..d045144 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -218,17 +218,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      */
     @Override
     public void completeRestoration() {
-        if (state() == State.RESTORING) {
-            initializeMetadata();
-            initializeTopology();
-            processorContext.initialize();
-            idleStartTimeMs = RecordQueue.UNKNOWN;
+        switch (state()) {
+            case RUNNING:
+                return;
 
-            transitionTo(State.RUNNING);
+            case RESTORING:
+                initializeMetadata();
+                initializeTopology();
+                processorContext.initialize();
+                idleStartTimeMs = RecordQueue.UNKNOWN;
 
-            log.info("Restored and ready to run");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while completing restoration for active task " + id);
+                transitionTo(State.RUNNING);
+
+                log.info("Restored and ready to run");
+
+                break;
+
+            case CREATED:
+            case SUSPENDED:
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while completing restoration for active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while completing restoration for active task " + id);
         }
     }
 
@@ -247,48 +259,75 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {
+            case CREATED:
+            case SUSPENDED:
+                // do nothing
+                log.trace("Skip prepare suspending since state is {}", state());
+
+                break;
 
-            stateMgr.flush();
-            recordCollector.flush();
+            case RESTORING:
+                stateMgr.flush();
+                log.info("Prepare suspending restoring");
+
+                break;
 
-            log.info("Prepare suspending running");
-        } else if (state() == State.RESTORING) {
-            stateMgr.flush();
+            case RUNNING:
+                closeTopology(true);
 
-            log.info("Prepare suspending restoring");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+                stateMgr.flush();
+                recordCollector.flush();
+
+                log.info("Prepare suspending running");
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id);
         }
     }
 
     @Override
     public void suspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            stateMgr.checkpoint(checkpointableOffsets());
-            partitionGroup.clear();
-
-            transitionTo(State.SUSPENDED);
-            log.info("Suspended running");
-        } else if (state() == State.RESTORING) {
-            // we just checkpoint the position that we've restored up to without
-            // going through the commit process
-            stateMgr.checkpoint(emptyMap());
-
-            // we should also clear any buffered records of a task when suspending it
-            partitionGroup.clear();
-
-            transitionTo(State.SUSPENDED);
-            log.info("Suspended restoring");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+        switch (state()) {
+            case CREATED:
+            case SUSPENDED:
+                // do nothing
+                log.trace("Skip suspending since state is {}", state());
+
+                break;
+
+            case RUNNING:
+                stateMgr.checkpoint(checkpointableOffsets());
+                partitionGroup.clear();
+
+                transitionTo(State.SUSPENDED);
+                log.info("Suspended running");
+
+                break;
+
+            case RESTORING:
+                // we just checkpoint the position that we've restored up to without
+                // going through the commit process
+                stateMgr.checkpoint(emptyMap());
+
+                // we should also clear any buffered records of a task when suspending it
+                partitionGroup.clear();
+
+                transitionTo(State.SUSPENDED);
+                log.info("Suspended restoring");
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id);
         }
     }
 
@@ -315,8 +354,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
                 break;
 
-            default:
+            case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while resuming active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while resuming active task " + id);
         }
     }
 
@@ -332,8 +374,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
                 break;
 
-            default:
+            case CREATED:
+            case SUSPENDED:
+            case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing");
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing");
         }
     }
 
@@ -362,8 +409,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
                 break;
 
-            default:
+            case CREATED:
+            case SUSPENDED:
+            case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
         }
     }
 
@@ -466,8 +518,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 stateMgr.recycle();
                 recordCollector.close();
                 break;
-            default:
+
+            case CLOSED:
                 throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
         }
 
         partitionGroup.close();
@@ -493,28 +549,42 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {
+            case CREATED:
+                // the task is created and not initialized, just re-write the checkpoint file
+                checkpoint = Collections.emptyMap();
 
-            if (clean) {
-                stateMgr.flush();
-                recordCollector.flush();
-                checkpoint = checkpointableOffsets();
-            } else {
+                break;
+
+            case RUNNING:
+                closeTopology(clean);
+
+                if (clean) {
+                    stateMgr.flush();
+                    recordCollector.flush();
+                    checkpoint = checkpointableOffsets();
+                } else {
+                    checkpoint = null; // `null` indicates to not write a checkpoint
+                    executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+                }
+
+                break;
+
+            case RESTORING:
+                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
+                checkpoint = Collections.emptyMap();
+
+                break;
+
+            case SUSPENDED:
+            case CLOSED:
+                // not need to checkpoint, since when suspending we've already committed the state
                 checkpoint = null; // `null` indicates to not write a checkpoint
-                executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
-            }
-        } else if (state() == State.RESTORING) {
-            executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.SUSPENDED) {
-            // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
-            checkpoint = null; // `null` indicates to not write a checkpoint
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id);
+
+                break;
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id);
         }
 
         return checkpoint;
@@ -558,8 +628,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
                 break;
 
+            case CLOSED:
+                log.trace("Skip closing since state is {}", state());
+                return;
+
             default:
-                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
+                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
         }
 
         partitionGroup.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index f95e476..eee290a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -39,7 +39,6 @@ public interface Task {
     long LATEST_OFFSET = -2L;
 
     /*
-     *
      * <pre>
      *                 +-------------+
      *          +<---- | Created (0) | <----------------------+
@@ -59,14 +58,9 @@ public interface Task {
      *          |            |                      |         |
      *          |            |                      |         |
      *          |            v                      |         |
-     *          |      +-----+-------+              |         |
-     *          +----> | Closing (4) | <------------+         |
-     *                 +-----+-------+                        |
-     *                       |                                |
-     *                       v                                |
-     *                 +-----+-------+                        |
-     *                 | Closed (5)  | -----------------------+
-     *                 +-------------+
+     *          |      +-----+-------+ <------------+         |
+     *          +----> | Closed (4)  |                        |
+     *                 +-------------+ <----------------------+
      * </pre>
      */
     enum State {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 15ffa4a..d1be8a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -58,9 +58,6 @@ import java.util.stream.Stream;
 
 import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
-import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
-import static org.apache.kafka.streams.processor.internals.Task.State.RESTORING;
-import static org.apache.kafka.streams.processor.internals.Task.State.RUNNING;
 
 public class TaskManager {
     // initialize the task list
@@ -388,30 +385,28 @@ public class TaskManager {
     boolean tryToCompleteRestoration() {
         boolean allRunning = true;
 
-        final List<Task> restoringTasks = new LinkedList<>();
+        final List<Task> activeTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
-            if (task.state() == CREATED) {
-                try {
-                    task.initializeIfNeeded();
-                } catch (final LockException | TimeoutException e) {
-                    // it is possible that if there are multiple threads within the instance that one thread
-                    // trying to grab the task from the other, while the other has not released the lock since
-                    // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                    log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
-                    allRunning = false;
-                }
+            try {
+                task.initializeIfNeeded();
+            } catch (final LockException | TimeoutException e) {
+                // it is possible that if there are multiple threads within the instance that one thread
+                // trying to grab the task from the other, while the other has not released the lock since
+                // it did not participate in the rebalance. In this case we can just retry in the next iteration
+                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+                allRunning = false;
             }
 
-            if (task.state() == RESTORING) {
-                restoringTasks.add(task);
+            if (task.isActive()) {
+                activeTasks.add(task);
             }
         }
 
-        if (allRunning && !restoringTasks.isEmpty()) {
+        if (allRunning && !activeTasks.isEmpty()) {
 
             final Set<TopicPartition> restored = changelogReader.completedChangelogs();
 
-            for (final Task task : restoringTasks) {
+            for (final Task task : activeTasks) {
                 if (restored.containsAll(task.changelogPartitions())) {
                     try {
                         task.completeRestoration();
@@ -529,11 +524,7 @@ public class TaskManager {
         for (final TaskId id : lockedTaskDirectories) {
             final Task task = tasks.get(id);
             if (task != null) {
-                if (task.isActive() && task.state() == RUNNING) {
-                    taskOffsetSums.put(id, Task.LATEST_OFFSET);
-                } else {
-                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));
-                }
+                taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));
             } else {
                 final File checkpointFile = stateDirectory.checkpointFileFor(id);
                 try {
@@ -613,10 +604,20 @@ public class TaskManager {
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+
+            if (offset == Task.LATEST_OFFSET) { // this condition can only be true for active tasks; never for standby
+                // for this case, the offset of all partitions is set to `LATEST_OFFSET`
+                // and we "forward" the sentinel value directly
+                return Task.LATEST_OFFSET;
+            } else {
+                if (offset < 0) {
+                    throw new IllegalStateException("Offset should not be negative.");
+                }
+                offsetSum += offset;
+                if (offsetSum < 0) {
+                    log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
+                    return Long.MAX_VALUE;
+                }
             }
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index cbba51d..f868de4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -136,7 +136,7 @@ public class StandbyTaskTest {
 
     @After
     public void cleanup() throws IOException {
-        if (task != null && !task.isClosed()) {
+        if (task != null) {
             task.prepareCloseDirty();
             task.closeDirty();
             task = null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 13d039b..a4431a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -217,7 +217,7 @@ public class StreamTaskTest {
 
     @After
     public void cleanup() throws IOException {
-        if (task != null && !task.isClosed()) {
+        if (task != null) {
             task.prepareCloseDirty();
             task.closeDirty();
             task = null;
@@ -1789,7 +1789,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldThrowIfClosingOnIllegalState() {
+    public void closeShouldBeIdempotent() {
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.replay(stateManager);
 
@@ -1798,9 +1798,9 @@ public class StreamTaskTest {
         final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
         task.closeClean(checkpoint);
 
-        // close call are not idempotent since we are already in closed
-        assertThrows(IllegalStateException.class, () -> task.closeClean(checkpoint));
-        assertThrows(IllegalStateException.class, task::closeDirty);
+        // close calls are idempotent since we are already in closed
+        task.closeClean(checkpoint);
+        task.closeDirty();
 
         EasyMock.reset(stateManager);
         EasyMock.replay(stateManager);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c7446e2..4d38b2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -253,6 +253,10 @@ public class TaskManagerTest {
 
     @Test
     public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
+        final Map<TopicPartition, Long> changelogOffsets = mkMap(
+            mkEntry(new TopicPartition("changelog", 0), Task.LATEST_OFFSET),
+            mkEntry(new TopicPartition("changelog", 1), Task.LATEST_OFFSET)
+        );
         final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, Task.LATEST_OFFSET));
 
         expectLockObtainedFor(taskId00);
@@ -260,7 +264,12 @@ public class TaskManagerTest {
         replay(stateDirectory);
 
         taskManager.handleRebalanceStart(singleton("topic"));
-        handleAssignment(taskId00Assignment, emptyMap(), emptyMap());
+        final StateMachineTask runningTask = handleAssignment(
+            taskId00Assignment,
+            emptyMap(),
+            emptyMap()
+        ).get(taskId00);
+        runningTask.setChangelogOffsets(changelogOffsets);
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
     }
@@ -2637,11 +2646,17 @@ public class TaskManagerTest {
         public void initializeIfNeeded() {
             if (state() == State.CREATED) {
                 transitionTo(State.RESTORING);
+                if (!active) {
+                    transitionTo(State.RUNNING);
+                }
             }
         }
 
         @Override
         public void completeRestoration() {
+            if (state() == State.RUNNING) {
+                return;
+            }
             transitionTo(State.RUNNING);
         }
 


Mime
View raw message