kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9793: Expand the try-catch for task commit in HandleAssignment (#8402)
Date Mon, 06 Apr 2020 05:50:53 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 712ac52  KAFKA-9793: Expand the try-catch for task commit in HandleAssignment (#8402)
712ac52 is described below

commit 712ac5203e6ce199bed8804f6173acf33d79a173
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Sun Apr 5 22:50:26 2020 -0700

    KAFKA-9793: Expand the try-catch for task commit in HandleAssignment (#8402)
    
    As title suggests, we would like to broaden this check so that we don't fail to close
a doom-to-cleanup task.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/processor/internals/TaskManager.java   | 20 ++---
 .../processor/internals/TaskManagerTest.java       | 90 ++++++++++++++--------
 2 files changed, 69 insertions(+), 41 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index a0a238c..22222cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -232,24 +232,26 @@ public class TaskManager {
         }
 
         if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            for (final Task task : additionalTasksForCommitting) {
-                task.prepareCommit();
-                final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.committableOffsetsAndMetadata();
-                if (!committableOffsets.isEmpty()) {
-                    consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+            try {
+                for (final Task task : additionalTasksForCommitting) {
+                    task.prepareCommit();
+                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets
= task.committableOffsetsAndMetadata();
+                    if (!committableOffsets.isEmpty()) {
+                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    }
                 }
-            }
 
-            try {
                 commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
 
                 for (final Task task : additionalTasksForCommitting) {
                     task.postCommit();
                 }
             } catch (final RuntimeException e) {
-                log.error("Failed to commit tasks that are " +
-                    "prepared to close clean, will close them as dirty instead", e);
+                log.error("Failed to batch commit tasks, " +
+                    "will close all tasks involved in this commit as dirty by the end", e);
+                dirtyTasks.addAll(additionalTasksForCommitting);
                 dirtyTasks.addAll(checkpointPerTask.keySet());
+
                 checkpointPerTask.clear();
                 // Just add first taskId to re-throw by the end.
                 taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(),
e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 67b2ec8..d4769d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -154,7 +154,12 @@ public class TaskManagerTest {
 
     @Before
     public void setUp() {
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId",
StreamsConfig.METRICS_LATEST);
+        setUpTaskManager(StreamThread.ProcessingMode.AT_LEAST_ONCE);
+    }
+
+    private void setUpTaskManager(final StreamThread.ProcessingMode processingMode) {
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
         taskManager = new TaskManager(
             changeLogReader,
             UUID.randomUUID(),
@@ -165,7 +170,7 @@ public class TaskManagerTest {
             topologyBuilder,
             adminClient,
             stateDirectory,
-            StreamThread.ProcessingMode.AT_LEAST_ONCE
+            processingMode
         );
         taskManager.setMainConsumer(consumer);
     }
@@ -1229,22 +1234,56 @@ public class TaskManagerTest {
     }
 
     @Test
+    public void shouldCloseActiveTasksDirtyAndPropagatePrepareCommitException() {
+        setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);
+
+        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
+
+        final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions,
true) {
+            @Override
+            public void prepareCommit() {
+                throw new RuntimeException("task 0_1 prepare commit boom!");
+            }
+        };
+
+        task01.setCommittableOffsetsAndMetadata(singletonMap(t1p1, new OffsetAndMetadata(0L,
null)));
+        task01.setCommitNeeded();
+
+        final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions,
true);
+        final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2,
new OffsetAndMetadata(1L, null));
+
+        task02.setCommittableOffsetsAndMetadata(offsetsT02);
+        task02.setCommitNeeded();
+
+        taskManager.tasks().put(taskId00, task00);
+        taskManager.tasks().put(taskId01, task01);
+        taskManager.tasks().put(taskId02, task02);
+
+        checkOrder(activeTaskCreator, false);
+
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01);
+        expectLastCall();
+
+        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId02);
+        expectLastCall();
+
+        replay(activeTaskCreator);
+
+        final RuntimeException thrown = assertThrows(RuntimeException.class,
+            () -> taskManager.handleAssignment(mkMap(mkEntry(taskId00, taskId00Partitions),
+                mkEntry(taskId01, taskId01Partitions)), Collections.emptyMap()));
+        assertThat(thrown.getCause().getMessage(), is("task 0_1 prepare commit boom!"));
+
+        assertThat(task00.state(), is(Task.State.CREATED));
+        assertThat(task01.state(), is(Task.State.CLOSED));
+        assertThat(task02.state(), is(Task.State.CLOSED));
+
+        verify(activeTaskCreator);
+    }
+
+    @Test
     public void shouldCloseActiveTasksDirtyAndPropagateCommitException() {
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
-            new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
-        taskManager = new TaskManager(
-            changeLogReader,
-            UUID.randomUUID(),
-            "taskManagerTest",
-            streamsMetrics,
-            activeTaskCreator,
-            standbyTaskCreator,
-            topologyBuilder,
-            adminClient,
-            stateDirectory,
-            StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA
-        );
-        taskManager.setMainConsumer(consumer);
+        setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);
 
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
 
@@ -1252,7 +1291,6 @@ public class TaskManagerTest {
         task01.setCommittableOffsetsAndMetadata(singletonMap(t1p1, new OffsetAndMetadata(0L,
null)));
         task01.setCommitNeeded();
 
-
         final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions,
true);
         final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2,
new OffsetAndMetadata(1L, null));
 
@@ -1266,6 +1304,7 @@ public class TaskManagerTest {
         expect(activeTaskCreator.streamsProducerForTask(taskId01)).andThrow(new RuntimeException("task
0_1 producer boom!"));
 
         checkOrder(activeTaskCreator, false);
+
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01);
         expectLastCall();
 
@@ -1575,20 +1614,7 @@ public class TaskManagerTest {
                                                      final StreamsProducer producer,
                                                      final Map<TopicPartition, OffsetAndMetadata>
offsetsT01,
                                                      final Map<TopicPartition, OffsetAndMetadata>
offsetsT02) {
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId",
StreamsConfig.METRICS_LATEST);
-        taskManager = new TaskManager(
-            changeLogReader,
-            UUID.randomUUID(),
-            "taskManagerTest",
-            streamsMetrics,
-            activeTaskCreator,
-            standbyTaskCreator,
-            topologyBuilder,
-            adminClient,
-            stateDirectory,
-            processingMode
-        );
-        taskManager.setMainConsumer(consumer);
+        setUpTaskManager(processingMode);
 
         final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions,
true);
         task01.setCommittableOffsetsAndMetadata(offsetsT01);


Mime
View raw message