kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3863: System tests covering connector/task failure and restart
Date Thu, 23 Jun 2016 00:06:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8bf18df1b -> 36cab7dbd


KAFKA-3863: System tests covering connector/task failure and restart

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1519 from hachikuji/KAFKA-3863


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36cab7db
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36cab7db
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36cab7db

Branch: refs/heads/trunk
Commit: 36cab7dbdff6981d0df4b355dadee3fac35508a6
Parents: 8bf18df
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Jun 22 17:06:49 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Jun 22 17:06:49 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/AbstractHerder.java   |   7 +-
 .../apache/kafka/connect/runtime/Worker.java    |   1 +
 .../kafka/connect/runtime/WorkerConnector.java  |   3 +-
 .../kafka/connect/tools/MockConnector.java      | 111 +++++++++++++++++++
 .../kafka/connect/tools/MockSinkConnector.java  |  84 ++++++++++++++
 .../kafka/connect/tools/MockSinkTask.java       |  71 ++++++++++++
 .../connect/tools/MockSourceConnector.java      |  84 ++++++++++++++
 .../kafka/connect/tools/MockSourceTask.java     |  66 +++++++++++
 .../resources/ConnectorPluginsResourceTest.java |   6 +
 tests/kafkatest/services/connect.py             |  49 ++++++++
 .../tests/connect/connect_distributed_test.py   |  79 ++++++++++++-
 11 files changed, 554 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index a29d216..1130268 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -32,6 +32,9 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.tools.MockConnector;
+import org.apache.kafka.connect.tools.MockSinkConnector;
+import org.apache.kafka.connect.tools.MockSourceConnector;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -87,7 +90,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener,
Con
     private static List<ConnectorPluginInfo> validConnectorPlugins;
     private static final Object LOCK = new Object();
     private Thread classPathTraverser;
