kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/2] kafka git commit: MINOR: Catch Throwable in commitSourceTask()
Date Tue, 21 Jun 2016 03:43:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 8d63690ef -> 37244881e


MINOR: Catch Throwable in commitSourceTask()

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1402 from Ishiihara/source-task-commit-record


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/280f09b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/280f09b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/280f09b8

Branch: refs/heads/0.10.0
Commit: 280f09b8dc72d41f7b7efbce4fc4e3620a44cf4b
Parents: 8d63690
Author: Liquan Pei <liquanpei@gmail.com>
Authored: Tue Jun 14 13:21:30 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jun 20 20:40:06 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSourceTask.java |  6 +-
 .../connect/runtime/WorkerSourceTaskTest.java   | 79 ++++++++++++++++----
 2 files changed, 67 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/280f09b8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index fd551ab..83d1c84 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -243,6 +243,8 @@ class WorkerSourceTask extends WorkerTask {
             task.commitRecord(record);
         } catch (InterruptedException e) {
             log.error("Exception thrown", e);
+        } catch (Throwable t) {
+            log.error("Exception thrown while calling task.commitRecord()", t);
         }
     }
 
@@ -366,8 +368,8 @@ class WorkerSourceTask extends WorkerTask {
             this.task.commit();
         } catch (InterruptedException ex) {
             log.warn("Commit interrupted", ex);
-        } catch (Throwable ex) {
-            log.error("Exception thrown while calling task.commit()", ex);
+        } catch (Throwable t) {
+            log.error("Exception thrown while calling task.commit()", t);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/280f09b8/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 0768781..0761245 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
@@ -52,6 +53,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -66,6 +68,7 @@ import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
+    private final Random random = new Random();
     private static final String TOPIC = "topic";
     private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
     private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
@@ -197,7 +200,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
-        awaitPolls(pollLatch);
+        awaitLatch(pollLatch);
 
         workerTask.transitionTo(TargetState.PAUSED);
 
@@ -238,7 +241,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
-        awaitPolls(pollLatch);
+        awaitLatch(pollLatch);
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
 
@@ -271,7 +274,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
-        awaitPolls(pollLatch);
+        awaitLatch(pollLatch);
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
 
@@ -306,7 +309,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
-        awaitPolls(pollLatch);
+        awaitLatch(pollLatch);
         assertTrue(workerTask.commitOffsets());
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
@@ -341,7 +344,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
-        awaitPolls(pollLatch);
+        awaitLatch(pollLatch);
         assertTrue(workerTask.commitOffsets());
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
@@ -404,6 +407,30 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     @Test
+    public void testSendRecordsTaskCommitRecordFail() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // Source task commit record failure will not cause the task to abort
+        expectSendRecordOnce(false);
+        expectSendRecordTaskCommitRecordFail(false, false);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testSlowTaskStart() throws Exception {
         final CountDownLatch startupLatch = new CountDownLatch(1);
 
@@ -435,7 +462,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
         // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
         // cannot be invoked immediately in the thread trying to stop the task.
-        startupLatch.await(1000, TimeUnit.MILLISECONDS);
+        awaitLatch(startupLatch);
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
 
@@ -479,14 +506,22 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException {
-        return expectSendRecord(true, false);
+        return expectSendRecordTaskCommitRecordSucceed(true, false);
     }
 
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce(boolean isRetry) throws InterruptedException {
-        return expectSendRecord(false, isRetry);
+        return expectSendRecordTaskCommitRecordSucceed(false, isRetry);
+    }
+
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException {
+        return expectSendRecord(anyTimes, isRetry, true);
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry) throws InterruptedException {
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException {
+        return expectSendRecord(anyTimes, isRetry, false);
+    }
+
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
         expectConvertKeyValue(anyTimes);
 
         Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
@@ -523,11 +558,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             expect.andAnswer(expectResponse);
 
         // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
-        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
-        if (anyTimes)
-            EasyMock.expectLastCall().anyTimes();
-        else
-            EasyMock.expectLastCall();
+        expectTaskCommitRecord(anyTimes, succeed);
 
         return sent;
     }
@@ -545,8 +576,24 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             convertValueExpect.andReturn(SERIALIZED_RECORD);
     }
 
-    private boolean awaitPolls(CountDownLatch latch) throws InterruptedException {
-        return latch.await(1000, TimeUnit.MILLISECONDS);
+    private void expectTaskCommitRecord(boolean anyTimes, boolean succeed) throws InterruptedException {
+        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
+        IExpectationSetters<Void> expect = EasyMock.expectLastCall();
+        if (!succeed) {
+            expect = expect.andThrow(new InterruptException("Error committing record in source task"));
+        }
+        if (anyTimes) {
+            expect.anyTimes();
+        }
+    }
+
+    private boolean awaitLatch(CountDownLatch latch) {
+        try {
+            return latch.await(1000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // ignore
+        }
+        return false;
     }
 
     @SuppressWarnings("unchecked")


Mime
View raw message