kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkaranta...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)
Date Sat, 16 May 2020 00:54:13 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 62fa8fc  KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)
62fa8fc is described below

commit 62fa8fc9a95d738780d1f73d2d758d7329828feb
Author: Greg Harris <gharris1727@gmail.com>
AuthorDate: Fri May 15 17:53:32 2020 -0700

    KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)
    
    * If two exceptions are thrown the `closePartitions` exception is suppressed
    * Add unit tests that throw exceptions in put and close to verify that
      the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute
    
    Reviewers: Chris Egerton <chrise@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
---
 .../java/org/apache/kafka/common/utils/Utils.java  | 12 +++
 .../kafka/connect/runtime/WorkerSinkTask.java      |  9 +--
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 86 ++++++++++++++++++++++
 3 files changed, 102 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 9da8e52..7b47a1a 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -886,6 +886,18 @@ public final class Utils {
     }
 
     /**
+     * An {@link AutoCloseable} interface without a throws clause in the signature
+     *
+     * This is used with lambda expressions in try-with-resources clauses
+     * to avoid casting un-checked exceptions to checked exceptions unnecessarily.
+     */
+    @FunctionalInterface
+    public interface UncheckedCloseable extends AutoCloseable {
+        @Override
+        void close();
+    }
+
+    /**
      * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
      */
     public static void closeQuietly(AutoCloseable closeable, String name) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 659dadf..3a8c8d4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Utils.UncheckedCloseable;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
@@ -193,13 +194,11 @@ class WorkerSinkTask extends WorkerTask {
     @Override
     public void execute() {
         initializeAndStart();
-        try {
+        // Make sure any uncommitted data has been committed and the task has
+        // a chance to clean up its state
+        try (UncheckedCloseable suppressible = this::closePartitions) {
             while (!isStopping())
                 iteration();
-        } finally {
-            // Make sure any uncommitted data has been committed and the task has
-            // a chance to clean up its state
-            closePartitions();
         }
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 285cbbe..5dc2f44 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
@@ -86,6 +87,7 @@ import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -856,6 +858,90 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Stop the task during the next put
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andAnswer(() -> {
+            workerTask.stop();
+            return null;
+        });
+
+        consumer.wakeup();
+        PowerMock.expectLastCall();
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable closeException = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(closeException);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail("workerTask.execute should have thrown an exception");
+        } catch (RuntimeException e) {
+            PowerMock.verifyAll();
+            assertSame("Exception from close should propagate as-is", closeException, e);
+        }
+    }
+
+    @Test
+    public void testSuppressCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable putException = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(putException);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable closeException = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(closeException);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail("workerTask.execute should have thrown an exception");
+        } catch (ConnectException e) {
+            PowerMock.verifyAll();
+            assertSame("Exception from put should be the cause", putException, e.getCause());
+            assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0);
+            assertSame(closeException, e.getSuppressed()[0]);
+        }
+    }
+
     // Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
     // rebalance occurs, the async callback does not reset the last committed offset from the rebalance.
     // See KAFKA-5731 for more information.


Mime
View raw message