Repository: kafka
Updated Branches:
refs/heads/trunk c8b6c18f6 -> 7079f57f7
KAFKA-3290: fix race condition with worker task shutdown and mock validation
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Gwen Shapira
Closes #1008 from hachikuji/KAFKA-3290-REVISITED
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7079f57f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7079f57f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7079f57f
Branch: refs/heads/trunk
Commit: 7079f57f70dfb5e731baf0a19bbf16e7364c15bb
Parents: c8b6c18
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Mar 3 14:58:43 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Mar 3 14:58:43 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/kafka/connect/runtime/WorkerTask.java | 6 +++---
.../kafka/connect/runtime/WorkerSourceTaskTest.java | 10 +++++-----
2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7079f57f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ff2bb6f..7979fb0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -111,9 +111,6 @@ abstract class WorkerTask implements Runnable {
} catch (Throwable t) {
log.error("Task {} threw an uncaught and unrecoverable exception during shutdown",
id, t);
throw t;
- } finally {
- running.set(false);
- shutdownLatch.countDown();
}
}
@@ -148,6 +145,9 @@ abstract class WorkerTask implements Runnable {
if (t instanceof Error)
throw t;
+ } finally {
+ running.set(false);
+ shutdownLatch.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7079f57f/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8f57336..404be0b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -146,7 +146,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
executor.submit(workerTask);
awaitPolls(pollLatch);
workerTask.stop();
- assertEquals(true, workerTask.awaitStop(1000));
+ assertTrue(workerTask.awaitStop(1000));
PowerMock.verifyAll();
}
@@ -179,7 +179,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
executor.submit(workerTask);
awaitPolls(pollLatch);
workerTask.stop();
- assertEquals(true, workerTask.awaitStop(1000));
+ assertTrue(workerTask.awaitStop(1000));
PowerMock.verifyAll();
}
@@ -214,7 +214,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
awaitPolls(pollLatch);
assertTrue(workerTask.commitOffsets());
workerTask.stop();
- assertEquals(true, workerTask.awaitStop(1000));
+ assertTrue(workerTask.awaitStop(1000));
PowerMock.verifyAll();
}
@@ -249,7 +249,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
awaitPolls(pollLatch);
assertTrue(workerTask.commitOffsets());
workerTask.stop();
- assertEquals(true, workerTask.awaitStop(1000));
+ assertTrue(workerTask.awaitStop(1000));
PowerMock.verifyAll();
}
@@ -342,7 +342,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
// cannot be invoked immediately in the thread trying to stop the task.
startupLatch.await(1000, TimeUnit.MILLISECONDS);
workerTask.stop();
- assertEquals(true, workerTask.awaitStop(1000));
+ assertTrue(workerTask.awaitStop(1000));
PowerMock.verifyAll();
}
|