-    private static final List<Class<? extends Connector>> EXCLUDES = Arrays.<Class<?
extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class);
+    private static final List<Class<? extends Connector>> EXCLUDES = Arrays.asList(
+            VerifiableSourceConnector.class, VerifiableSinkConnector.class,
+            MockConnector.class, MockSourceConnector.class, MockSinkConnector.class);
 
     public AbstractHerder(Worker worker,
                           String workerId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index a88d0f9..e39a7e2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -401,6 +401,7 @@ public class Worker {
         WorkerTask task = getTask(id);
         stopTask(task);
         awaitStopTask(task, config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG));
+        log.info("Task {} completed shutdown.", task.id());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 7880095..b96976d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -66,10 +66,9 @@ public class WorkerConnector {
     }
 
     public void initialize(ConnectorConfig connectorConfig) {
-        log.debug("Initializing connector {} with config {}", connName, config);
-
         try {
             this.config = connectorConfig.originalsStrings();
+            log.debug("Initializing connector {} with config {}", connName, config);
 
             connector.initialize(new ConnectorContext() {
                 @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
new file mode 100644
index 0000000..919e896
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This connector provides support for mocking certain connector behaviors. For example,
+ * this can be used to simulate connector or task failures. It works by passing a "mock mode"
+ * through configuration from the system test. New mock behavior can be implemented either
+ * in the connector or in the task by providing a new mode implementation.
+ *
+ * At the moment, this connector only supports a single task and shares configuration between
+ * the connector and its tasks.
+ *
+ * @see MockSinkConnector
+ * @see MockSourceConnector
+ */
+public class MockConnector extends Connector {
+    public static final String MOCK_MODE_KEY = "mock_mode";
+    public static final String DELAY_MS_KEY = "delay_ms";
+
+    public static final String CONNECTOR_FAILURE = "connector-failure";
+    public static final String TASK_FAILURE = "task-failure";
+
+    public static final long DEFAULT_FAILURE_DELAY_MS = 15000;
+
+    private Map<String, String> config;
+    private ScheduledExecutorService executor;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> config) {
+        this.config = config;
+
+        if (CONNECTOR_FAILURE.equals(config.get(MOCK_MODE_KEY))) {
+            // Schedule this connector to raise an exception after some delay
+
+            String delayMsString = config.get(DELAY_MS_KEY);
+            long delayMs = DEFAULT_FAILURE_DELAY_MS;
+            if (delayMsString != null)
+                delayMs = Long.parseLong(delayMsString);
+
+            executor = Executors.newSingleThreadScheduledExecutor();
+            executor.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    context.raiseError(new RuntimeException());
+                }
+            }, delayMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return Collections.singletonList(config);
+    }
+
+    @Override
+    public void stop() {
+        if (executor != null) {
+            executor.shutdownNow();
+
+            try {
+                if (!executor.awaitTermination(20, TimeUnit.SECONDS))
+                    throw new RuntimeException("Failed timely termination of scheduler");
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Task was interrupted during shutdown");
+            }
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
new file mode 100644
index 0000000..67fca66
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mock sink implementation which delegates to {@link MockConnector}.
+ */
+public class MockSinkConnector extends SinkConnector {
+
+    private MockConnector delegate = new MockConnector();
+
+    @Override
+    public void initialize(ConnectorContext ctx) {
+        delegate.initialize(ctx);
+    }
+
+    @Override
+    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs)
{
+        delegate.initialize(ctx, taskConfigs);
+    }
+
+    @Override
+    public void reconfigure(Map<String, String> props) {
+        delegate.reconfigure(props);
+    }
+
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        return delegate.validate(connectorConfigs);
+    }
+
+    @Override
+    public String version() {
+        return delegate.version();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        delegate.start(props);
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MockSinkTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return delegate.taskConfigs(maxTasks);
+    }
+
+    @Override
+    public void stop() {
+        delegate.stop();
+    }
+
+    @Override
+    public ConfigDef config() {
+        return delegate.config();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
new file mode 100644
index 0000000..2e4b35e
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class MockSinkTask extends SinkTask {
+
+    private String mockMode;
+    private long startTimeMs;
+    private long failureDelayMs;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> config) {
+        this.mockMode = config.get(MockConnector.MOCK_MODE_KEY);
+
+        if (MockConnector.TASK_FAILURE.equals(mockMode)) {
+            this.startTimeMs = System.currentTimeMillis();
+
+            String delayMsString = config.get(MockConnector.DELAY_MS_KEY);
+            this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
+            if (delayMsString != null)
+                failureDelayMs = Long.parseLong(delayMsString);
+        }
+    }
+
+    @Override
+    public void put(Collection<SinkRecord> records) {
+        if (MockConnector.TASK_FAILURE.equals(mockMode)) {
+            long now = System.currentTimeMillis();
+            if (now > startTimeMs + failureDelayMs)
+                throw new RuntimeException();
+        }
+    }
+
+    @Override
+    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
new file mode 100644
index 0000000..d69e355
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mock source implementation which delegates to {@link MockConnector}.
+ */
+public class MockSourceConnector extends SourceConnector {
+
+    private MockConnector delegate = new MockConnector();
+
+    @Override
+    public void initialize(ConnectorContext ctx) {
+        delegate.initialize(ctx);
+    }
+
+    @Override
+    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs)
{
+        delegate.initialize(ctx, taskConfigs);
+    }
+
+    @Override
+    public void reconfigure(Map<String, String> props) {
+        delegate.reconfigure(props);
+    }
+
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        return delegate.validate(connectorConfigs);
+    }
+
+    @Override
+    public String version() {
+        return delegate.version();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        delegate.start(props);
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MockSourceTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        return delegate.taskConfigs(maxTasks);
+    }
+
+    @Override
+    public void stop() {
+        delegate.stop();
+    }
+
+    @Override
+    public ConfigDef config() {
+        return delegate.config();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
new file mode 100644
index 0000000..eb896af
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class MockSourceTask extends SourceTask {
+
+    private String mockMode;
+    private long startTimeMs;
+    private long failureDelayMs;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> config) {
+        this.mockMode = config.get(MockConnector.MOCK_MODE_KEY);
+
+        if (MockConnector.TASK_FAILURE.equals(mockMode)) {
+            this.startTimeMs = System.currentTimeMillis();
+
+            String delayMsString = config.get(MockConnector.DELAY_MS_KEY);
+            this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS;
+            if (delayMsString != null)
+                failureDelayMs = Long.parseLong(delayMsString);
+        }
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        if (MockConnector.TASK_FAILURE.equals(mockMode)) {
+            long now = System.currentTimeMillis();
+            if (now > startTimeMs + failureDelayMs)
+                throw new RuntimeException();
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index ddf30c7..c7f532b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -39,6 +39,9 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.tools.MockConnector;
+import org.apache.kafka.connect.tools.MockSinkConnector;
+import org.apache.kafka.connect.tools.MockSourceConnector;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.easymock.EasyMock;
@@ -165,6 +168,9 @@ public class ConnectorPluginsResourceTest {
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class.getCanonicalName())));
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class.getCanonicalName())));
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class.getCanonicalName())));
         assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 5371a72..7f36854 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -135,6 +135,12 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def get_connector_status(self, name, node=None):
         return self._rest('/connectors/' + name + '/status', node=node)
 
+    def restart_connector(self, name, node=None):
+        return self._rest('/connectors/' + name + '/restart', method="POST")        
+
+    def restart_task(self, connector_name, task_id, node=None):
+        return self._rest('/connectors/' + connector_name + '/tasks/' + str(task_id) + '/restart',
method="POST")        
+
     def pause_connector(self, name, node=None):
         return self._rest('/connectors/' + name + '/pause', method="PUT")
 
