kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3290: fix race condition with worker task shutdown and mock validation
Date Thu, 03 Mar 2016 22:58:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c8b6c18f6 -> 7079f57f7


KAFKA-3290: fix race condition with worker task shutdown and mock validation

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Gwen Shapira

Closes #1008 from hachikuji/KAFKA-3290-REVISITED


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7079f57f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7079f57f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7079f57f

Branch: refs/heads/trunk
Commit: 7079f57f70dfb5e731baf0a19bbf16e7364c15bb
Parents: c8b6c18
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Mar 3 14:58:43 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Mar 3 14:58:43 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/connect/runtime/WorkerTask.java |  6 +++---
 .../kafka/connect/runtime/WorkerSourceTaskTest.java       | 10 +++++-----
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7079f57f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ff2bb6f..7979fb0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -111,9 +111,6 @@ abstract class WorkerTask implements Runnable {
         } catch (Throwable t) {
             log.error("Task {} threw an uncaught and unrecoverable exception during shutdown", id, t);
             throw t;
-        } finally {
-            running.set(false);
-            shutdownLatch.countDown();
         }
     }
 
@@ -148,6 +145,9 @@ abstract class WorkerTask implements Runnable {
 
             if (t instanceof Error)
                 throw t;
+        } finally {
+            running.set(false);
+            shutdownLatch.countDown();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7079f57f/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8f57336..404be0b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -146,7 +146,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         executor.submit(workerTask);
         awaitPolls(pollLatch);
         workerTask.stop();
-        assertEquals(true, workerTask.awaitStop(1000));
+        assertTrue(workerTask.awaitStop(1000));
 
         PowerMock.verifyAll();
     }
@@ -179,7 +179,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         executor.submit(workerTask);
         awaitPolls(pollLatch);
         workerTask.stop();
-        assertEquals(true, workerTask.awaitStop(1000));
+        assertTrue(workerTask.awaitStop(1000));
 
         PowerMock.verifyAll();
     }
@@ -214,7 +214,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         awaitPolls(pollLatch);
         assertTrue(workerTask.commitOffsets());
         workerTask.stop();
-        assertEquals(true, workerTask.awaitStop(1000));
+        assertTrue(workerTask.awaitStop(1000));
 
         PowerMock.verifyAll();
     }
@@ -249,7 +249,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         awaitPolls(pollLatch);
         assertTrue(workerTask.commitOffsets());
         workerTask.stop();
-        assertEquals(true, workerTask.awaitStop(1000));
+        assertTrue(workerTask.awaitStop(1000));
 
         PowerMock.verifyAll();
     }
@@ -342,7 +342,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         // cannot be invoked immediately in the thread trying to stop the task.
         startupLatch.await(1000, TimeUnit.MILLISECONDS);
         workerTask.stop();
-        assertEquals(true, workerTask.awaitStop(1000));
+        assertTrue(workerTask.awaitStop(1000));
 
         PowerMock.verifyAll();
     }


Mime
View raw message