kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [3/3] kafka git commit: KAFKA-3093: Add Connect status tracking API
Date Wed, 24 Feb 2016 06:47:50 GMT
KAFKA-3093: Add Connect status tracking API

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #920 from hachikuji/KAFKA-3093


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7d019ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7d019ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7d019ed

Branch: refs/heads/trunk
Commit: f7d019ed408fa988129be9af3689bfa4878bc627
Parents: aeb9c2a
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Feb 23 22:47:31 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Feb 23 22:47:31 2016 -0800

----------------------------------------------------------------------
 config/connect-distributed.properties           |   3 +-
 .../connect/connector/ConnectorContext.java     |   7 +
 .../kafka/connect/file/FileStreamSinkTask.java  |   2 +-
 .../kafka/connect/cli/ConnectDistributed.java   |  28 +-
 .../kafka/connect/cli/ConnectStandalone.java    |  13 +-
 .../kafka/connect/runtime/AbstractHerder.java   | 156 +++++++
 .../kafka/connect/runtime/AbstractStatus.java   | 100 ++++
 .../kafka/connect/runtime/ConnectorStatus.java  |  58 +++
 .../apache/kafka/connect/runtime/Herder.java    |  16 +-
 .../connect/runtime/HerderConnectorContext.java |  11 +-
 .../kafka/connect/runtime/TaskStatus.java       |  53 +++
 .../apache/kafka/connect/runtime/Worker.java    | 210 +++++----
 .../kafka/connect/runtime/WorkerSinkTask.java   |   4 +-
 .../kafka/connect/runtime/WorkerSourceTask.java |  18 +-
 .../kafka/connect/runtime/WorkerTask.java       |  47 +-
 .../runtime/distributed/DistributedHerder.java  | 107 +++--
 .../runtime/distributed/WorkerCoordinator.java  |   2 +-
 .../runtime/distributed/WorkerGroupMember.java  |   9 +-
 .../distributed/WorkerRebalanceListener.java    |   2 +-
 .../kafka/connect/runtime/rest/RestServer.java  |   9 +-
 .../rest/entities/ConnectorStateInfo.java       | 108 +++++
 .../rest/resources/ConnectorsResource.java      |  21 +-
 .../runtime/standalone/StandaloneHerder.java    |  51 +-
 .../connect/storage/KafkaConfigStorage.java     |  15 +-
 .../storage/KafkaOffsetBackingStore.java        |  13 +-
 .../storage/KafkaStatusBackingStore.java        | 461 +++++++++++++++++++
 .../storage/MemoryStatusBackingStore.java       | 105 +++++
 .../connect/storage/StatusBackingStore.java     | 100 ++++
 .../kafka/connect/util/KafkaBasedLog.java       |  24 +-
 .../org/apache/kafka/connect/util/Table.java    |  65 +++
 .../connect/runtime/AbstractHerderTest.java     | 116 +++++
 .../connect/runtime/WorkerSinkTaskTest.java     |   4 +-
 .../runtime/WorkerSinkTaskThreadedTest.java     |   6 +-
 .../connect/runtime/WorkerSourceTaskTest.java   |  54 ++-
 .../kafka/connect/runtime/WorkerTaskTest.java   |  88 +++-
 .../kafka/connect/runtime/WorkerTest.java       | 161 ++++---
 .../distributed/DistributedHerderTest.java      | 121 ++++-
 .../distributed/WorkerCoordinatorTest.java      |   2 +-
 .../standalone/StandaloneHerderTest.java        |  42 +-
 .../storage/KafkaStatusBackingStoreTest.java    | 373 +++++++++++++++
 .../storage/MemoryStatusBackingStoreTest.java   |  66 +++
 .../apache/kafka/connect/util/TableTest.java    |  48 ++
 tests/kafkatest/services/connect.py             |   4 +-
 .../kafkatest/tests/connect_distributed_test.py |   1 +
 tests/kafkatest/tests/connect_rest_test.py      |   4 +-
 .../templates/connect-distributed.properties    |   1 +
 46 files changed, 2611 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties
index 9ec63db..46bd3bc 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -39,4 +39,5 @@ internal.value.converter.schemas.enable=false
 offset.storage.topic=connect-offsets
 # Flush much faster than normal, which is useful for testing/debugging
 offset.flush.interval.ms=10000
-config.storage.topic=connect-configs
\ No newline at end of file
+config.storage.topic=connect-configs
+status.storage.topic=connect-status
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
index 2a06484..c8a06e8 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
@@ -30,4 +30,11 @@ public interface ConnectorContext {
      * added/removed) and the running Tasks will need to be modified.
      */
     void requestTaskReconfiguration();
