kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3290: fix transient test failures in WorkerSourceTaskTest
Date Thu, 03 Mar 2016 01:22:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f676cfeb8 -> cfc324333


KAFKA-3290: fix transient test failures in WorkerSourceTaskTest

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Gwen Shapira

Closes #998 from hachikuji/KAFKA-3290


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cfc32433
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cfc32433
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cfc32433

Branch: refs/heads/trunk
Commit: cfc324333fa13e06ec7ac5ef3a09d8a6b6b54485
Parents: f676cfe
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Mar 2 17:22:14 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Wed Mar 2 17:22:14 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/connect/runtime/WorkerTask.java |  3 +++
 .../connect/runtime/WorkerSourceTaskTest.java    | 19 ++++++++++---------
 2 files changed, 13 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cfc32433/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index cc69c0f..ff2bb6f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -145,6 +145,9 @@ abstract class WorkerTask implements Runnable {
         } catch (Throwable t) {
             if (!cancelled.get())
                 lifecycleListener.onFailure(id, t);
+
+            if (t instanceof Error)
+                throw t;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfc32433/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8fb8bb5..14c0c6e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -60,9 +60,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
@@ -200,8 +199,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         final CountDownLatch pollLatch = expectPolls(1);
         expectOffsetFlush(true);
 
-        sourceTask.commit();
-        EasyMock.expectLastCall();
         sourceTask.stop();
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
@@ -235,11 +232,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
-        expectOffsetFlush(false);
+        expectOffsetFlush(true);
 
         sourceTask.stop();
         EasyMock.expectLastCall();
-        expectOffsetFlush(true);
+        expectOffsetFlush(false);
 
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
@@ -249,7 +246,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         workerTask.initialize(EMPTY_TASK_PROPS);
         executor.submit(workerTask);
         awaitPolls(pollLatch);
-        assertFalse(workerTask.commitOffsets());
+        assertTrue(workerTask.commitOffsets());
         workerTask.stop();
         assertEquals(true, workerTask.awaitStop(1000));
 
@@ -319,9 +316,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
         EasyMock.expectLastCall();
         sourceTask.start(EMPTY_TASK_PROPS);
-        statusListener.onStartup(taskId);
         EasyMock.expectLastCall();
 
+        statusListener.onStartup(taskId);
         EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
@@ -330,8 +327,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
                 return null;
             }
         });
+
         sourceTask.stop();
         EasyMock.expectLastCall();
+        expectOffsetFlush(true);
 
         PowerMock.replayAll();
 
@@ -450,6 +449,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
                 flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
         if (succeed) {
+            sourceTask.commit();
+            EasyMock.expectLastCall();
             futureGetExpect.andReturn(null);
         } else {
             futureGetExpect.andThrow(new TimeoutException());


Mime
View raw message