From: mjsax@apache.org
To: commits@kafka.apache.org
Date: Sun, 07 Jun 2020 00:44:33 +0000
Subject: [kafka] branch 2.6 updated: KAFKA-10097: Internalize checkpoint data (#8820)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/2.6 by this push:
     new e134bb4  KAFKA-10097: Internalize checkpoint data (#8820)
e134bb4 is described below

commit e134bb43f9546f129f7392bab2109aae34308de9
Author: Boyang Chen
AuthorDate: Sat Jun 6 17:34:03 2020 -0700

    KAFKA-10097: Internalize checkpoint data (#8820)

    Reviewers: Matthias J. Sax, Guozhang Wang
---
 .../streams/processor/internals/StandbyTask.java  |  8 +---
 .../streams/processor/internals/StreamTask.java   | 54 ++++++++++++----------
 .../kafka/streams/processor/internals/Task.java   |  4 +-
 .../streams/processor/internals/TaskManager.java  | 33 ++++++-------
 .../processor/internals/StandbyTaskTest.java      |  9 ++--
 .../processor/internals/StreamTaskTest.java       | 26 +++++------
 .../processor/internals/TaskManagerTest.java      | 28 ++++++-----
 .../apache/kafka/streams/TopologyTestDriver.java  |  4 +-
 8 files changed, 80 insertions(+), 86 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 8cba911..41598d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -153,12 +152,10 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     @Override
-    public Map prepareCloseClean() {
+    public void prepareCloseClean() {
         prepareClose(true);
 
         log.info("Prepared clean close");
-
-        return Collections.emptyMap();
     }
 
     @Override
@@ -199,8 +196,7 @@ public class StandbyTask extends AbstractTask implements Task {
     }
 
     @Override
-    public void closeClean(final Map checkpoint) {
-        Objects.requireNonNull(checkpoint);
+    public void closeClean() {
         close(true);
 
         log.info("Closed clean");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d045144..b23a1a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -107,6 +107,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
 
+    private Map checkpoint = null;
+
     public StreamTask(final TaskId id,
                       final Set partitions,
                       final ProcessorTopology topology,
@@ -465,17 +467,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     }
 
     @Override
-    public Map prepareCloseClean() {
-        final Map checkpoint = prepareClose(true);
+    public void prepareCloseClean() {
+        prepareClose(true);
 
         log.info("Prepared clean close");
-
-        return checkpoint;
     }
 
     @Override
-    public void closeClean(final Map checkpoint) {
-        close(true, checkpoint);
+    public void closeClean() {
+        close(true);
 
         log.info("Closed clean");
     }
@@ -489,7 +489,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
     @Override
     public void closeDirty() {
-        close(false, null);
+
+        close(false);
 
         log.info("Closed dirty");
     }
@@ -505,11 +506,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
     @Override
     public void closeAndRecycleState() {
-        final Map checkpoint = prepareClose(true);
+        prepareClose(true);
+
+        writeCheckpointIfNeed();
 
-        if (checkpoint != null) {
-            stateMgr.checkpoint(checkpoint);
-        }
         switch (state()) {
             case CREATED:
             case RUNNING:
@@ -546,14 +546,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      * otherwise, just close open resources
      * @throws TaskMigratedException if the task producer got fenced (EOS)
      */
-    private Map prepareClose(final boolean clean) {
-        final Map checkpoint;
+    private void prepareClose(final boolean clean) {
+        // Reset any previously scheduled checkpoint.
+        checkpoint = null;
 
         switch (state()) {
             case CREATED:
                 // the task is created and not initialized, just re-write the checkpoint file
-                checkpoint = Collections.emptyMap();
-
+                scheduleCheckpoint(emptyMap());
                 break;
 
             case RUNNING:
@@ -562,9 +562,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 if (clean) {
                     stateMgr.flush();
                     recordCollector.flush();
-                    checkpoint = checkpointableOffsets();
+                    scheduleCheckpoint(checkpointableOffsets());
                 } else {
-                    checkpoint = null; // `null` indicates to not write a checkpoint
                     executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
                 }
 
@@ -572,22 +571,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
             case RESTORING:
                 executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-                checkpoint = Collections.emptyMap();
+                scheduleCheckpoint(emptyMap());
 
                 break;
 
             case SUSPENDED:
             case CLOSED:
                 // not need to checkpoint, since when suspending we've already committed the state
-                checkpoint = null; // `null` indicates to not write a checkpoint
-
                 break;
 
             default:
                 throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id);
         }
+    }
+
+    private void scheduleCheckpoint(final Map checkpoint) {
+        this.checkpoint = checkpoint;
+    }
 
-        return checkpoint;
+    private void writeCheckpointIfNeed() {
+        if (checkpoint != null) {
+            stateMgr.checkpoint(checkpoint);
+            checkpoint = null;
+        }
     }
 
     /**
@@ -598,9 +604,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      * 3. finally release the state manager lock
      *
      */
-    private void close(final boolean clean, final Map checkpoint) {
-        if (clean && checkpoint != null) {
-            executeAndMaybeSwallow(clean, () -> stateMgr.checkpoint(checkpoint), "state manager checkpoint", log);
+    private void close(final boolean clean) {
+        if (clean) {
+            executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
         }
 
         switch (state()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index eee290a..9283e86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -145,12 +145,12 @@ public interface Task {
      *
      * @throws StreamsException fatal error, should close the thread
      */
-    Map prepareCloseClean();
+    void prepareCloseClean();
 
     /**
     * Must be idempotent.
     */
-    void closeClean(final Map checkpoint);
+    void closeClean();
 
     /**
     * Prepare to close a task that we may not own. Discard any uncommitted progress and close the task.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 5c55093..12361d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -190,7 +190,7 @@ public class TaskManager {
         // first rectify all existing tasks
         final LinkedHashMap taskCloseExceptions = new LinkedHashMap<>();
 
-        final Map> checkpointPerTask = new HashMap<>();
+        final Set tasksToClose = new HashSet<>();
         final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>();
         final Set additionalTasksForCommitting = new HashSet<>();
         final Set dirtyTasks = new HashSet<>();
@@ -210,11 +210,11 @@ public class TaskManager {
                 tasksToRecycle.add(task);
             } else {
                 try {
-                    final Map checkpoint = task.prepareCloseClean();
+                    task.prepareCloseClean();
                     final Map committableOffsets = task
                         .committableOffsetsAndMetadata();
 
-                    checkpointPerTask.put(task, checkpoint);
+                    tasksToClose.add(task);
                     if (!committableOffsets.isEmpty()) {
                         consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                     }
@@ -250,20 +250,17 @@ public class TaskManager {
                 log.error("Failed to batch commit tasks, " +
                     "will close all tasks involved in this commit as dirty by the end", e);
                 dirtyTasks.addAll(additionalTasksForCommitting);
-                dirtyTasks.addAll(checkpointPerTask.keySet());
+                dirtyTasks.addAll(tasksToClose);
 
-                checkpointPerTask.clear();
+                tasksToClose.clear();
                 // Just add first taskId to re-throw by the end.
                 taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e);
             }
         }
 
-        for (final Map.Entry> taskAndCheckpoint : checkpointPerTask.entrySet()) {
-            final Task task = taskAndCheckpoint.getKey();
-            final Map checkpoint = taskAndCheckpoint.getValue();
-
+        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task, checkpoint);
+                completeTaskCloseClean(task);
                 cleanUpTaskProducer(task, taskCloseExceptions);
                 tasks.remove(task.id());
             } catch (final RuntimeException e) {
@@ -631,9 +628,9 @@ public class TaskManager {
         task.closeDirty();
     }
 
-    private void completeTaskCloseClean(final Task task, final Map checkpoint) {
+    private void completeTaskCloseClean(final Task task) {
         cleanupTask(task);
-        task.closeClean(checkpoint);
+        task.closeClean();
     }
 
     // Note: this MUST be called *before* actually closing the task
@@ -652,16 +649,16 @@ public class TaskManager {
     void shutdown(final boolean clean) {
         final AtomicReference firstException = new AtomicReference<>(null);
 
-        final Map> checkpointPerTask = new HashMap<>();
+        final Set tasksToClose = new HashSet<>();
         final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
         for (final Task task : tasks.values()) {
             if (clean) {
                 try {
-                    final Map checkpoint = task.prepareCloseClean();
+                    task.prepareCloseClean();
                     final Map committableOffsets = task.committableOffsetsAndMetadata();
 
-                    checkpointPerTask.put(task, checkpoint);
+                    tasksToClose.add(task);
                     if (!committableOffsets.isEmpty()) {
                         consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                     }
@@ -681,11 +678,9 @@ public class TaskManager {
             commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
         }
 
-        for (final Map.Entry> taskAndCheckpoint : checkpointPerTask.entrySet()) {
-            final Task task = taskAndCheckpoint.getKey();
-            final Map checkpoint = taskAndCheckpoint.getValue();
+        for (final Task task : tasksToClose) {
             try {
-                completeTaskCloseClean(task, checkpoint);
+                completeTaskCloseClean(task);
             } catch (final RuntimeException e) {
                 firstException.compareAndSet(null, e);
                 closeTaskDirty(task);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index f868de4..04ef951 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -52,7 +52,6 @@ import org.junit.runner.RunWith;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.Map;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -270,8 +269,8 @@ public class StandbyTaskTest {
         task = createStandbyTask();
         task.initializeIfNeeded();
 
-        final Map checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         assertEquals(Task.State.CLOSED, task.state());
 
@@ -323,8 +322,8 @@ public class StandbyTaskTest {
         task = createStandbyTask();
         task.initializeIfNeeded();
 
-        final Map checkpoint = task.prepareCloseClean();
-        assertThrows(RuntimeException.class, () -> task.closeClean(checkpoint));
+        task.prepareCloseClean();
+        assertThrows(RuntimeException.class, () -> task.closeClean());
 
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a4431a2..27135f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1567,8 +1567,8 @@ public class StreamTaskTest {
 
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
 
-        final Map checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         assertEquals(Task.State.CLOSED, task.state());
         assertFalse(source1.initialized);
@@ -1642,8 +1642,8 @@ public class StreamTaskTest {
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
         task.initializeIfNeeded();
 
-        final Map checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         assertEquals(Task.State.CLOSED, task.state());
 
@@ -1668,8 +1668,8 @@ public class StreamTaskTest {
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
         task.initializeIfNeeded();
         task.completeRestoration();
-        final Map checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         assertEquals(Task.State.CLOSED, task.state());
 
@@ -1696,8 +1696,8 @@ public class StreamTaskTest {
         task.initializeIfNeeded();
         task.completeRestoration();
 
-        final Map checkpoint = task.prepareCloseClean();
-        assertThrows(ProcessorStateException.class, () -> task.closeClean(checkpoint));
+        task.prepareCloseClean();
+        assertThrows(ProcessorStateException.class, () -> task.closeClean());
 
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
@@ -1760,8 +1760,8 @@ public class StreamTaskTest {
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
         task.initializeIfNeeded();
 
-        final Map checkpoint = task.prepareCloseClean();
-        assertThrows(ProcessorStateException.class, () -> task.closeClean(checkpoint));
+        task.prepareCloseClean();
+        assertThrows(ProcessorStateException.class, () -> task.closeClean());
 
         assertEquals(Task.State.RESTORING, task.state());
 
@@ -1795,11 +1795,11 @@ public class StreamTaskTest {
 
         task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
 
-        final Map checkpoint = task.prepareCloseClean();
-        task.closeClean(checkpoint);
+        task.prepareCloseClean();
+        task.closeClean();
 
         // close calls are idempotent since we are already in closed
-        task.closeClean(checkpoint);
+        task.closeClean();
         task.closeDirty();
 
         EasyMock.reset(stateManager);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7f31d7e..200d841 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -604,7 +604,7 @@ public class TaskManagerTest {
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new RuntimeException("oops");
             }
         };
 
@@ -1115,7 +1115,7 @@ public class TaskManagerTest {
         final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false);
         final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("migrated", new RuntimeException("cause"));
             }
 
@@ -1133,7 +1133,7 @@ public class TaskManagerTest {
         };
         final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new RuntimeException("oops");
             }
 
@@ -1451,13 +1451,13 @@ public class TaskManagerTest {
         };
         final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("migrated", new RuntimeException("cause"));
             }
         };
         final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new RuntimeException("oops");
             }
         };
@@ -2273,14 +2273,14 @@ public class TaskManagerTest {
     public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
         final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("t1 close exception", new RuntimeException());
             }
         };
 
         final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("t2 close exception", new RuntimeException());
             }
         };
@@ -2303,14 +2303,14 @@ public class TaskManagerTest {
     public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() {
         final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("t1 close exception", new RuntimeException());
             }
         };
 
         final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new IllegalStateException("t2 illegal state exception", new RuntimeException());
             }
         };
@@ -2332,14 +2332,14 @@ public class TaskManagerTest {
     public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() {
         final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
         };
 
         final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) {
             @Override
-            public Map prepareCloseClean() {
+            public void prepareCloseClean() {
                 throw new KafkaException("Kaboom for t2!", new RuntimeException());
             }
         };
@@ -2703,15 +2703,13 @@ public class TaskManagerTest {
         }
 
         @Override
-        public Map prepareCloseClean() {
-            return Collections.emptyMap();
-        }
+        public void prepareCloseClean() {}
 
         @Override
         public void prepareCloseDirty() {}
 
         @Override
-        public void closeClean(final Map checkpoint) {
+        public void closeClean() {
             transitionTo(State.CLOSED);
         }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index db31012..3416ec0 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1180,8 +1180,8 @@ public class TopologyTestDriver implements Closeable {
      */
     public void close() {
         if (task != null) {
-            final Map checkpoint = task.prepareCloseClean();
-            task.closeClean(checkpoint);
+            task.prepareCloseClean();
+            task.closeClean();
         }
         if (globalStateTask != null) {
             try {