kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-9014: Fix AssertionError when SourceTask.poll returns an empty list (#7491)
Date Tue, 15 Oct 2019 21:49:07 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 05fa34b  KAFKA-9014: Fix AssertionError when SourceTask.poll returns an empty list (#7491)
05fa34b is described below

commit 05fa34bf802a241a6e33249f2eac5e138706ae0b
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Tue Oct 15 14:08:31 2019 -0700

    KAFKA-9014: Fix AssertionError when SourceTask.poll returns an empty list (#7491)
    
    Author: Konstantine Karantasis <konstantine@confluent.io>
    Reviewer: Randall Hauch <rhauch@gmail.com>
---
 .../kafka/connect/runtime/WorkerSourceTask.java    |  3 +-
 .../connect/runtime/WorkerSourceTaskTest.java      | 66 +++++++++++++++++++++-
 2 files changed, 67 insertions(+), 2 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 2646906..ed49f2a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -262,7 +262,8 @@ class WorkerSourceTask extends WorkerTask {
     private boolean sendRecords() {
         int processed = 0;
         recordBatch(toSend.size());
-        final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
+        final SourceRecordWriteCounter counter =
+                toSend.size() > 0 ? new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup) : null;
         for (final SourceRecord preTransformRecord : toSend) {
             maybeThrowProducerSendException();
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 708e0a5..378029c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -73,7 +73,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*",
+                  "org.apache.log4j.*"})
 @RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
     private static final String TOPIC = "topic";
@@ -334,6 +335,51 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     @Test
+    public void testPollReturnsNoRecords() throws Exception {
+        // Test that the task handles an empty list of records
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger());
+        expectOffsetFlush(true);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        producer.close(EasyMock.anyInt(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall();
+
+        transformationChain.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testCommit() throws Exception {
         // Test that the task commits properly when prompted
         createWorkerTask();
@@ -676,6 +722,24 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         assertEquals(1800.0, metrics.currentMetricValueAsDouble(group1.metricGroup(), "source-record-active-count"), 0.001d);
     }
 
+    private CountDownLatch expectEmptyPolls(int minimum, final AtomicInteger count) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(minimum);
+        // Note that we stub these to allow any number of calls because the thread will continue to
+        // run. The count passed in + latch returned just makes sure we get *at least* that number of
+        // calls
+        EasyMock.expect(sourceTask.poll())
+                .andStubAnswer(new IAnswer<List<SourceRecord>>() {
+                    @Override
+                    public List<SourceRecord> answer() throws Throwable {
+                        count.incrementAndGet();
+                        latch.countDown();
+                        Thread.sleep(10);
+                        return Collections.emptyList();
+                    }
+                });
+        return latch;
+    }
+
     private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(minimum);
         // Note that we stub these to allow any number of calls because the thread will continue to


Mime
View raw message