@@ -331,3 +337,46 @@ class VerifiableSink(VerifiableConnector):
             'tasks.max': self.tasks,
             'topics': ",".join(self.topics)
         })
+
+class MockSink(object):
+
+    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+        self.cc = cc
+        self.logger = self.cc.logger
+        self.name = name
+        self.mode = mode
+        self.delay_sec = delay_sec
+        self.topics = topics
+
+    def start(self):
+        self.logger.info("Creating connector MockSinkConnector %s", self.name)
+        self.cc.create_connector({
+            'name': self.name,
+            'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
+            'tasks.max': 1,
+            'topics': ",".join(self.topics),
+            'mock_mode': self.mode,
+            'delay_ms': self.delay_sec * 1000
+        })
+
+class MockSource(object):
+
+    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-source"):
+        self.cc = cc
+        self.logger = self.cc.logger
+        self.name = name
+        self.mode = mode
+        self.delay_sec = delay_sec
+        self.topics = topics
+
+    def start(self):
+        self.logger.info("Creating connector MockSourceConnector %s", self.name)
+        self.cc.create_connector({
+            'name': self.name,
+            'connector.class': 'org.apache.kafka.connect.tools.MockSourceConnector',
+            'tasks.max': 1,
+            'topics': ",".join(self.topics),
+            'mock_mode': self.mode,
+            'delay_ms': self.delay_sec * 1000
+        })
+        

http://git-wip-us.apache.org/repos/asf/kafka/blob/36cab7db/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index a4d68f3..d4c4225 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -17,7 +17,7 @@ from ducktape.tests.test import Test
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink,
ConnectRestError
+from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink,
ConnectRestError, MockSink, MockSource
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 from ducktape.utils.util import wait_until
@@ -88,9 +88,23 @@ class ConnectDistributedTest(Test):
         except ConnectRestError:
             return None
 
-    def _has_state(self, status, state):
+    def _connector_has_state(self, status, state):
         return status is not None and status['connector']['state'] == state
 
+    def _task_has_state(self, task_id, status, state):
+        if not status:
+            return False
+
+        tasks = status['tasks']
+        if not tasks:
+            return False
+
+        for task in tasks:
+            if task['id'] == task_id:
+                return task['state'] == state
+
+        return False
+
     def _all_tasks_have_state(self, status, task_count, state):
         if status is None:
             return False
@@ -103,11 +117,68 @@ class ConnectDistributedTest(Test):
 
     def is_running(self, connector, node=None):
         status = self._connector_status(connector.name, node)
-        return self._has_state(status, 'RUNNING') and self._all_tasks_have_state(status,
connector.tasks, 'RUNNING')
+        return self._connector_has_state(status, 'RUNNING') and self._all_tasks_have_state(status,
connector.tasks, 'RUNNING')
 
     def is_paused(self, connector, node=None):
         status = self._connector_status(connector.name, node)
-        return self._has_state(status, 'PAUSED') and self._all_tasks_have_state(status, connector.tasks,
'PAUSED')
+        return self._connector_has_state(status, 'PAUSED') and self._all_tasks_have_state(status,
connector.tasks, 'PAUSED')
+
+    def connector_is_running(self, connector, node=None):
+        status = self._connector_status(connector.name, node)
+        return self._connector_has_state(status, 'RUNNING')
+
+    def connector_is_failed(self, connector, node=None):
+        status = self._connector_status(connector.name, node)
+        return self._connector_has_state(status, 'FAILED')
+
+    def task_is_failed(self, connector, task_id, node=None):
+        status = self._connector_status(connector.name, node)
+        return self._task_has_state(task_id, status, 'FAILED')
+
+    def task_is_running(self, connector, task_id, node=None):
+        status = self._connector_status(connector.name, node)
+        return self._task_has_state(task_id, status, 'RUNNING')
+
+    def test_restart_failed_connector(self):
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.sink = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5)
+        self.sink.start()
+
+        wait_until(lambda: self.connector_is_failed(self.sink), timeout_sec=15,
+                   err_msg="Failed to see connector transition to the FAILED state")
+
+        self.cc.restart_connector(self.sink.name)
+        
+        wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+
+    
+    @matrix(connector_type=["source", "sink"])
+    def test_restart_failed_task(self, connector_type):
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        connector = None
+        if connector_type == "sink":
+            connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5)
+        else:
+            connector = MockSource(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5)
+            
+        connector.start()
+
+        task_id = 0
+        wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=15,
+                   err_msg="Failed to see task transition to the FAILED state")
+
+        self.cc.restart_task(connector.name, task_id)
+        
+        wait_until(lambda: self.task_is_running(connector, task_id), timeout_sec=10,
+                   err_msg="Failed to see task transition to the RUNNING state")
+
 
     def test_pause_and_resume_source(self):
         """


Mime
View raw message