+
+    /**
+     * Raise an unrecoverable exception to the Connect framework. This will cause the status of the
+     * connector to transition to FAILED.
+     * @param e Exception to be raised.
+     */
+    void raiseError(Exception e);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 83ba6d4..09d4ed8 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -84,7 +84,7 @@ public class FileStreamSinkTask extends SinkTask {
 
     @Override
     public void stop() {
-        if (outputStream != System.out)
+        if (outputStream != null && outputStream != System.out)
             outputStream.close();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 5ad032e..bc5b75a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.connect.cli;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.Worker;
@@ -25,9 +27,12 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
 import java.util.Collections;
 import java.util.Map;
 
@@ -54,12 +59,29 @@ public class ConnectDistributed {
         Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
+        Time time = new SystemTime();
         DistributedConfig config = new DistributedConfig(workerProps);
-        Worker worker = new Worker(config, new KafkaOffsetBackingStore());
+
         RestServer rest = new RestServer(config);
-        DistributedHerder herder = new DistributedHerder(config, worker, rest.advertisedUrl());
+        URI advertisedUrl = rest.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        offsetBackingStore.configure(config.originals());
+
+        Worker worker = new Worker(workerId, time, config, offsetBackingStore);
+
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
+        statusBackingStore.configure(config.originals());
+
+        DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, advertisedUrl.toString());
         final Connect connect = new Connect(worker, herder, rest);
-        connect.start();
+        try {
+            connect.start();
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+        }
 
         // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
         connect.awaitStop();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index f89a72a..6c4335e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -18,9 +18,11 @@
 package org.apache.kafka.connect.cli;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.RestServer;
@@ -33,6 +35,7 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -63,9 +66,15 @@ public class ConnectStandalone {
         Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
+        Time time = new SystemTime();
         StandaloneConfig config = new StandaloneConfig(workerProps);
-        Worker worker = new Worker(config, new FileOffsetBackingStore());
+
         RestServer rest = new RestServer(config);
+        URI advertisedUrl = rest.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        Worker worker = new Worker(workerId, time, config, new FileOffsetBackingStore());
+
         Herder herder = new StandaloneHerder(worker);
         final Connect connect = new Connect(worker, herder, rest);
         connect.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
new file mode 100644
index 0000000..ca85d87
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
+ * must invoke the lifecycle hooks appropriately.
+ *
+ * This class takes the following approach for sending status updates to the backing store:
+ *
+ * 1) When the connector or task is starting, we overwrite the previous state blindly. This ensures that
+ *    every rebalance will reset the state of tasks to the proper state. The intuition is that there should
+ *    be less chance of write conflicts when the worker has just received its assignment and is starting tasks.
+ *    In particular, this prevents us from depending on the generation absolutely. If the group disappears
+ *    and the generation is reset, then we'll overwrite the status information from the older (and larger)
+ *    generation with the updated one. The danger of this approach is that slow-starting tasks may cause the
+ *    status to be overwritten after a rebalance has completed.
+ *
+ * 2) If the connector or task fails or is shut down, we use {@link StatusBackingStore#putSafe(ConnectorStatus)},
+ *    which provides a little more protection if the worker is no longer in the group (in which case the
+ *    task may have already been started on another worker). Obviously this is still racy. If the task has just
+ *    started on another worker, we may not have the updated status cached yet. In this case, we'll overwrite
+ *    the value which will cause the state to be inconsistent (most likely until the next rebalance). Until
+ *    we have proper producer groups with fenced groups, there is not much else we can do.
+ */
+public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
+
+    protected final StatusBackingStore statusBackingStore;
+    private final String workerId;
+
+    public AbstractHerder(StatusBackingStore statusBackingStore, String workerId) {
+        this.statusBackingStore = statusBackingStore;
+        this.workerId = workerId;
+    }
+
+    protected abstract int generation();
+
+    protected void startServices() {
+        this.statusBackingStore.start();
+    }
+
+    protected void stopServices() {
+        this.statusBackingStore.stop();
+    }
+
+    @Override
+    public void onStartup(String connector) {
+        statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RUNNING,
+                workerId, generation()));
+    }
+
+    @Override
+    public void onShutdown(String connector) {
+        statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.UNASSIGNED,
+                workerId, generation()));
+    }
+
+    @Override
+    public void onFailure(String connector, Throwable cause) {
+        statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.FAILED,
+                trace(cause), workerId, generation()));
+    }
+
+    @Override
+    public void onStartup(ConnectorTaskId id) {
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation()));
+    }
+
+    @Override
+    public void onFailure(ConnectorTaskId id, Throwable cause) {
+        statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause)));
+    }
+
+    @Override
+    public void onShutdown(ConnectorTaskId id) {
+        statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation()));
+    }
+
+    @Override
+    public void onDeletion(String connector) {
+        for (TaskStatus status : statusBackingStore.getAll(connector))
+            statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation()));
+        statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation()));
+    }
+
+    @Override
+    public ConnectorStateInfo connectorStatus(String connName) {
+        ConnectorStatus connector = statusBackingStore.get(connName);
+        if (connector == null)
+            throw new NotFoundException("No status found for connector " + connName);
+
+        Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
+
+        ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
+                connector.state().toString(), connector.workerId(), connector.trace());
+        List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
+
+        for (TaskStatus status : tasks) {
+            taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(),
+                    status.state().toString(), status.workerId(), status.trace()));
+        }
+
+        Collections.sort(taskStates);
+
+        return new ConnectorStateInfo(connName, connectorState, taskStates);
+    }
+
+    @Override
+    public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
+        TaskStatus status = statusBackingStore.get(id);
+
+        if (status == null)
+            throw new NotFoundException("No status found for task " + id);
+
+        return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(),
+                status.workerId(), status.trace());
+    }
+
+    private String trace(Throwable t) {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        t.printStackTrace(new PrintStream(output));
+        try {
+            return output.toString("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
new file mode 100644
index 0000000..4f31be1
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+public abstract class AbstractStatus<T> {
+
+    public enum State {
+        UNASSIGNED,
+        RUNNING,
+        FAILED,
+        DESTROYED,
+    }
+
+    private final T id;
+    private final State state;
+    private final String trace;
+    private final String workerId;
+    private final int generation;
+
+    public AbstractStatus(T id,
+                          State state,
+                          String workerId,
+                          int generation,
+                          String trace) {
+        this.id = id;
+        this.state = state;
+        this.workerId = workerId;
+        this.generation = generation;
+        this.trace = trace;
+    }
+
+    public T id() {
+        return id;
+    }
+
+    public State state() {
+        return state;
+    }
+
+    public String trace() {
+        return trace;
+    }
+
+    public String workerId() {
+        return workerId;
+    }
+
+    public int generation() {
+        return generation;
+    }
+
+    @Override
+    public String toString() {
+        return "Status{" +
+                "id=" + id +
+                ", state=" + state +
+                ", workerId='" + workerId + '\'' +
+                ", generation=" + generation +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        AbstractStatus<?> that = (AbstractStatus<?>) o;
+
+        if (generation != that.generation) return false;
+        if (id != null ? !id.equals(that.id) : that.id != null) return false;
+        if (state != that.state) return false;
+        if (trace != null ? !trace.equals(that.trace) : that.trace != null) return false;
+        return workerId != null ? workerId.equals(that.workerId) : that.workerId == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = id != null ? id.hashCode() : 0;
+        result = 31 * result + (state != null ? state.hashCode() : 0);
+        result = 31 * result + (trace != null ? trace.hashCode() : 0);
+        result = 31 * result + (workerId != null ? workerId.hashCode() : 0);
+        result = 31 * result + generation;
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
new file mode 100644
index 0000000..d9a2eba
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+public class ConnectorStatus extends AbstractStatus<String> {
+
+    public ConnectorStatus(String connector, State state, String msg, String workerUrl, int generation) {
+        super(connector, state, workerUrl, generation, msg);
+    }
+
+    public ConnectorStatus(String connector, State state, String workerUrl, int generation) {
+        super(connector, state, workerUrl, generation, null);
+    }
+
+    public interface Listener {
+
+        /**
+         * Invoked after the connector has been successfully shut down.
+         * @param connector The connector name
+         */
+        void onShutdown(String connector);
+
+        /**
+         * Invoked from the Connector using {@link org.apache.kafka.connect.connector.ConnectorContext#raiseError(Exception)}.
+         * Note that no shutdown event will follow after the connector has failed.
+         * @param connector The connector name
+         * @param cause Error raised from the connector.
+         */
+        void onFailure(String connector, Throwable cause);
+
+        /**
+         * Invoked after successful startup of the connector.
+         * @param connector The connector name
+         */
+        void onStartup(String connector);
+
+        /**
+         * Invoked when the connector is deleted through the REST API.
+         * @param connector The connector name
+         */
+        void onDeletion(String connector);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index fc0689c..95c7700 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -18,8 +18,10 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.Collection;
 import java.util.List;
@@ -61,7 +63,7 @@ public interface Herder {
      * @throws org.apache.kafka.connect.runtime.distributed.NotLeaderException if this node can not resolve the request
      *         (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is
      *         also not the leader
-     * @throws ConnectException if this node is the leader, but still cannot resolve the
+     * @throws org.apache.kafka.connect.errors.ConnectException if this node is the leader, but still cannot resolve the
      *         request (e.g., it is not in sync with other worker's config state)
      */
     void connectors(Callback<Collection<String>> callback);
@@ -113,6 +115,18 @@ public interface Herder {
      */
     void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
 
+    /**
+     * Lookup the current status of a connector.
+     * @param connName name of the connector
+     */
+    ConnectorStateInfo connectorStatus(String connName);
+
+    /**
+     * Look up the current status of a task.
+     * @param id id of the task
+     */
+    ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id);
+
 
     class Created<T> {
         private final boolean created;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
index e3294ef..bd933f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
@@ -24,10 +24,10 @@ import org.apache.kafka.connect.connector.ConnectorContext;
  */
 public class HerderConnectorContext implements ConnectorContext {
 
-    private Herder herder;
-    private String connectorName;
+    private final AbstractHerder herder;
+    private final String connectorName;
 
-    public HerderConnectorContext(Herder herder, String connectorName) {
+    public HerderConnectorContext(AbstractHerder herder, String connectorName) {
         this.herder = herder;
         this.connectorName = connectorName;
     }
@@ -38,4 +38,9 @@ public class HerderConnectorContext implements ConnectorContext {
         // Distributed herder will forward the request to the leader if needed
         herder.requestTaskReconfiguration(connectorName);
     }
+
+    @Override
+    public void raiseError(Exception e) {
+        herder.onFailure(connectorName, e);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
new file mode 100644
index 0000000..3542eb8
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
+
+    public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation, String trace) {
+        super(id, state, workerUrl, generation, trace);
+    }
+
+    public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation) {
+        super(id, state, workerUrl, generation, null);
+    }
+
+    public interface Listener {
+
+        /**
+         * Invoked after successful startup of the task.
+         * @param id The id of the task
+         */
+        void onStartup(ConnectorTaskId id);
+
+        /**
+         * Invoked if the task raises an error. No shutdown event will follow.
+         * @param id The id of the task
+         * @param cause The error raised by the task.
+         */
+        void onFailure(ConnectorTaskId id, Throwable cause);
+
+        /**
+         * Invoked after successful shutdown of the task.
+         * @param id The id of the task
+         */
+        void onShutdown(ConnectorTaskId id);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 0a4bb7f..8e74fec 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -20,7 +20,6 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
@@ -66,25 +65,25 @@ public class Worker {
 
     private final ExecutorService executor;
     private final Time time;
+    private final String workerId;
     private final WorkerConfig config;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final Converter internalKeyConverter;
+    private final Converter internalValueConverter;
+    private final OffsetBackingStore offsetBackingStore;
 
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private Converter internalKeyConverter;
-    private Converter internalValueConverter;
-    private OffsetBackingStore offsetBackingStore;
-    private HashMap<String, Connector> connectors = new HashMap<>();
+    private HashMap<String, WorkerConnector> connectors = new HashMap<>();
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
     private KafkaProducer<byte[], byte[]> producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
-    public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) {
-        this(new SystemTime(), config, offsetBackingStore);
-    }
-
-    @SuppressWarnings("unchecked")
-    public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+    public Worker(String workerId,
+                  Time time,
+                  WorkerConfig config,
+                  OffsetBackingStore offsetBackingStore) {
         this.executor = Executors.newCachedThreadPool();
+        this.workerId = workerId;
         this.time = time;
         this.config = config;
         this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
@@ -132,34 +131,18 @@ public class Worker {
         long started = time.milliseconds();
         long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
 
-        for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
-            Connector conn = entry.getValue();
+        for (Map.Entry<String, WorkerConnector> entry : connectors.entrySet()) {
+            WorkerConnector conn = entry.getValue();
             log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
                     "Worker is stopped.", conn);
-            try {
-                conn.stop();
-            } catch (ConnectException e) {
-                log.error("Error while shutting down connector " + conn, e);
-            }
+            conn.stop();
         }
 
-        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
-            WorkerTask task = entry.getValue();
-            log.warn("Shutting down task {} uncleanly; herder should have shut down "
-                    + "tasks before the Worker is stopped.", task);
-            try {
-                task.stop();
-            } catch (ConnectException e) {
-                log.error("Error while shutting down task " + task, e);
-            }
-        }
-
-        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
-            WorkerTask task = entry.getValue();
-            log.debug("Waiting for task {} to finish shutting down", task);
-            if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
-                log.error("Graceful shutdown of task {} failed.", task);
-        }
+        Collection<ConnectorTaskId> taskIds = tasks.keySet();
+        log.warn("Shutting down tasks {} uncleanly; herder should have shut down "
+                + "tasks before the Worker is stopped.", taskIds);
+        stopTasks(taskIds);
+        awaitStopTasks(taskIds);
 
         long timeoutMs = limit - time.milliseconds();
         sourceTaskOffsetCommitter.close(timeoutMs);
@@ -169,16 +152,12 @@ public class Worker {
         log.info("Worker stopped");
     }
 
-    public WorkerConfig config() {
-        return config;
-    }
-
     /**
      * Add a new connector.
      * @param connConfig connector configuration
      * @param ctx context for the connector
      */
-    public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
+    public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener lifecycleListener) {
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
 
@@ -188,22 +167,25 @@ public class Worker {
             throw new ConnectException("Connector with name " + connName + " already exists");
 
         final Connector connector = instantiateConnector(connClass);
+        WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, lifecycleListener);
+
         log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
-        connector.initialize(ctx);
+        workerConnector.initialize();
         try {
-            connector.start(connConfig.originalsStrings());
+            workerConnector.start(connConfig.originalsStrings());
         } catch (ConnectException e) {
             throw new ConnectException("Connector threw an exception while starting", e);
         }
 
-        connectors.put(connName, connector);
+        connectors.put(connName, workerConnector);
 
         log.info("Finished creating connector {}", connName);
     }
 
     /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
     public boolean isSinkConnector(String connName) {
-        return SinkConnector.class.isAssignableFrom(connectors.get(connName).getClass());
+        WorkerConnector workerConnector = connectors.get(connName);
+        return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass());
     }
 
     private Class<? extends Connector> getConnectorClass(String connectorAlias) {
@@ -267,10 +249,11 @@ public class Worker {
     public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
         log.trace("Reconfiguring connector tasks for {}", connName);
 
-        Connector connector = connectors.get(connName);
-        if (connector == null)
+        WorkerConnector workerConnector = connectors.get(connName);
+        if (workerConnector == null)
             throw new ConnectException("Connector " + connName + " not found in this worker.");
 
+        Connector connector = workerConnector.delegate;
         List<Map<String, String>> result = new ArrayList<>();
         String taskClassName = connector.taskClass().getName();
         for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
@@ -286,16 +269,11 @@ public class Worker {
     public void stopConnector(String connName) {
         log.info("Stopping connector {}", connName);
 
-        Connector connector = connectors.get(connName);
+        WorkerConnector connector = connectors.get(connName);
         if (connector == null)
             throw new ConnectException("Connector " + connName + " not found in this worker.");
 
-        try {
-            connector.stop();
-        } catch (ConnectException e) {
-            log.error("Error shutting down connector {}: ", connector, e);
-        }
-
+        connector.stop();
         connectors.remove(connName);
 
         log.info("Stopped connector {}", connName);
@@ -313,7 +291,7 @@ public class Worker {
      * @param id Globally unique ID for this task.
      * @param taskConfig the parsed task configuration
      */
-    public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
+    public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener lifecycleListener) {
         log.info("Creating task {}", id);
 
         if (tasks.containsKey(id)) {
@@ -327,33 +305,35 @@ public class Worker {
         final Task task = instantiateTask(taskClass);
         log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
 
+        final WorkerTask workerTask = buildWorkerTask(id, task, lifecycleListener);
+
+        // Start the task before adding or modifying any state; any exceptions are caught higher up the
+        // call chain and there's no cleanup to do here
+        workerTask.initialize(taskConfig.originalsStrings());
+        executor.submit(workerTask);
+
+        if (task instanceof SourceTask) {
+            WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
+            sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+        }
+        tasks.put(id, workerTask);
+    }
+
+    private WorkerTask buildWorkerTask(ConnectorTaskId id, Task task, TaskStatus.Listener lifecycleListener) {
         // Decide which type of worker task we need based on the type of task.
-        final WorkerTask workerTask;
         if (task instanceof SourceTask) {
-            SourceTask sourceTask = (SourceTask) task;
             OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
-            workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
+            return new WorkerSourceTask(id, (SourceTask) task, lifecycleListener, keyConverter, valueConverter, producer,
                     offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {
-            workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+            return new WorkerSinkTask(id, (SinkTask) task, lifecycleListener, config, keyConverter, valueConverter, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
         }
-
-        // Start the task before adding modifying any state, any exceptions are caught higher up the
-        // call chain and there's no cleanup to do here
-        workerTask.initialize(taskConfig.originalsStrings());
-        executor.submit(workerTask);
-
-        if (task instanceof SourceTask) {
-            WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
-            sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
-        }
-        tasks.put(id, workerTask);
     }
 
     private static Task instantiateTask(Class<? extends Task> taskClass) {
@@ -364,16 +344,39 @@ public class Worker {
         }
     }
 
-    public void stopTask(ConnectorTaskId id) {
-        log.info("Stopping task {}", id);
+    public void stopTasks(Collection<ConnectorTaskId> ids) {
+        for (ConnectorTaskId id : ids)
+            stopTask(getTask(id));
+    }
 
-        WorkerTask task = getTask(id);
+    public void awaitStopTasks(Collection<ConnectorTaskId> ids) {
+        long now = time.milliseconds();
+        long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+        for (ConnectorTaskId id : ids) {
+            long remaining = Math.max(0, deadline - time.milliseconds());
+            awaitStopTask(getTask(id), remaining);
+        }
+    }
+
+    private void awaitStopTask(WorkerTask task, long timeout) {
+        if (!task.awaitStop(timeout)) {
+            log.error("Graceful stop of task {} failed.", task.id());
+            task.cancel();
+        }
+        tasks.remove(task.id());
+    }
+
+    private void stopTask(WorkerTask task) {
+        log.info("Stopping task {}", task.id());
         if (task instanceof WorkerSourceTask)
-            sourceTaskOffsetCommitter.remove(id);
+            sourceTaskOffsetCommitter.remove(task.id());
         task.stop();
-        if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
-            log.error("Graceful stop of task {} failed.", task);
-        tasks.remove(id);
+    }
+
+    public void stopAndAwaitTask(ConnectorTaskId id) {
+        WorkerTask task = getTask(id);
+        stopTask(task);
+        awaitStopTask(task, config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG));
     }
 
     /**
@@ -400,4 +403,55 @@ public class Worker {
         return internalValueConverter;
     }
 
+    public String workerId() {
+        return workerId;
+    }
+
+    private static class WorkerConnector  {
+        private final String connName;
+        private final ConnectorStatus.Listener lifecycleListener;
+        private final ConnectorContext ctx;
+        private final Connector delegate;
+
+        public WorkerConnector(String connName,
+                               Connector delegate,
+                               ConnectorContext ctx,
+                               ConnectorStatus.Listener lifecycleListener) {
+            this.connName = connName;
+            this.ctx = ctx;
+            this.delegate = delegate;
+            this.lifecycleListener = lifecycleListener;
+        }
+
+        public void initialize() {
+            delegate.initialize(ctx);
+        }
+
+        public void start(Map<String, String> props) {
+            try {
+                delegate.start(props);
+                lifecycleListener.onStartup(connName);
+            } catch (Throwable t) {
+                log.error("Error while starting connector {}", connName, t);
+                lifecycleListener.onFailure(connName, t);
+            }
+        }
+
+        public void stop() {
+            try {
+                delegate.stop();
+                lifecycleListener.onShutdown(connName);
+            } catch (Throwable t) {
+                log.error("Error while shutting down connector {}", connName, t);
+                lifecycleListener.onFailure(connName, t);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return delegate.toString();
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 8c5bd9f..eb64355 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -76,11 +76,12 @@ class WorkerSinkTask extends WorkerTask {
 
     public WorkerSinkTask(ConnectorTaskId id,
                           SinkTask task,
+                          TaskStatus.Listener lifecycleListener,
                           WorkerConfig workerConfig,
                           Converter keyConverter,
                           Converter valueConverter,
                           Time time) {
-        super(id);
+        super(id, lifecycleListener);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -184,6 +185,7 @@ class WorkerSinkTask extends WorkerTask {
      * Initializes and starts the SinkTask.
      */
     protected void initializeAndStart() {
+        log.debug("Initializing task {} with config {}", id, taskConfig);
         String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
         if (topicsStr == null || topicsStr.isEmpty())
             throw new ConnectException("Sink tasks require a list of topics.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 562e03e..8542f4c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -17,13 +17,13 @@
 
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -76,6 +76,7 @@ class WorkerSourceTask extends WorkerTask {
 
     public WorkerSourceTask(ConnectorTaskId id,
                             SourceTask task,
+                            TaskStatus.Listener lifecycleListener,
                             Converter keyConverter,
                             Converter valueConverter,
                             KafkaProducer<byte[], byte[]> producer,
@@ -83,7 +84,7 @@ class WorkerSourceTask extends WorkerTask {
                             OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig,
                             Time time) {
-        super(id);
+        super(id, lifecycleListener);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -147,16 +148,13 @@ class WorkerSourceTask extends WorkerTask {
             }
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
-        } catch (Throwable t) {
-            log.error("Task {} threw an uncaught and unrecoverable exception", id);
-            log.error("Task is being killed and will not recover until manually restarted:", t);
-            // It should still be safe to let this fall through and commit offsets since this exception would have
+        } finally {
+            // It should still be safe to commit offsets since any exception would have
             // simply resulted in not getting more records but all the existing records should be ok to flush
             // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
             // to fail.
+            commitOffsets();
         }
-
-        commitOffsets();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ecaeb7b..cc69c0f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -37,13 +37,21 @@ abstract class WorkerTask implements Runnable {
     protected final ConnectorTaskId id;
     private final AtomicBoolean stopping;
     private final AtomicBoolean running;
+    private final AtomicBoolean cancelled;
     private final CountDownLatch shutdownLatch;
+    private final TaskStatus.Listener lifecycleListener;
 
-    public WorkerTask(ConnectorTaskId id) {
+    public WorkerTask(ConnectorTaskId id, TaskStatus.Listener lifecycleListener) {
         this.id = id;
         this.stopping = new AtomicBoolean(false);
         this.running = new AtomicBoolean(false);
+        this.cancelled = new AtomicBoolean(false);
         this.shutdownLatch = new CountDownLatch(1);
+        this.lifecycleListener = lifecycleListener;
+    }
+
+    public ConnectorTaskId id() {
+        return id;
     }
 
     /**
@@ -61,9 +69,17 @@ abstract class WorkerTask implements Runnable {
     }
 
     /**
+     * Cancel this task. This won't actually stop it, but it will prevent the state from being
+     * updated when it eventually does shut down.
+     */
+    public void cancel() {
+        this.cancelled.set(true);
+    }
+
+    /**
      * Wait for this task to finish stopping.
      *
-     * @param timeoutMs
+     * @param timeoutMs time in milliseconds to await stop
      * @return true if successful, false if the timeout was reached
      */
     public boolean awaitStop(long timeoutMs) {
@@ -85,19 +101,23 @@ abstract class WorkerTask implements Runnable {
         return stopping.get();
     }
 
+    protected boolean isStopped() {
+        return !running.get();
+    }
+
     private void doClose() {
         try {
             close();
         } catch (Throwable t) {
-            log.error("Unhandled exception in task shutdown {}", id, t);
+            log.error("Task {} threw an uncaught and unrecoverable exception during shutdown", id, t);
+            throw t;
         } finally {
             running.set(false);
             shutdownLatch.countDown();
         }
     }
 
-    @Override
-    public void run() {
+    private void doRun() {
         if (!this.running.compareAndSet(false, true))
             throw new IllegalStateException("The task cannot be started while still running");
 
@@ -105,12 +125,27 @@ abstract class WorkerTask implements Runnable {
             if (stopping.get())
                 return;
 
+            lifecycleListener.onStartup(id);
             execute();
         } catch (Throwable t) {
-            log.error("Unhandled exception in task {}", id, t);
+            log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
+            log.error("Task is being killed and will not recover until manually restarted");
+            throw t;
         } finally {
             doClose();
         }
     }
 
+    @Override
+    public void run() {
+        try {
+            doRun();
+            if (!cancelled.get())
+                lifecycleListener.onShutdown(id);
+        } catch (Throwable t) {
+            if (!cancelled.get())
+                lifecycleListener.onFailure(id, t);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 8b0525b..83ed714 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -17,17 +17,16 @@
 
 package org.apache.kafka.connect.runtime.distributed;
 
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
@@ -35,6 +34,7 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -79,7 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  *     (and therefore, also for creating, destroy, and scaling up/down connectors).
  * </p>
  */
-public class DistributedHerder implements Herder, Runnable {
+public class DistributedHerder extends AbstractHerder implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
 
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
@@ -108,15 +108,29 @@ public class DistributedHerder implements Herder, Runnable {
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
     private boolean needsReconfigRebalance;
+    private volatile int generation;
 
     private final ExecutorService forwardRequestExecutor;
 
-    public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) {
-        this(config, worker, null, null, restUrl, new SystemTime());
+    public DistributedHerder(DistributedConfig config,
+                             Time time,
+                             Worker worker,
+                             StatusBackingStore statusBackingStore,
+                             String restUrl) {
+        this(config, worker.workerId(), worker, statusBackingStore, null, null, restUrl, time);
     }
 
-    // public for testing
-    public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, Time time) {
+    // visible for testing
+    DistributedHerder(DistributedConfig config,
+                      String workerId,
+                      Worker worker,
+                      StatusBackingStore statusBackingStore,
+                      KafkaConfigStorage configStorage,
+                      WorkerGroupMember member,
+                      String restUrl,
+                      Time time) {
+        super(statusBackingStore, workerId);
+
         this.worker = worker;
         if (configStorage != null) {
             // For testing. Assume configuration has already been performed
@@ -131,7 +145,7 @@ public class DistributedHerder implements Herder, Runnable {
         this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
         this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
 
-        this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener());
+        this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener(), time);
         stopping = new AtomicBoolean(false);
 
         rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
@@ -146,11 +160,25 @@ public class DistributedHerder implements Herder, Runnable {
         thread.start();
     }
 
+    @Override
+    protected void startServices() {
+        super.startServices();
+        configStorage.start();
+    }
+
+    @Override
+    protected void stopServices() {
+        super.stopServices();
+        if (configStorage != null)
+            configStorage.stop();
+    }
+
+    @Override
     public void run() {
         try {
             log.info("Herder starting");
 
-            configStorage.start();
+            startServices();
 
             log.info("Herder started");
 
@@ -282,13 +310,10 @@ public class DistributedHerder implements Herder, Runnable {
                     log.error("Failed to shut down connector " + connName, t);
                 }
             }
-            for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) {
-                try {
-                    worker.stopTask(taskId);
-                } catch (Throwable t) {
-                    log.error("Failed to shut down task " + taskId, t);
-                }
-            }
+
+            Set<ConnectorTaskId> tasks = new HashSet<>(worker.taskIds());
+            worker.stopTasks(tasks);
+            worker.awaitStopTasks(tasks);
 
             member.stop();
 
@@ -299,8 +324,7 @@ public class DistributedHerder implements Herder, Runnable {
                 request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
             }
 
-            if (configStorage != null)
-                configStorage.stop();
+            stopServices();
         }
     }
 
@@ -388,7 +412,7 @@ public class DistributedHerder implements Herder, Runnable {
     }
 
     @Override
-    public void putConnectorConfig(final String connName, Map<String, String> config, final boolean allowReplace,
+    public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
                                    final Callback<Created<ConnectorInfo>> callback) {
         final Map<String, String> connConfig;
         if (config == null) {
@@ -515,6 +539,10 @@ public class DistributedHerder implements Herder, Runnable {
         );
     }
 
+    @Override
+    public int generation() {
+        return generation;
+    }
 
     // Should only be called from work thread, so synchronization should not be needed
     private boolean isLeader() {
@@ -649,7 +677,7 @@ public class DistributedHerder implements Herder, Runnable {
                 log.info("Starting task {}", taskId);
                 Map<String, String> configs = configState.taskConfig(taskId);
                 TaskConfig taskConfig = new TaskConfig(configs);
-                worker.addTask(taskId, taskConfig);
+                worker.startTask(taskId, taskConfig, this);
             } catch (ConfigException e) {
                 log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
                         "configuration. This task will not execute until reconfigured.", e);
@@ -666,7 +694,7 @@ public class DistributedHerder implements Herder, Runnable {
         ConnectorConfig connConfig = new ConnectorConfig(configs);
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
-        worker.addConnector(connConfig, ctx);
+        worker.startConnector(connConfig, ctx, this);
 
         // Immediately request configuration since this could be a brand new connector. However, also only update those
         // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
@@ -816,7 +844,7 @@ public class DistributedHerder implements Herder, Runnable {
                     callback.onCompletion(error, null);
             }
         };
-    };
+    }
 
 
     // Config callbacks are triggered from the KafkaConfigStorage thread
@@ -853,11 +881,22 @@ public class DistributedHerder implements Herder, Runnable {
         };
     }
 
+    private void updateDeletedConnectorStatus() {
+        ClusterConfigState snapshot = configStorage.snapshot();
+        Set<String> connectors = snapshot.connectors();
+        for (String connector : statusBackingStore.connectors()) {
+            if (!connectors.contains(connector)) {
+                log.debug("Cleaning status information for connector {}", connector);
+                onDeletion(connector);
+            }
+        }
+    }
+
     // Rebalances are triggered internally from the group member, so these are always executed in the work thread.
     private WorkerRebalanceListener rebalanceListener() {
         return new WorkerRebalanceListener() {
             @Override
-            public void onAssigned(ConnectProtocol.Assignment assignment) {
+            public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
                 // This callback just logs the info and saves it. The actual response is handled in the main loop, which
                 // ensures the group member's logic for rebalancing can complete, potentially long-running steps to
                 // catch up (or backoff if we fail) not executed in a callback, and so we'll be able to invoke other
@@ -866,8 +905,16 @@ public class DistributedHerder implements Herder, Runnable {
                 log.info("Joined group and got assignment: {}", assignment);
                 synchronized (DistributedHerder.this) {
                     DistributedHerder.this.assignment = assignment;
+                    DistributedHerder.this.generation = generation;
                     rebalanceResolved = false;
                 }
+
+                // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to
+                // be done after the rebalance completes to avoid race conditions as the previous generation attempts
+                // to change the state to UNASSIGNED after tasks have been stopped.
+                if (isLeader())
+                    updateDeletedConnectorStatus();
+
                 // We *must* interrupt any poll() call since this could occur when the poll starts, and we might then
                 // sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the
                 // main thread.
@@ -890,18 +937,24 @@ public class DistributedHerder implements Herder, Runnable {
                     // unnecessary repeated connections to the source/sink system.
                     for (String connectorName : connectors)
                         worker.stopConnector(connectorName);
+
                     // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
                     // stopping them then state could continue to be reused when the task remains on this worker. For example,
                     // this would avoid having to close a connection and then reopen it when the task is assigned back to this
                     // worker again.
-                    for (ConnectorTaskId taskId : tasks)
-                        worker.stopTask(taskId);
+                    if (!tasks.isEmpty()) {
+                        worker.stopTasks(tasks); // trigger stop() for all tasks
+                        worker.awaitStopTasks(tasks); // await stopping tasks
+                    }
 
+                    // Ensure that all status updates have been pushed to the storage system before rebalancing.
+                    // Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
+                    // completes.
+                    statusBackingStore.flush();
                     log.info("Finished stopping tasks in preparation for rebalance");
                 } else {
                     log.info("Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks");
                 }
-
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 79199a6..fa50fbf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -112,7 +112,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
         // up to date, try to rejoin again, leaving the group and backing off, etc.).
         rejoinRequested = false;
-        listener.onAssigned(assignmentSnapshot);
+        listener.onAssigned(assignmentSnapshot, generation);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 4b24312..9f05040 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
 import org.slf4j.Logger;
@@ -67,9 +66,13 @@ public class WorkerGroupMember {
 
     private boolean stopped = false;
 
-    public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
+    public WorkerGroupMember(DistributedConfig config,
+                             String restUrl,
+                             KafkaConfigStorage configStorage,
+                             WorkerRebalanceListener listener,
+                             Time time) {
         try {
-            this.time = new SystemTime();
+            this.time = time;
 
             String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
index 40f55d2..bc833e9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
@@ -29,7 +29,7 @@ public interface WorkerRebalanceListener {
      * Invoked when a new assignment is created by joining the Connect worker group. This is invoked for both successful
      * and unsuccessful assignments.
      */
-    void onAssigned(ConnectProtocol.Assignment assignment);
+    void onAssigned(ConnectProtocol.Assignment assignment, int generation);
 
     /**
      * Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks.

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index a544fb0..dbac58f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -50,6 +50,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
+import java.net.URI;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
@@ -65,7 +66,6 @@ public class RestServer {
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
     private final WorkerConfig config;
-    private Herder herder;
     private Server jettyServer;
 
     /**
@@ -90,8 +90,6 @@ public class RestServer {
     public void start(Herder herder) {
         log.info("Starting REST server");
 
-        this.herder = herder;
-
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider());
 
@@ -151,7 +149,7 @@ public class RestServer {
      * Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty
      * server, unless overrides for advertised hostname and/or port are provided via configs.
      */
-    public String advertisedUrl() {
+    public URI advertisedUrl() {
         UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
         String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
         if (advertisedHostname != null && !advertisedHostname.isEmpty())
@@ -161,10 +159,9 @@ public class RestServer {
             builder.port(advertisedPort);
         else
             builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG));
-        return builder.build().toString();
+        return builder.build();
     }
 
-
     /**
      * @param url               HTTP connection will be established with this url.
      * @param method            HTTP method ("GET", "POST", "PUT", etc.)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
new file mode 100644
index 0000000..179c0db
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+public class ConnectorStateInfo {
+
+    private final String name;
+    private final ConnectorState connector;
+    private final List<TaskState> tasks;
+
+    @JsonCreator
+    public ConnectorStateInfo(@JsonProperty("name") String name,
+                              @JsonProperty("connector") ConnectorState connector,
+                              @JsonProperty("tasks") List<TaskState> tasks) {
+        this.name = name;
+        this.connector = connector;
+        this.tasks = tasks;
+    }
+
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    @JsonProperty
+    public ConnectorState connector() {
+        return connector;
+    }
+
+    @JsonProperty
+    public List<TaskState> tasks() {
+        return tasks;
+    }
+
+    public abstract static class AbstractState {
+        private final String state;
+        private final String trace;
+        private final String workerId;
+
+        public AbstractState(String state, String workerId, String trace) {
+            this.state = state;
+            this.workerId = workerId;
+            this.trace = trace;
+        }
+
+        @JsonProperty
+        public String state() {
+            return state;
+        }
+
+        @JsonProperty("worker_id")
+        public String workerId() {
+            return workerId;
+        }
+
+        @JsonProperty
+        @JsonInclude(JsonInclude.Include.NON_EMPTY)
+        public String trace() {
+            return trace;
+        }
+    }
+
+    public static class ConnectorState extends AbstractState {
+        public ConnectorState(String state, String worker, String msg) {
+            super(state, worker, msg);
+        }
+    }
+
+    public static class TaskState extends AbstractState implements Comparable<TaskState> {
+        private final int id;
+
+        public TaskState(int id, String state, String worker, String msg) {
+            super(state, worker, msg);
+            this.id = id;
+        }
+
+        @JsonProperty
+        public int id() {
+            return id;
+        }
+
+        @Override
+        public int compareTo(TaskState that) {
+            return Integer.compare(this.id, that.id);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index c95b723..d0d940b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -23,9 +23,11 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,8 +100,7 @@ public class ConnectorsResource {
     public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
         herder.connectorInfo(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference<ConnectorInfo>() {
-        });
+        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null);
     }
 
     @GET
@@ -107,8 +108,13 @@ public class ConnectorsResource {
     public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
         herder.connectorConfig(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() {
-        });
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null);
+    }
+
+    @GET
+    @Path("/{connector}/status")
+    public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
+        return herder.connectorStatus(connector);
     }
 
     @PUT
@@ -145,6 +151,13 @@ public class ConnectorsResource {
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs);
     }
 
+    @GET
+    @Path("/{connector}/tasks/{task}/status")
+    public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") String connector,
+                                                      @PathParam("task") Integer task) throws Throwable {
+        return herder.taskStatus(new ConnectorTaskId(connector, task));
+    }
+
     @DELETE
     @Path("/{connector}")
     public void destroyConnector(final @PathParam("connector") String connector) throws Throwable {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 89847ab..707470f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -20,13 +20,15 @@ package org.apache.kafka.connect.runtime.standalone;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -38,23 +40,33 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 
 /**
  * Single process, in-memory "herder". Useful for a standalone Kafka Connect process.
  */
-public class StandaloneHerder implements Herder {
+public class StandaloneHerder extends AbstractHerder {
     private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
 
     private final Worker worker;
     private HashMap<String, ConnectorState> connectors = new HashMap<>();
 
     public StandaloneHerder(Worker worker) {
+        this(worker.workerId(), worker, new MemoryStatusBackingStore());
+    }
+
+    // visible for testing
+    StandaloneHerder(String workerId,
+                     Worker worker,
+                     StatusBackingStore statusBackingStore) {
+        super(statusBackingStore, workerId);
         this.worker = worker;
     }
 
     public synchronized void start() {
         log.info("Herder starting");
+        startServices();
         log.info("Herder started");
     }
 
@@ -78,6 +90,11 @@ public class StandaloneHerder implements Herder {
     }
 
     @Override
+    public int generation() {
+        return 0;
+    }
+
+    @Override
     public synchronized void connectors(Callback<Collection<String>> callback) {
         callback.onCompletion(null, new ArrayList<>(connectors.keySet()));
     }
@@ -131,8 +148,10 @@ public class StandaloneHerder implements Herder {
                 if (config == null) // Deletion, kill tasks as well
                     removeConnectorTasks(connName);
                 worker.stopConnector(connName);
-                if (config == null)
+                if (config == null) {
                     connectors.remove(connName);
+                    onDeletion(connName);
+                }
             } else {
                 if (config == null) {
                     // Deletion, must already exist
@@ -194,7 +213,7 @@ public class StandaloneHerder implements Herder {
         ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         ConnectorState state = connectors.get(connName);
-        worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+        worker.startConnector(connConfig, new HerderConnectorContext(this, connName), this);
         if (state == null) {
             connectors.put(connName, new ConnectorState(connectorProps, connConfig));
         } else {
@@ -219,7 +238,7 @@ public class StandaloneHerder implements Herder {
             ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
             TaskConfig config = new TaskConfig(taskConfigMap);
             try {
-                worker.addTask(taskId, config);
+                worker.startTask(taskId, config, this);
             } catch (Throwable e) {
                 log.error("Failed to add task {}: ", taskId, e);
                 // Swallow this so we can continue updating the rest of the tasks
@@ -230,19 +249,21 @@ public class StandaloneHerder implements Herder {
         }
     }
 
+    private Set<ConnectorTaskId> tasksFor(ConnectorState state) {
+        Set<ConnectorTaskId> tasks = new HashSet<>();
+        for (int i = 0; i < state.taskConfigs.size(); i++)
+            tasks.add(new ConnectorTaskId(state.name, i));
+        return tasks;
+    }
+
     private void removeConnectorTasks(String connName) {
         ConnectorState state = connectors.get(connName);
-        for (int i = 0; i < state.taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(connName, i);
-            try {
-                worker.stopTask(taskId);
-            } catch (ConnectException e) {
-                log.error("Failed to stop task {}: ", taskId, e);
-                // Swallow this so we can continue stopping the rest of the tasks
-                // FIXME: Forcibly kill the task?
-            }
+        Set<ConnectorTaskId> tasks = tasksFor(state);
+        if (!tasks.isEmpty()) {
+            worker.stopTasks(tasks);
+            worker.awaitStopTasks(tasks);
+            state.taskConfigs = new ArrayList<>();
         }
-        state.taskConfigs = new ArrayList<>();
     }
 
     private void updateConnectorTasks(String connName) {


Mime
View raw message