kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3211: Handle WorkerTask stop before start correctly
Date Fri, 05 Feb 2016 02:00:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk db8d6f02c -> dc662776c


KAFKA-3211: Handle WorkerTask stop before start correctly

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #874 from hachikuji/KAFKA-3211


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc662776
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc662776
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc662776

Branch: refs/heads/trunk
Commit: dc662776cde8e980a3f978041adaf961edf0fe7d
Parents: db8d6f0
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Feb 4 18:00:45 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Feb 4 18:00:45 2016 -0800

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerTask.java       |  3 +
 .../kafka/connect/runtime/WorkerTaskTest.java   | 92 ++++++++++++++++++++
 2 files changed, 95 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc662776/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index b4d427a..ecaeb7b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -102,6 +102,9 @@ abstract class WorkerTask implements Runnable {
             throw new IllegalStateException("The task cannot be started while still running");
 
         try {
+            if (stopping.get())
+                return;
+
             execute();
         } catch (Throwable t) {
             log.error("Unhandled exception in task {}", id, t);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc662776/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
new file mode 100644
index 0000000..f5213a6
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.easymock.EasyMock.partialMockBuilder;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class WorkerTaskTest {
+
+    private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
+
+    @Test
+    public void standardStartup() {
+        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
+                .withConstructor(ConnectorTaskId.class)
+                .withArgs(new ConnectorTaskId("foo", 0))
+                .addMockedMethod("initialize")
+                .addMockedMethod("execute")
+                .addMockedMethod("close")
+                .createStrictMock();
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+
+        workerTask.execute();
+        EasyMock.expectLastCall();
+
+        workerTask.close();
+        EasyMock.expectLastCall();
+
+        replay(workerTask);
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.run();
+        workerTask.stop();
+        workerTask.awaitStop(1000L);
+
+        verify(workerTask);
+    }
+
+    @Test
+    public void stopBeforeStarting() {
+        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
+                .withConstructor(ConnectorTaskId.class)
+                .withArgs(new ConnectorTaskId("foo", 0))
+                .addMockedMethod("initialize")
+                .addMockedMethod("execute")
+                .addMockedMethod("close")
+                .createStrictMock();
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+
+        workerTask.close();
+        EasyMock.expectLastCall();
+
+        replay(workerTask);
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.stop();
+        workerTask.awaitStop(1000L);
+
+        // now run should not do anything
+        workerTask.run();
+
+        verify(workerTask);
+    }
+
+
+}


Mime
View raw message