This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 6356e6f KAFKA-9441: Improve Kafka Streams task management (#8776)
6356e6f is described below
commit 6356e6f2cf296ff7c45e778a493060acf136cd40
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Fri Jun 5 10:40:59 2020 -0700
KAFKA-9441: Improve Kafka Streams task management (#8776)
- make task manager agnostic to task state
- make task state transitions idempotent
Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>,
John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
.../streams/processor/internals/StandbyTask.java | 85 +++++----
.../streams/processor/internals/StreamTask.java | 210 ++++++++++++++-------
.../kafka/streams/processor/internals/Task.java | 12 +-
.../streams/processor/internals/TaskManager.java | 55 +++---
.../processor/internals/StandbyTaskTest.java | 2 +-
.../processor/internals/StreamTaskTest.java | 10 +-
.../processor/internals/TaskManagerTest.java | 17 +-
7 files changed, 248 insertions(+), 143 deletions(-)
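To make the diff below easier to follow: the patch replaces if/else chains on the task state with exhaustive switch statements, so that calling a transition such as close() on a task that is already in the target state becomes a no-op instead of throwing an IllegalStateException. Below is a minimal, self-contained sketch of that pattern; the class, state names, and method are simplified stand-ins for illustration, not the actual Kafka Streams types.

// IdempotentTaskSketch.java -- illustrative sketch only; names are hypothetical
public class IdempotentTaskSketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state = State.CREATED;

    State state() {
        return state;
    }

    // closing is idempotent: if the task is already CLOSED, the call is a no-op
    // rather than an error
    void close() {
        switch (state()) {
            case CREATED:
            case RUNNING:
                // release resources here (flush state, write checkpoint, ...)
                state = State.CLOSED;
                break;

            case CLOSED:
                // already closed -- skip; this is the idempotency the commit message refers to
                return;

            case RESTORING:
            case SUSPENDED:
                throw new IllegalStateException("Illegal state " + state() + " while closing task");

            default:
                throw new IllegalStateException("Unknown state " + state() + " while closing task");
        }
    }

    public static void main(final String[] args) {
        final IdempotentTaskSketch task = new IdempotentTaskSketch();
        task.close();
        task.close(); // second call returns early from the CLOSED case
        System.out.println("final state: " + task.state());
    }
}

Calling close() twice in main() illustrates the behavior that the updated closeShouldBeIdempotent test in StreamTaskTest asserts for the real task classes.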
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index cc922ab..8cba911 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -176,17 +176,25 @@ public class StandbyTask extends AbstractTask implements Task {
* @throws StreamsException fatal error, should close the thread
*/
private void prepareClose(final boolean clean) {
- if (state() == State.CREATED) {
- // the task is created and not initialized, do nothing
- return;
- }
+ switch (state()) {
+ case CREATED:
+ case CLOSED:
+ log.trace("Skip prepare closing since state is {}", state());
+ return;
- if (state() == State.RUNNING) {
- if (clean) {
- stateMgr.flush();
- }
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while closing
standby task " + id);
+ case RUNNING:
+ if (clean) {
+ stateMgr.flush();
+ }
+
+ break;
+
+ case RESTORING:
+ case SUSPENDED:
+ throw new IllegalStateException("Illegal state " + state() + " while closing
standby task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while closing
standby task " + id);
}
}
@@ -226,29 +234,42 @@ public class StandbyTask extends AbstractTask implements Task {
}
private void close(final boolean clean) {
- if (state() == State.CREATED || state() == State.RUNNING) {
- if (clean) {
- // since there's no written offsets we can checkpoint with empty map,
- // and the state current offset would be used to checkpoint
- stateMgr.checkpoint(Collections.emptyMap());
- offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
- }
- executeAndMaybeSwallow(
- clean,
- () -> StateManagerUtil.closeStateManager(
- log,
- logPrefix,
+ switch (state()) {
+ case CREATED:
+ case RUNNING:
+ if (clean) {
+ // since there are no written offsets we can checkpoint with an empty map,
+ // and the state's current offsets will be used for the checkpoint
+ stateMgr.checkpoint(Collections.emptyMap());
+ offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
+ }
+ executeAndMaybeSwallow(
clean,
- eosEnabled,
- stateMgr,
- stateDirectory,
- TaskType.STANDBY
- ),
- "state manager close",
- log
- );
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while closing
standby task " + id);
+ () -> StateManagerUtil.closeStateManager(
+ log,
+ logPrefix,
+ clean,
+ eosEnabled,
+ stateMgr,
+ stateDirectory,
+ TaskType.STANDBY
+ ),
+ "state manager close",
+ log
+ );
+
+ break;
+
+ case CLOSED:
+ log.trace("Skip closing since state is {}", state());
+ return;
+
+ case RESTORING:
+ case SUSPENDED:
+ throw new IllegalStateException("Illegal state " + state() + " while closing
standby task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while closing
standby task " + id);
}
closeTaskSensor.record();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 89be8a5..d045144 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -218,17 +218,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
*/
@Override
public void completeRestoration() {
- if (state() == State.RESTORING) {
- initializeMetadata();
- initializeTopology();
- processorContext.initialize();
- idleStartTimeMs = RecordQueue.UNKNOWN;
+ switch (state()) {
+ case RUNNING:
+ return;
- transitionTo(State.RUNNING);
+ case RESTORING:
+ initializeMetadata();
+ initializeTopology();
+ processorContext.initialize();
+ idleStartTimeMs = RecordQueue.UNKNOWN;
- log.info("Restored and ready to run");
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while completing
restoration for active task " + id);
+ transitionTo(State.RUNNING);
+
+ log.info("Restored and ready to run");
+
+ break;
+
+ case CREATED:
+ case SUSPENDED:
+ case CLOSED:
+ throw new IllegalStateException("Illegal state " + state() + " while completing
restoration for active task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while completing
restoration for active task " + id);
}
}
@@ -247,48 +259,75 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
*/
@Override
public void prepareSuspend() {
- if (state() == State.CREATED || state() == State.SUSPENDED) {
- // do nothing
- log.trace("Skip prepare suspending since state is {}", state());
- } else if (state() == State.RUNNING) {
- closeTopology(true);
+ switch (state()) {
+ case CREATED:
+ case SUSPENDED:
+ // do nothing
+ log.trace("Skip prepare suspending since state is {}", state());
+
+ break;
- stateMgr.flush();
- recordCollector.flush();
+ case RESTORING:
+ stateMgr.flush();
+ log.info("Prepare suspending restoring");
+
+ break;
- log.info("Prepare suspending running");
- } else if (state() == State.RESTORING) {
- stateMgr.flush();
+ case RUNNING:
+ closeTopology(true);
- log.info("Prepare suspending restoring");
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while suspending
active task " + id);
+ stateMgr.flush();
+ recordCollector.flush();
+
+ log.info("Prepare suspending running");
+
+ break;
+
+ case CLOSED:
+ throw new IllegalStateException("Illegal state " + state() + " while suspending
active task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while suspending
active task " + id);
}
}
@Override
public void suspend() {
- if (state() == State.CREATED || state() == State.SUSPENDED) {
- // do nothing
- log.trace("Skip suspending since state is {}", state());
- } else if (state() == State.RUNNING) {
- stateMgr.checkpoint(checkpointableOffsets());
- partitionGroup.clear();
-
- transitionTo(State.SUSPENDED);
- log.info("Suspended running");
- } else if (state() == State.RESTORING) {
- // we just checkpoint the position that we've restored up to without
- // going through the commit process
- stateMgr.checkpoint(emptyMap());
-
- // we should also clear any buffered records of a task when suspending it
- partitionGroup.clear();
-
- transitionTo(State.SUSPENDED);
- log.info("Suspended restoring");
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while suspending
active task " + id);
+ switch (state()) {
+ case CREATED:
+ case SUSPENDED:
+ // do nothing
+ log.trace("Skip suspending since state is {}", state());
+
+ break;
+
+ case RUNNING:
+ stateMgr.checkpoint(checkpointableOffsets());
+ partitionGroup.clear();
+
+ transitionTo(State.SUSPENDED);
+ log.info("Suspended running");
+
+ break;
+
+ case RESTORING:
+ // we just checkpoint the position that we've restored up to without
+ // going through the commit process
+ stateMgr.checkpoint(emptyMap());
+
+ // we should also clear any buffered records of a task when suspending it
+ partitionGroup.clear();
+
+ transitionTo(State.SUSPENDED);
+ log.info("Suspended restoring");
+
+ break;
+
+ case CLOSED:
+ throw new IllegalStateException("Illegal state " + state() + " while suspending
active task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while suspending
active task " + id);
}
}
@@ -315,8 +354,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
break;
- default:
+ case CLOSED:
throw new IllegalStateException("Illegal state " + state() + " while resuming
active task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while resuming
active task " + id);
}
}
@@ -332,8 +374,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
break;
- default:
+ case CREATED:
+ case SUSPENDED:
+ case CLOSED:
throw new IllegalStateException("Illegal state " + state() + " while preparing
active task " + id + " for committing");
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while preparing
active task " + id + " for committing");
}
}
@@ -362,8 +409,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
break;
- default:
+ case CREATED:
+ case SUSPENDED:
+ case CLOSED:
throw new IllegalStateException("Illegal state " + state() + " while post
committing active task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while post
committing active task " + id);
}
}
@@ -466,8 +518,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
stateMgr.recycle();
recordCollector.close();
break;
- default:
+
+ case CLOSED:
throw new IllegalStateException("Illegal state " + state() + " while closing
active task " + id);
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while closing
active task " + id);
}
partitionGroup.close();
@@ -493,28 +549,42 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private Map<TopicPartition, Long> prepareClose(final boolean clean) {
final Map<TopicPartition, Long> checkpoint;
- if (state() == State.CREATED) {
- // the task is created and not initialized, just re-write the checkpoint file
- checkpoint = Collections.emptyMap();
- } else if (state() == State.RUNNING) {
- closeTopology(clean);
+ switch (state()) {
+ case CREATED:
+ // the task is created and not initialized, just re-write the checkpoint file
+ checkpoint = Collections.emptyMap();
- if (clean) {
- stateMgr.flush();
- recordCollector.flush();
- checkpoint = checkpointableOffsets();
- } else {
+ break;
+
+ case RUNNING:
+ closeTopology(clean);
+
+ if (clean) {
+ stateMgr.flush();
+ recordCollector.flush();
+ checkpoint = checkpointableOffsets();
+ } else {
+ checkpoint = null; // `null` indicates to not write a checkpoint
+ executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+ }
+
+ break;
+
+ case RESTORING:
+ executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
+ checkpoint = Collections.emptyMap();
+
+ break;
+
+ case SUSPENDED:
+ case CLOSED:
+ // no need to checkpoint, since when suspending we've already committed the state
checkpoint = null; // `null` indicates to not write a checkpoint
- executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
- }
- } else if (state() == State.RESTORING) {
- executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
- checkpoint = Collections.emptyMap();
- } else if (state() == State.SUSPENDED) {
- // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
- checkpoint = null; // `null` indicates to not write a checkpoint
- } else {
- throw new IllegalStateException("Illegal state " + state() + " while prepare
closing active task " + id);
+
+ break;
+
+ default:
+ throw new IllegalStateException("Unknown state " + state() + " while prepare
closing active task " + id);
}
return checkpoint;
@@ -558,8 +628,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
break;
+ case CLOSED:
+ log.trace("Skip closing since state is {}", state());
+ return;
+
default:
- throw new IllegalStateException("Illegal state " + state() + " while closing
active task " + id);
+ throw new IllegalStateException("Unknown state " + state() + " while closing
active task " + id);
}
partitionGroup.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index f95e476..eee290a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -39,7 +39,6 @@ public interface Task {
long LATEST_OFFSET = -2L;
/*
- *
* <pre>
* +-------------+
* +<---- | Created (0) | <----------------------+
@@ -59,14 +58,9 @@ public interface Task {
* | | | |
* | | | |
* | v | |
- * | +-----+-------+ | |
- * +----> | Closing (4) | <------------+ |
- * +-----+-------+ |
- * | |
- * v |
- * +-----+-------+ |
- * | Closed (5) | -----------------------+
- * +-------------+
+ * | +-----+-------+ <------------+ |
+ * +----> | Closed (4) | |
+ * +-------------+ <----------------------+
* </pre>
*/
enum State {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 15ffa4a..d1be8a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -58,9 +58,6 @@ import java.util.stream.Stream;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
-import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
-import static org.apache.kafka.streams.processor.internals.Task.State.RESTORING;
-import static org.apache.kafka.streams.processor.internals.Task.State.RUNNING;
public class TaskManager {
// initialize the task list
@@ -388,30 +385,28 @@ public class TaskManager {
boolean tryToCompleteRestoration() {
boolean allRunning = true;
- final List<Task> restoringTasks = new LinkedList<>();
+ final List<Task> activeTasks = new LinkedList<>();
for (final Task task : tasks.values()) {
- if (task.state() == CREATED) {
- try {
- task.initializeIfNeeded();
- } catch (final LockException | TimeoutException e) {
- // it is possible that if there are multiple threads within the instance that one thread
- // trying to grab the task from the other, while the other has not released the lock since
- // it did not participate in the rebalance. In this case we can just retry in the next iteration
- log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
- allRunning = false;
- }
+ try {
+ task.initializeIfNeeded();
+ } catch (final LockException | TimeoutException e) {
+ // it is possible, if there are multiple threads within the instance, that one thread
+ // is trying to grab the task from another thread which has not released the lock, since
+ // it did not participate in the rebalance. In this case we can just retry in the next iteration
+ log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+ allRunning = false;
}
- if (task.state() == RESTORING) {
- restoringTasks.add(task);
+ if (task.isActive()) {
+ activeTasks.add(task);
}
}
- if (allRunning && !restoringTasks.isEmpty()) {
+ if (allRunning && !activeTasks.isEmpty()) {
final Set<TopicPartition> restored = changelogReader.completedChangelogs();
- for (final Task task : restoringTasks) {
+ for (final Task task : activeTasks) {
if (restored.containsAll(task.changelogPartitions())) {
try {
task.completeRestoration();
@@ -529,11 +524,7 @@ public class TaskManager {
for (final TaskId id : lockedTaskDirectories) {
final Task task = tasks.get(id);
if (task != null) {
- if (task.isActive() && task.state() == RUNNING) {
- taskOffsetSums.put(id, Task.LATEST_OFFSET);
- } else {
- taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));
- }
+ taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));
} else {
final File checkpointFile = stateDirectory.checkpointFileFor(id);
try {
@@ -613,10 +604,20 @@ public class TaskManager {
for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet())
{
final long offset = changelogEntry.getValue();
- offsetSum += offset;
- if (offsetSum < 0) {
- log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE",
id);
- return Long.MAX_VALUE;
+
+ if (offset == Task.LATEST_OFFSET) { // this condition can only be true for active tasks; never for standby
+ // for this case, the offset of all partitions is set to `LATEST_OFFSET`
+ // and we "forward" the sentinel value directly
+ return Task.LATEST_OFFSET;
+ } else {
+ if (offset < 0) {
+ throw new IllegalStateException("Offset should not be negative.");
+ }
+ offsetSum += offset;
+ if (offsetSum < 0) {
+ log.warn("Sum of changelog offsets for task {} overflowed, pinning to
Long.MAX_VALUE", id);
+ return Long.MAX_VALUE;
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index cbba51d..f868de4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -136,7 +136,7 @@ public class StandbyTaskTest {
@After
public void cleanup() throws IOException {
- if (task != null && !task.isClosed()) {
+ if (task != null) {
task.prepareCloseDirty();
task.closeDirty();
task = null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 13d039b..a4431a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -217,7 +217,7 @@ public class StreamTaskTest {
@After
public void cleanup() throws IOException {
- if (task != null && !task.isClosed()) {
+ if (task != null) {
task.prepareCloseDirty();
task.closeDirty();
task = null;
@@ -1789,7 +1789,7 @@ public class StreamTaskTest {
}
@Test
- public void shouldThrowIfClosingOnIllegalState() {
+ public void closeShouldBeIdempotent() {
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.replay(stateManager);
@@ -1798,9 +1798,9 @@ public class StreamTaskTest {
final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
task.closeClean(checkpoint);
- // close call are not idempotent since we are already in closed
- assertThrows(IllegalStateException.class, () -> task.closeClean(checkpoint));
- assertThrows(IllegalStateException.class, task::closeDirty);
+ // close calls are idempotent since we are already in closed
+ task.closeClean(checkpoint);
+ task.closeDirty();
EasyMock.reset(stateManager);
EasyMock.replay(stateManager);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c7446e2..4d38b2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -253,6 +253,10 @@ public class TaskManagerTest {
@Test
public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
+ final Map<TopicPartition, Long> changelogOffsets = mkMap(
+ mkEntry(new TopicPartition("changelog", 0), Task.LATEST_OFFSET),
+ mkEntry(new TopicPartition("changelog", 1), Task.LATEST_OFFSET)
+ );
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, Task.LATEST_OFFSET));
expectLockObtainedFor(taskId00);
@@ -260,7 +264,12 @@ public class TaskManagerTest {
replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
- handleAssignment(taskId00Assignment, emptyMap(), emptyMap());
+ final StateMachineTask runningTask = handleAssignment(
+ taskId00Assignment,
+ emptyMap(),
+ emptyMap()
+ ).get(taskId00);
+ runningTask.setChangelogOffsets(changelogOffsets);
assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
@@ -2637,11 +2646,17 @@ public class TaskManagerTest {
public void initializeIfNeeded() {
if (state() == State.CREATED) {
transitionTo(State.RESTORING);
+ if (!active) {
+ transitionTo(State.RUNNING);
+ }
}
}
@Override
public void completeRestoration() {
+ if (state() == State.RUNNING) {
+ return;
+ }
transitionTo(State.RUNNING);
}