KAFKA-3093: Add Connect status tracking API
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes #920 from hachikuji/KAFKA-3093
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7d019ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7d019ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7d019ed
Branch: refs/heads/trunk
Commit: f7d019ed408fa988129be9af3689bfa4878bc627
Parents: aeb9c2a
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Feb 23 22:47:31 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Feb 23 22:47:31 2016 -0800
----------------------------------------------------------------------
config/connect-distributed.properties | 3 +-
.../connect/connector/ConnectorContext.java | 7 +
.../kafka/connect/file/FileStreamSinkTask.java | 2 +-
.../kafka/connect/cli/ConnectDistributed.java | 28 +-
.../kafka/connect/cli/ConnectStandalone.java | 13 +-
.../kafka/connect/runtime/AbstractHerder.java | 156 +++++++
.../kafka/connect/runtime/AbstractStatus.java | 100 ++++
.../kafka/connect/runtime/ConnectorStatus.java | 58 +++
.../apache/kafka/connect/runtime/Herder.java | 16 +-
.../connect/runtime/HerderConnectorContext.java | 11 +-
.../kafka/connect/runtime/TaskStatus.java | 53 +++
.../apache/kafka/connect/runtime/Worker.java | 210 +++++----
.../kafka/connect/runtime/WorkerSinkTask.java | 4 +-
.../kafka/connect/runtime/WorkerSourceTask.java | 18 +-
.../kafka/connect/runtime/WorkerTask.java | 47 +-
.../runtime/distributed/DistributedHerder.java | 107 +++--
.../runtime/distributed/WorkerCoordinator.java | 2 +-
.../runtime/distributed/WorkerGroupMember.java | 9 +-
.../distributed/WorkerRebalanceListener.java | 2 +-
.../kafka/connect/runtime/rest/RestServer.java | 9 +-
.../rest/entities/ConnectorStateInfo.java | 108 +++++
.../rest/resources/ConnectorsResource.java | 21 +-
.../runtime/standalone/StandaloneHerder.java | 51 +-
.../connect/storage/KafkaConfigStorage.java | 15 +-
.../storage/KafkaOffsetBackingStore.java | 13 +-
.../storage/KafkaStatusBackingStore.java | 461 +++++++++++++++++++
.../storage/MemoryStatusBackingStore.java | 105 +++++
.../connect/storage/StatusBackingStore.java | 100 ++++
.../kafka/connect/util/KafkaBasedLog.java | 24 +-
.../org/apache/kafka/connect/util/Table.java | 65 +++
.../connect/runtime/AbstractHerderTest.java | 116 +++++
.../connect/runtime/WorkerSinkTaskTest.java | 4 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 6 +-
.../connect/runtime/WorkerSourceTaskTest.java | 54 ++-
.../kafka/connect/runtime/WorkerTaskTest.java | 88 +++-
.../kafka/connect/runtime/WorkerTest.java | 161 ++++---
.../distributed/DistributedHerderTest.java | 121 ++++-
.../distributed/WorkerCoordinatorTest.java | 2 +-
.../standalone/StandaloneHerderTest.java | 42 +-
.../storage/KafkaStatusBackingStoreTest.java | 373 +++++++++++++++
.../storage/MemoryStatusBackingStoreTest.java | 66 +++
.../apache/kafka/connect/util/TableTest.java | 48 ++
tests/kafkatest/services/connect.py | 4 +-
.../kafkatest/tests/connect_distributed_test.py | 1 +
tests/kafkatest/tests/connect_rest_test.py | 4 +-
.../templates/connect-distributed.properties | 1 +
46 files changed, 2611 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties
index 9ec63db..46bd3bc 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -39,4 +39,5 @@ internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
-config.storage.topic=connect-configs
\ No newline at end of file
+config.storage.topic=connect-configs
+status.storage.topic=connect-status
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
index 2a06484..c8a06e8 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
@@ -30,4 +30,11 @@ public interface ConnectorContext {
* added/removed) and the running Tasks will need to be modified.
*/
void requestTaskReconfiguration();
+
+    /**
+     * Report an unrecoverable error to the Connect framework. The framework responds by moving the
+     * connector's status to FAILED.
+     *
+     * @param e the exception that describes the failure
+     */
+    void raiseError(Exception e);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 83ba6d4..09d4ed8 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -84,7 +84,7 @@ public class FileStreamSinkTask extends SinkTask {
@Override
public void stop() {
- if (outputStream != System.out)
+ // Never close System.out; also skip closing when no stream was ever assigned (outputStream is null).
+ if (outputStream != null && outputStream != System.out)
outputStream.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 5ad032e..bc5b75a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -18,6 +18,8 @@
package org.apache.kafka.connect.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Worker;
@@ -25,9 +27,12 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
import java.util.Collections;
import java.util.Map;
@@ -54,12 +59,29 @@ public class ConnectDistributed {
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+ Time time = new SystemTime();
DistributedConfig config = new DistributedConfig(workerProps);
- Worker worker = new Worker(config, new KafkaOffsetBackingStore());
+
RestServer rest = new RestServer(config);
- DistributedHerder herder = new DistributedHerder(config, worker, rest.advertisedUrl());
+ URI advertisedUrl = rest.advertisedUrl();
+ String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+ KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+ offsetBackingStore.configure(config.originals());
+
+ Worker worker = new Worker(workerId, time, config, offsetBackingStore);
+
+ StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
+ statusBackingStore.configure(config.originals());
+
+ DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, advertisedUrl.toString());
final Connect connect = new Connect(worker, herder, rest);
- connect.start();
+ try {
+ connect.start();
+ } catch (Exception e) {
+ log.error("Failed to start Connect", e);
+ connect.stop();
+ }
// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index f89a72a..6c4335e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -18,9 +18,11 @@
package org.apache.kafka.connect.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.RestServer;
@@ -33,6 +35,7 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -63,9 +66,15 @@ public class ConnectStandalone {
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+ Time time = new SystemTime();
StandaloneConfig config = new StandaloneConfig(workerProps);
- Worker worker = new Worker(config, new FileOffsetBackingStore());
+
RestServer rest = new RestServer(config);
+ URI advertisedUrl = rest.advertisedUrl();
+ String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+ Worker worker = new Worker(workerId, time, config, new FileOffsetBackingStore());
+
Herder herder = new StandaloneHerder(worker);
final Connect connect = new Connect(worker, herder, rest);
connect.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
new file mode 100644
index 0000000..ca85d87
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
+ * must invoke the lifecycle hooks appropriately.
+ *
+ * <p>This class takes the following approach for sending status updates to the backing store:
+ *
+ * <p>1) When the connector or task is starting, we overwrite the previous state blindly. This ensures that
+ * every rebalance will reset the state of tasks to the proper state. The intuition is that there should
+ * be less chance of write conflicts when the worker has just received its assignment and is starting tasks.
+ * In particular, this prevents us from depending on the generation absolutely. If the group disappears
+ * and the generation is reset, then we'll overwrite the status information that was written with the
+ * older (and larger) generation with the updated one. The danger of this approach is that slow-starting
+ * tasks may cause the status to be overwritten after a rebalance has completed.
+ *
+ * <p>2) If the connector or task fails or is shut down, we use {@link StatusBackingStore#putSafe(ConnectorStatus)},
+ * which provides a little more protection if the worker is no longer in the group (in which case the
+ * task may have already been started on another worker). Obviously this is still racy. If the task has just
+ * started on another worker, we may not have the updated status cached yet. In this case, we'll overwrite
+ * the value, which will cause the state to be inconsistent (most likely until the next rebalance). Until
+ * we have proper producer groups with fencing, there is not much else we can do.
+ */
+public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
+
+    protected final StatusBackingStore statusBackingStore;
+    private final String workerId;
+
+    /**
+     * @param statusBackingStore store that lifecycle updates are written to and status queries are read from
+     * @param workerId id of this worker, recorded with every status update it writes
+     */
+    public AbstractHerder(StatusBackingStore statusBackingStore, String workerId) {
+        this.statusBackingStore = statusBackingStore;
+        this.workerId = workerId;
+    }
+
+    /**
+     * @return the generation recorded with each status update written by this herder
+     */
+    protected abstract int generation();
+
+    /**
+     * Start the services owned by this class (currently only the status backing store).
+     */
+    protected void startServices() {
+        this.statusBackingStore.start();
+    }
+
+    /**
+     * Stop the services owned by this class (currently only the status backing store).
+     */
+    protected void stopServices() {
+        this.statusBackingStore.stop();
+    }
+
+    @Override
+    public void onStartup(String connector) {
+        // Startup writes unconditionally (point 1 of the class docs).
+        statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RUNNING,
+                workerId, generation()));
+    }
+
+    @Override
+    public void onShutdown(String connector) {
+        // Shutdown and failure use putSafe (point 2 of the class docs).
+        statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.UNASSIGNED,
+                workerId, generation()));
+    }
+
+    @Override
+    public void onFailure(String connector, Throwable cause) {
+        statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.FAILED,
+                trace(cause), workerId, generation()));
+    }
+
+    @Override
+    public void onStartup(ConnectorTaskId id) {
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation()));
+    }
+
+    @Override
+    public void onFailure(ConnectorTaskId id, Throwable cause) {
+        statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause)));
+    }
+
+    @Override
+    public void onShutdown(ConnectorTaskId id) {
+        statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation()));
+    }
+
+    @Override
+    public void onDeletion(String connector) {
+        // Mark every task currently known for the connector as DESTROYED, then the connector itself.
+        for (TaskStatus status : statusBackingStore.getAll(connector))
+            statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation()));
+        statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation()));
+    }
+
+    @Override
+    public ConnectorStateInfo connectorStatus(String connName) {
+        ConnectorStatus connector = statusBackingStore.get(connName);
+        if (connector == null)
+            throw new NotFoundException("No status found for connector " + connName);
+
+        Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
+
+        ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
+                connector.state().toString(), connector.workerId(), connector.trace());
+        List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
+
+        for (TaskStatus status : tasks) {
+            taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(),
+                    status.state().toString(), status.workerId(), status.trace()));
+        }
+
+        // Present tasks in TaskState's natural order rather than backing-store iteration order.
+        Collections.sort(taskStates);
+
+        return new ConnectorStateInfo(connName, connectorState, taskStates);
+    }
+
+    @Override
+    public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
+        TaskStatus status = statusBackingStore.get(id);
+
+        if (status == null)
+            throw new NotFoundException("No status found for task " + id);
+
+        return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(),
+                status.workerId(), status.trace());
+    }
+
+    /**
+     * Render the stack trace of {@code t} as a string.
+     *
+     * @param t the error to render; may be null
+     * @return the stack trace text, or null if {@code t} is null
+     */
+    private String trace(Throwable t) {
+        if (t == null)
+            return null;
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        // Encode with the same charset used to decode below. The single-argument PrintStream
+        // constructor would encode with the platform default charset and garble non-ASCII text.
+        try (PrintStream printStream = new PrintStream(output, false, "UTF-8")) {
+            t.printStackTrace(printStream);
+            printStream.flush();
+            return output.toString("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            // UTF-8 is one of the standard charsets every JVM must support, so this is not expected.
+            return null;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
new file mode 100644
index 0000000..4f31be1
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+/**
+ * Value object describing the lifecycle status of a connector or task. All fields are final.
+ *
+ * @param <T> type of the identifier, e.g. a connector name or a task id
+ */
+public abstract class AbstractStatus<T> {
+
+    /** Lifecycle states recorded for connectors and tasks. */
+    public enum State {
+        UNASSIGNED,
+        RUNNING,
+        FAILED,
+        DESTROYED
+    }
+
+    private final T id;
+    private final State state;
+    private final String trace;
+    private final String workerId;
+    private final int generation;
+
+    /**
+     * @param id identifier of the connector or task
+     * @param state current state
+     * @param workerId id of the worker reporting the status
+     * @param generation generation recorded with the status
+     * @param trace failure trace, or null when there is none
+     */
+    public AbstractStatus(T id, State state, String workerId, int generation, String trace) {
+        this.id = id;
+        this.state = state;
+        this.trace = trace;
+        this.workerId = workerId;
+        this.generation = generation;
+    }
+
+    public T id() {
+        return id;
+    }
+
+    public State state() {
+        return state;
+    }
+
+    public String trace() {
+        return trace;
+    }
+
+    public String workerId() {
+        return workerId;
+    }
+
+    public int generation() {
+        return generation;
+    }
+
+    /** Summary string; the (potentially long) trace is intentionally left out. */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("Status{");
+        sb.append("id=").append(id)
+                .append(", state=").append(state)
+                .append(", workerId='").append(workerId).append('\'')
+                .append(", generation=").append(generation)
+                .append('}');
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this)
+            return true;
+        if (o == null || o.getClass() != getClass())
+            return false;
+
+        AbstractStatus<?> other = (AbstractStatus<?>) o;
+        return generation == other.generation
+                && same(id, other.id)
+                && state == other.state
+                && same(trace, other.trace)
+                && same(workerId, other.workerId);
+    }
+
+    @Override
+    public int hashCode() {
+        int h = hash(id);
+        h = 31 * h + hash(state);
+        h = 31 * h + hash(trace);
+        h = 31 * h + hash(workerId);
+        return 31 * h + generation;
+    }
+
+    /** Null-safe equality: both null, or a.equals(b). */
+    private static boolean same(Object a, Object b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    /** Null-safe hash: 0 for null, otherwise the object's own hash code. */
+    private static int hash(Object o) {
+        return o == null ? 0 : o.hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
new file mode 100644
index 0000000..d9a2eba
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+/**
+ * Status of a connector, identified by the connector's name.
+ */
+public class ConnectorStatus extends AbstractStatus<String> {
+
+    /**
+     * Create a connector status that carries a failure trace. Note that the trace precedes the worker
+     * id in this constructor, unlike the corresponding {@link TaskStatus} constructor.
+     *
+     * @param connector name of the connector
+     * @param state state of the connector
+     * @param trace stack trace describing a failure, or null
+     * @param workerId id of the worker reporting this status
+     * @param generation generation recorded with this status
+     */
+    public ConnectorStatus(String connector, State state, String trace, String workerId, int generation) {
+        super(connector, state, workerId, generation, trace);
+    }
+
+    /**
+     * Create a connector status without a failure trace.
+     *
+     * @param connector name of the connector
+     * @param state state of the connector
+     * @param workerId id of the worker reporting this status
+     * @param generation generation recorded with this status
+     */
+    public ConnectorStatus(String connector, State state, String workerId, int generation) {
+        super(connector, state, workerId, generation, null);
+    }
+
+    /**
+     * Callbacks through which the lifecycle of a connector is reported.
+     */
+    public interface Listener {
+
+        /**
+         * Invoked after connector has successfully been shutdown.
+         * @param connector The connector name
+         */
+        void onShutdown(String connector);
+
+        /**
+         * Invoked from the Connector using {@link org.apache.kafka.connect.connector.ConnectorContext#raiseError(Exception)}.
+         * Note that no shutdown event will follow after the connector has failed.
+         * @param connector The connector name
+         * @param cause Error raised from the connector.
+         */
+        void onFailure(String connector, Throwable cause);
+
+        /**
+         * Invoked after successful startup of the connector.
+         * @param connector The connector name
+         */
+        void onStartup(String connector);
+
+        /**
+         * Invoked when the connector is deleted through the REST API.
+         * @param connector The connector name
+         */
+        void onDeletion(String connector);
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index fc0689c..95c7700 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -18,8 +18,10 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
import java.util.Collection;
import java.util.List;
@@ -61,7 +63,7 @@ public interface Herder {
* @throws org.apache.kafka.connect.runtime.distributed.NotLeaderException if this node can not resolve the request
* (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is
* also not the leader
- * @throws ConnectException if this node is the leader, but still cannot resolve the
+ * @throws org.apache.kafka.connect.errors.ConnectException if this node is the leader, but still cannot resolve the
* request (e.g., it is not in sync with other worker's config state)
*/
void connectors(Callback<Collection<String>> callback);
@@ -113,6 +115,18 @@ public interface Herder {
*/
void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
+    /**
+     * Look up the current status of a connector and its tasks.
+     * @param connName name of the connector
+     * @return the state of the connector together with the states of its tasks
+     * @throws org.apache.kafka.connect.errors.NotFoundException if no status is known for the connector
+     */
+    ConnectorStateInfo connectorStatus(String connName);
+
+    /**
+     * Look up the current status of a task.
+     * @param id id of the task
+     * @return the state of the task
+     * @throws org.apache.kafka.connect.errors.NotFoundException if no status is known for the task
+     */
+    ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id);
+
class Created<T> {
private final boolean created;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
index e3294ef..bd933f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
@@ -24,10 +24,10 @@ import org.apache.kafka.connect.connector.ConnectorContext;
*/
public class HerderConnectorContext implements ConnectorContext {
- private Herder herder;
- private String connectorName;
+ private final AbstractHerder herder;
+ private final String connectorName;
- public HerderConnectorContext(Herder herder, String connectorName) {
+ public HerderConnectorContext(AbstractHerder herder, String connectorName) {
this.herder = herder;
this.connectorName = connectorName;
}
@@ -38,4 +38,9 @@ public class HerderConnectorContext implements ConnectorContext {
// Distributed herder will forward the request to the leader if needed
herder.requestTaskReconfiguration(connectorName);
}
+
+    /**
+     * Forward an error raised by the connector to the herder's failure hook, which records the
+     * connector as FAILED.
+     */
+    @Override
+    public void raiseError(Exception e) {
+        this.herder.onFailure(this.connectorName, e);
+    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
new file mode 100644
index 0000000..3542eb8
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+/**
+ * Status of a single task, identified by its {@link ConnectorTaskId}.
+ */
+public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
+
+    /**
+     * Create a task status carrying an optional failure trace.
+     *
+     * @param id id of the task
+     * @param state state of the task
+     * @param workerId id of the worker reporting this status
+     * @param generation generation recorded with this status
+     * @param trace stack trace describing a failure, or null
+     */
+    public TaskStatus(ConnectorTaskId id, State state, String workerId, int generation, String trace) {
+        super(id, state, workerId, generation, trace);
+    }
+
+    /**
+     * Create a task status with no failure trace.
+     *
+     * @param id id of the task
+     * @param state state of the task
+     * @param workerId id of the worker reporting this status
+     * @param generation generation recorded with this status
+     */
+    public TaskStatus(ConnectorTaskId id, State state, String workerId, int generation) {
+        this(id, state, workerId, generation, null);
+    }
+
+    /**
+     * Callbacks through which the lifecycle of a task is reported.
+     */
+    public interface Listener {
+
+        /**
+         * Called once the task has started successfully.
+         * @param id id of the task
+         */
+        void onStartup(ConnectorTaskId id);
+
+        /**
+         * Called when the task raises an error. A failure is not followed by a shutdown event.
+         * @param id id of the task
+         * @param cause the error raised by the task
+         */
+        void onFailure(ConnectorTaskId id, Throwable cause);
+
+        /**
+         * Called once the task has shut down successfully.
+         * @param id id of the task
+         */
+        void onShutdown(ConnectorTaskId id);
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 0a4bb7f..8e74fec 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -20,7 +20,6 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
@@ -66,25 +65,25 @@ public class Worker {
private final ExecutorService executor;
private final Time time;
+ private final String workerId;
private final WorkerConfig config;
+ private final Converter keyConverter;
+ private final Converter valueConverter;
+ private final Converter internalKeyConverter;
+ private final Converter internalValueConverter;
+ private final OffsetBackingStore offsetBackingStore;
- private Converter keyConverter;
- private Converter valueConverter;
- private Converter internalKeyConverter;
- private Converter internalValueConverter;
- private OffsetBackingStore offsetBackingStore;
- private HashMap<String, Connector> connectors = new HashMap<>();
+ private HashMap<String, WorkerConnector> connectors = new HashMap<>();
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer<byte[], byte[]> producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
- public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) {
- this(new SystemTime(), config, offsetBackingStore);
- }
-
- @SuppressWarnings("unchecked")
- public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+ public Worker(String workerId,
+ Time time,
+ WorkerConfig config,
+ OffsetBackingStore offsetBackingStore) {
this.executor = Executors.newCachedThreadPool();
+ this.workerId = workerId;
this.time = time;
this.config = config;
this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
@@ -132,34 +131,18 @@ public class Worker {
long started = time.milliseconds();
long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
- for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
- Connector conn = entry.getValue();
+ for (Map.Entry<String, WorkerConnector> entry : connectors.entrySet()) {
+ WorkerConnector conn = entry.getValue();
log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
"Worker is stopped.", conn);
- try {
- conn.stop();
- } catch (ConnectException e) {
- log.error("Error while shutting down connector " + conn, e);
- }
+ conn.stop();
}
- for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
- WorkerTask task = entry.getValue();
- log.warn("Shutting down task {} uncleanly; herder should have shut down "
- + "tasks before the Worker is stopped.", task);
- try {
- task.stop();
- } catch (ConnectException e) {
- log.error("Error while shutting down task " + task, e);
- }
- }
-
- for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
- WorkerTask task = entry.getValue();
- log.debug("Waiting for task {} to finish shutting down", task);
- if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
- log.error("Graceful shutdown of task {} failed.", task);
- }
+ Collection<ConnectorTaskId> taskIds = tasks.keySet();
+ log.warn("Shutting down tasks {} uncleanly; herder should have shut down "
+ + "tasks before the Worker is stopped.", taskIds);
+ stopTasks(taskIds);
+ awaitStopTasks(taskIds);
long timeoutMs = limit - time.milliseconds();
sourceTaskOffsetCommitter.close(timeoutMs);
@@ -169,16 +152,12 @@ public class Worker {
log.info("Worker stopped");
}
- public WorkerConfig config() {
- return config;
- }
-
/**
* Add a new connector.
* @param connConfig connector configuration
* @param ctx context for the connector
*/
- public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
+ public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener lifecycleListener) {
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
@@ -188,22 +167,25 @@ public class Worker {
throw new ConnectException("Connector with name " + connName + " already exists");
final Connector connector = instantiateConnector(connClass);
+ WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, lifecycleListener);
+
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
- connector.initialize(ctx);
+ workerConnector.initialize();
try {
- connector.start(connConfig.originalsStrings());
+ workerConnector.start(connConfig.originalsStrings());
} catch (ConnectException e) {
throw new ConnectException("Connector threw an exception while starting", e);
}
- connectors.put(connName, connector);
+ connectors.put(connName, workerConnector);
log.info("Finished creating connector {}", connName);
}
/* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
public boolean isSinkConnector(String connName) {
- return SinkConnector.class.isAssignableFrom(connectors.get(connName).getClass());
+        WorkerConnector workerConnector = connectors.get(connName);
+        // Fail with the same ConnectException as connectorTaskConfigs() and stopConnector() rather than an NPE
+        if (workerConnector == null)
+            throw new ConnectException("Connector " + connName + " not found in this worker.");
+        return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass());
}
private Class<? extends Connector> getConnectorClass(String connectorAlias) {
@@ -267,10 +249,11 @@ public class Worker {
public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
log.trace("Reconfiguring connector tasks for {}", connName);
- Connector connector = connectors.get(connName);
- if (connector == null)
+ WorkerConnector workerConnector = connectors.get(connName);
+ if (workerConnector == null)
throw new ConnectException("Connector " + connName + " not found in this worker.");
+ Connector connector = workerConnector.delegate;
List<Map<String, String>> result = new ArrayList<>();
String taskClassName = connector.taskClass().getName();
for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
@@ -286,16 +269,11 @@ public class Worker {
public void stopConnector(String connName) {
log.info("Stopping connector {}", connName);
- Connector connector = connectors.get(connName);
+ WorkerConnector connector = connectors.get(connName);
if (connector == null)
throw new ConnectException("Connector " + connName + " not found in this worker.");
- try {
- connector.stop();
- } catch (ConnectException e) {
- log.error("Error shutting down connector {}: ", connector, e);
- }
-
+ connector.stop();
connectors.remove(connName);
log.info("Stopped connector {}", connName);
@@ -313,7 +291,7 @@ public class Worker {
* @param id Globally unique ID for this task.
* @param taskConfig the parsed task configuration
*/
- public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
+ public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener lifecycleListener) {
log.info("Creating task {}", id);
if (tasks.containsKey(id)) {
@@ -327,33 +305,35 @@ public class Worker {
final Task task = instantiateTask(taskClass);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
+ final WorkerTask workerTask = buildWorkerTask(id, task, lifecycleListener);
+
+ // Start the task before modifying any state; any exceptions are caught higher up the
+ // call chain and there's no cleanup to do here
+ workerTask.initialize(taskConfig.originalsStrings());
+ executor.submit(workerTask);
+
+ if (task instanceof SourceTask) {
+ WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
+ sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+ }
+ tasks.put(id, workerTask);
+ }
+
+ /**
+ * Wrap a task in the WorkerTask implementation matching its type: a WorkerSourceTask (with an offset
+ * reader/writer scoped to the task's connector) for a SourceTask, or a WorkerSinkTask for a SinkTask.
+ * The returned task is neither initialized nor submitted; startTask() does that.
+ *
+ * @throws ConnectException if the task is neither a SourceTask nor a SinkTask
+ */
+ private WorkerTask buildWorkerTask(ConnectorTaskId id, Task task, TaskStatus.Listener lifecycleListener) {
// Decide which type of worker task we need based on the type of task.
- final WorkerTask workerTask;
if (task instanceof SourceTask) {
- SourceTask sourceTask = (SourceTask) task;
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
- workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
+ return new WorkerSourceTask(id, (SourceTask) task, lifecycleListener, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, time);
} else if (task instanceof SinkTask) {
- workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+ return new WorkerSinkTask(id, (SinkTask) task, lifecycleListener, config, keyConverter, valueConverter, time);
} else {
+ // NOTE(review): `task` has no matching {} placeholder in this message, so SLF4J silently drops it.
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
}
-
- // Start the task before adding modifying any state, any exceptions are caught higher up the
- // call chain and there's no cleanup to do here
- workerTask.initialize(taskConfig.originalsStrings());
- executor.submit(workerTask);
-
- if (task instanceof SourceTask) {
- WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
- sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
- }
- tasks.put(id, workerTask);
}
private static Task instantiateTask(Class<? extends Task> taskClass) {
@@ -364,16 +344,39 @@ public class Worker {
}
}
- public void stopTask(ConnectorTaskId id) {
- log.info("Stopping task {}", id);
+    /**
+     * Signal each of the given tasks to stop without waiting for any of them to finish.
+     * Callers in this patch follow this with awaitStopTasks() on the same ids.
+     *
+     * @param ids the ids of the tasks to stop
+     */
+    public void stopTasks(Collection<ConnectorTaskId> ids) {
+        for (ConnectorTaskId taskId : ids) {
+            WorkerTask target = getTask(taskId);
+            stopTask(target);
+        }
+    }
- WorkerTask task = getTask(id);
+    /**
+     * Wait for the given tasks to finish stopping. All of them share a single deadline computed from
+     * TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG; a task that does not stop in time is cancelled. Each
+     * awaited task is removed from this worker.
+     *
+     * @param ids the ids of the tasks to await; may be a live view of this worker's task map
+     */
+    public void awaitStopTasks(Collection<ConnectorTaskId> ids) {
+        long deadline = time.milliseconds() + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+        // Iterate over a snapshot: awaitStopTask() removes each task from the `tasks` map, and stop()
+        // passes tasks.keySet() directly, so iterating the live view would throw
+        // ConcurrentModificationException as soon as more than one task is running.
+        for (ConnectorTaskId id : new ArrayList<>(ids)) {
+            long remaining = Math.max(0, deadline - time.milliseconds());
+            awaitStopTask(getTask(id), remaining);
+        }
+    }
+
+    /**
+     * Wait up to {@code timeout} ms for a task to stop. A task that does not stop in time is
+     * cancelled, so that its eventual exit no longer updates its status. Either way the task is
+     * removed from this worker.
+     */
+    private void awaitStopTask(WorkerTask task, long timeout) {
+        boolean stoppedInTime = task.awaitStop(timeout);
+        ConnectorTaskId taskId = task.id();
+        if (!stoppedInTime) {
+            log.error("Graceful stop of task {} failed.", taskId);
+            task.cancel();
+        }
+        tasks.remove(taskId);
+    }
+
+ /**
+ * Signal a task to stop without waiting for it to finish. A source task is first removed from the
+ * source task offset committer, where startTask() scheduled it.
+ */
+ private void stopTask(WorkerTask task) {
+ log.info("Stopping task {}", task.id());
if (task instanceof WorkerSourceTask)
- sourceTaskOffsetCommitter.remove(id);
+ sourceTaskOffsetCommitter.remove(task.id());
task.stop();
- if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
- log.error("Graceful stop of task {} failed.", task);
- tasks.remove(id);
+ }
+
+    /**
+     * Stop a single task and wait up to the graceful shutdown timeout for it to finish. A task that
+     * does not stop in time is cancelled. The task is removed from this worker afterwards.
+     *
+     * @param id the id of the task to stop
+     */
+    public void stopAndAwaitTask(ConnectorTaskId id) {
+        WorkerTask workerTask = getTask(id);
+        stopTask(workerTask);
+        long gracePeriodMs = config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+        awaitStopTask(workerTask, gracePeriodMs);
}
/**
@@ -400,4 +403,55 @@ public class Worker {
return internalValueConverter;
}
+    /**
+     * @return the identifier this worker was constructed with
+     */
+    public String workerId() {
+        return this.workerId;
+    }
+
+    /**
+     * Wraps a {@link Connector} together with its name, context and status listener, and reports the
+     * connector's lifecycle transitions to that listener. Errors thrown while starting or stopping the
+     * connector are logged and reported as failures instead of being propagated.
+     */
+    private static class WorkerConnector {
+        // Read directly by Worker (isSinkConnector, connectorTaskConfigs) to reach the wrapped connector.
+        private final Connector delegate;
+        private final String connName;
+        private final ConnectorContext ctx;
+        private final ConnectorStatus.Listener lifecycleListener;
+
+        public WorkerConnector(String connName,
+                               Connector delegate,
+                               ConnectorContext ctx,
+                               ConnectorStatus.Listener lifecycleListener) {
+            this.delegate = delegate;
+            this.connName = connName;
+            this.ctx = ctx;
+            this.lifecycleListener = lifecycleListener;
+        }
+
+        /**
+         * Hand the context to the connector. Unlike start() and stop(), exceptions propagate to the caller.
+         */
+        public void initialize() {
+            delegate.initialize(ctx);
+        }
+
+        /**
+         * Start the connector and report startup. If anything throws, including the startup
+         * notification itself, the error is reported as a failure instead.
+         */
+        public void start(Map<String, String> props) {
+            try {
+                delegate.start(props);
+                lifecycleListener.onStartup(connName);
+            } catch (Throwable t) {
+                log.error("Error while starting connector {}", connName, t);
+                lifecycleListener.onFailure(connName, t);
+            }
+        }
+
+        /**
+         * Stop the connector and report shutdown. If anything throws, including the shutdown
+         * notification itself, the error is reported as a failure instead.
+         */
+        public void stop() {
+            try {
+                delegate.stop();
+                lifecycleListener.onShutdown(connName);
+            } catch (Throwable t) {
+                log.error("Error while shutting down connector {}", connName, t);
+                lifecycleListener.onFailure(connName, t);
+            }
+        }
+
+        /** Delegates to the wrapped connector so that log messages identify the connector itself. */
+        @Override
+        public String toString() {
+            return delegate.toString();
+        }
+    }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 8c5bd9f..eb64355 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -76,11 +76,12 @@ class WorkerSinkTask extends WorkerTask {
public WorkerSinkTask(ConnectorTaskId id,
SinkTask task,
+ TaskStatus.Listener lifecycleListener,
WorkerConfig workerConfig,
Converter keyConverter,
Converter valueConverter,
Time time) {
- super(id);
+ super(id, lifecycleListener);
this.workerConfig = workerConfig;
this.task = task;
@@ -184,6 +185,7 @@ class WorkerSinkTask extends WorkerTask {
* Initializes and starts the SinkTask.
*/
protected void initializeAndStart() {
+ log.debug("Initializing task {} with config {}", id, taskConfig);
String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new ConnectException("Sink tasks require a list of topics.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 562e03e..8542f4c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -17,13 +17,13 @@
package org.apache.kafka.connect.runtime;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -76,6 +76,7 @@ class WorkerSourceTask extends WorkerTask {
public WorkerSourceTask(ConnectorTaskId id,
SourceTask task,
+ TaskStatus.Listener lifecycleListener,
Converter keyConverter,
Converter valueConverter,
KafkaProducer<byte[], byte[]> producer,
@@ -83,7 +84,7 @@ class WorkerSourceTask extends WorkerTask {
OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig,
Time time) {
- super(id);
+ super(id, lifecycleListener);
this.workerConfig = workerConfig;
this.task = task;
@@ -147,16 +148,13 @@ class WorkerSourceTask extends WorkerTask {
}
} catch (InterruptedException e) {
// Ignore and allow to exit.
- } catch (Throwable t) {
- log.error("Task {} threw an uncaught and unrecoverable exception", id);
- log.error("Task is being killed and will not recover until manually restarted:", t);
- // It should still be safe to let this fall through and commit offsets since this exception would have
+ } finally {
+ // It should still be safe to commit offsets since any exception would have
// simply resulted in not getting more records but all the existing records should be ok to flush
// and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
// to fail.
+ commitOffsets();
}
-
- commitOffsets();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ecaeb7b..cc69c0f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -37,13 +37,21 @@ abstract class WorkerTask implements Runnable {
protected final ConnectorTaskId id;
private final AtomicBoolean stopping;
private final AtomicBoolean running;
+ private final AtomicBoolean cancelled;
private final CountDownLatch shutdownLatch;
+ private final TaskStatus.Listener lifecycleListener;
+ /**
+ * @param id the id of the connector task run by this worker task
+ * @param lifecycleListener listener notified when the task starts up, shuts down or fails
+ */
- public WorkerTask(ConnectorTaskId id) {
+ public WorkerTask(ConnectorTaskId id, TaskStatus.Listener lifecycleListener) {
this.id = id;
this.stopping = new AtomicBoolean(false);
this.running = new AtomicBoolean(false);
+ // Set by cancel(); once true, run() no longer reports shutdown/failure to the listener.
+ this.cancelled = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
+ this.lifecycleListener = lifecycleListener;
+ }
+
+    /**
+     * @return the id of the connector task this worker task runs
+     */
+    public ConnectorTaskId id() {
+        return this.id;
}
/**
@@ -61,9 +69,17 @@ abstract class WorkerTask implements Runnable {
}
/**
+ * Cancel this task. This won't actually stop it, but it will prevent the state from being
+ * updated when it eventually does shut down: once cancelled, run() reports neither shutdown
+ * nor failure to the lifecycle listener.
+ */
+ public void cancel() {
+ this.cancelled.set(true);
+ }
+
+ /**
* Wait for this task to finish stopping.
*
- * @param timeoutMs
+ * @param timeoutMs time in milliseconds to await stop
* @return true if successful, false if the timeout was reached
*/
public boolean awaitStop(long timeoutMs) {
@@ -85,19 +101,23 @@ abstract class WorkerTask implements Runnable {
return stopping.get();
}
+    /**
+     * @return true when the task is not running: either it has not been started yet or it has
+     *         finished closing
+     */
+    protected boolean isStopped() {
+        boolean isRunning = running.get();
+        return !isRunning;
+    }
+
+ /**
+ * Close the task. The task is always marked as no longer running and the shutdown latch is
+ * always counted down, even if close() throws.
+ */
private void doClose() {
try {
close();
} catch (Throwable t) {
- log.error("Unhandled exception in task shutdown {}", id, t);
+ log.error("Task {} threw an uncaught and unrecoverable exception during shutdown", id, t);
+ // Rethrow so that run() reports the failure to the lifecycle listener (unless cancelled).
+ // NOTE(review): doRun() calls this from a finally block, so an exception rethrown here
+ // replaces any exception already propagating from execute().
+ throw t;
} finally {
running.set(false);
shutdownLatch.countDown();
}
}
- @Override
- public void run() {
+ private void doRun() {
if (!this.running.compareAndSet(false, true))
throw new IllegalStateException("The task cannot be started while still running");
@@ -105,12 +125,27 @@ abstract class WorkerTask implements Runnable {
if (stopping.get())
return;
+ lifecycleListener.onStartup(id);
execute();
} catch (Throwable t) {
- log.error("Unhandled exception in task {}", id, t);
+ log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
+ log.error("Task is being killed and will not recover until manually restarted");
+ throw t;
} finally {
doClose();
}
}
+    /**
+     * Run the task and report the outcome to the lifecycle listener. A normal return is reported as
+     * a shutdown. Any thrown error, including one from the shutdown notification itself, is reported
+     * as a failure. Nothing is reported once the task has been cancelled.
+     */
+    @Override
+    public void run() {
+        try {
+            doRun();
+            if (cancelled.get())
+                return;
+            lifecycleListener.onShutdown(id);
+        } catch (Throwable t) {
+            if (!cancelled.get())
+                lifecycleListener.onFailure(id, t);
+        }
+    }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 8b0525b..83ed714 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -17,17 +17,16 @@
package org.apache.kafka.connect.runtime.distributed;
-import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
@@ -35,6 +34,7 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -79,7 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* (and therefore, also for creating, destroy, and scaling up/down connectors).
* </p>
*/
-public class DistributedHerder implements Herder, Runnable {
+public class DistributedHerder extends AbstractHerder implements Runnable {
private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
@@ -108,15 +108,29 @@ public class DistributedHerder implements Herder, Runnable {
// needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
private Set<String> connectorConfigUpdates = new HashSet<>();
private boolean needsReconfigRebalance;
+ private volatile int generation;
private final ExecutorService forwardRequestExecutor;
+ /**
+ * Create a herder without supplying a config storage or group member (both are passed as null to the
+ * constructor below). This herder's worker id is taken from {@code worker.workerId()}.
+ *
+ * @param config the distributed worker configuration
+ * @param time time source, also handed to the group member the herder creates
+ * @param worker the worker that runs this herder's connectors and tasks
+ * @param statusBackingStore the status backing store passed to {@link AbstractHerder}
+ * @param restUrl this worker's REST URL, passed to the group member
+ */
- public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) {
- this(config, worker, null, null, restUrl, new SystemTime());
+ public DistributedHerder(DistributedConfig config,
+ Time time,
+ Worker worker,
+ StatusBackingStore statusBackingStore,
+ String restUrl) {
+ this(config, worker.workerId(), worker, statusBackingStore, null, null, restUrl, time);
}
- // public for testing
- public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, Time time) {
+ // visible for testing
+ DistributedHerder(DistributedConfig config,
+ String workerId,
+ Worker worker,
+ StatusBackingStore statusBackingStore,
+ KafkaConfigStorage configStorage,
+ WorkerGroupMember member,
+ String restUrl,
+ Time time) {
+ super(statusBackingStore, workerId);
+
this.worker = worker;
if (configStorage != null) {
// For testing. Assume configuration has already been performed
@@ -131,7 +145,7 @@ public class DistributedHerder implements Herder, Runnable {
this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
- this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener());
+ this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener(), time);
stopping = new AtomicBoolean(false);
rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
@@ -146,11 +160,25 @@ public class DistributedHerder implements Herder, Runnable {
thread.start();
}
+    /**
+     * Start the base herder services first, then the config storage.
+     */
+    @Override
+    protected void startServices() {
+        super.startServices();
+        configStorage.start();
+    }
+
+    /**
+     * Stop the base herder services first, then the config storage if one is set.
+     */
+    @Override
+    protected void stopServices() {
+        super.stopServices();
+        if (configStorage != null) {
+            configStorage.stop();
+        }
+    }
+
+ @Override
public void run() {
try {
log.info("Herder starting");
- configStorage.start();
+ startServices();
log.info("Herder started");
@@ -282,13 +310,10 @@ public class DistributedHerder implements Herder, Runnable {
log.error("Failed to shut down connector " + connName, t);
}
}
- for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) {
- try {
- worker.stopTask(taskId);
- } catch (Throwable t) {
- log.error("Failed to shut down task " + taskId, t);
- }
- }
+
+ Set<ConnectorTaskId> tasks = new HashSet<>(worker.taskIds());
+ worker.stopTasks(tasks);
+ worker.awaitStopTasks(tasks);
member.stop();
@@ -299,8 +324,7 @@ public class DistributedHerder implements Herder, Runnable {
request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
}
- if (configStorage != null)
- configStorage.stop();
+ stopServices();
}
}
@@ -388,7 +412,7 @@ public class DistributedHerder implements Herder, Runnable {
}
@Override
- public void putConnectorConfig(final String connName, Map<String, String> config, final boolean allowReplace,
+ public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
final Callback<Created<ConnectorInfo>> callback) {
final Map<String, String> connConfig;
if (config == null) {
@@ -515,6 +539,10 @@ public class DistributedHerder implements Herder, Runnable {
);
}
+    /**
+     * @return the generation of the most recent group assignment received by this herder
+     */
+    @Override
+    public int generation() {
+        return this.generation;
+    }
// Should only be called from work thread, so synchronization should not be needed
private boolean isLeader() {
@@ -649,7 +677,7 @@ public class DistributedHerder implements Herder, Runnable {
log.info("Starting task {}", taskId);
Map<String, String> configs = configState.taskConfig(taskId);
TaskConfig taskConfig = new TaskConfig(configs);
- worker.addTask(taskId, taskConfig);
+ worker.startTask(taskId, taskConfig, this);
} catch (ConfigException e) {
log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
"configuration. This task will not execute until reconfigured.", e);
@@ -666,7 +694,7 @@ public class DistributedHerder implements Herder, Runnable {
ConnectorConfig connConfig = new ConnectorConfig(configs);
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
- worker.addConnector(connConfig, ctx);
+ worker.startConnector(connConfig, ctx, this);
// Immediately request configuration since this could be a brand new connector. However, also only update those
// task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
@@ -816,7 +844,7 @@ public class DistributedHerder implements Herder, Runnable {
callback.onCompletion(error, null);
}
};
- };
+ }
// Config callbacks are triggered from the KafkaConfigStorage thread
@@ -853,11 +881,22 @@ public class DistributedHerder implements Herder, Runnable {
};
}
+    /**
+     * Clean up status information (via onDeletion) for every connector that still has a status in
+     * the status backing store but no longer appears in the current config snapshot.
+     */
+    private void updateDeletedConnectorStatus() {
+        Set<String> configuredConnectors = configStorage.snapshot().connectors();
+        for (String connName : statusBackingStore.connectors()) {
+            if (configuredConnectors.contains(connName))
+                continue;
+            log.debug("Cleaning status information for connector {}", connName);
+            onDeletion(connName);
+        }
+    }
+
// Rebalances are triggered internally from the group member, so these are always executed in the work thread.
private WorkerRebalanceListener rebalanceListener() {
return new WorkerRebalanceListener() {
@Override
- public void onAssigned(ConnectProtocol.Assignment assignment) {
+ public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
// This callback just logs the info and saves it. The actual response is handled in the main loop, which
// ensures the group member's logic for rebalancing can complete, potentially long-running steps to
// catch up (or backoff if we fail) not executed in a callback, and so we'll be able to invoke other
@@ -866,8 +905,16 @@ public class DistributedHerder implements Herder, Runnable {
log.info("Joined group and got assignment: {}", assignment);
synchronized (DistributedHerder.this) {
DistributedHerder.this.assignment = assignment;
+ DistributedHerder.this.generation = generation;
rebalanceResolved = false;
}
+
+ // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to
+ // be done after the rebalance completes to avoid race conditions as the previous generation attempts
+ // to change the state to UNASSIGNED after tasks have been stopped.
+ if (isLeader())
+ updateDeletedConnectorStatus();
+
// We *must* interrupt any poll() call since this could occur when the poll starts, and we might then
// sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the
// main thread.
@@ -890,18 +937,24 @@ public class DistributedHerder implements Herder, Runnable {
// unnecessary repeated connections to the source/sink system.
for (String connectorName : connectors)
worker.stopConnector(connectorName);
+
// TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
// stopping them then state could continue to be reused when the task remains on this worker. For example,
// this would avoid having to close a connection and then reopen it when the task is assigned back to this
// worker again.
- for (ConnectorTaskId taskId : tasks)
- worker.stopTask(taskId);
+ if (!tasks.isEmpty()) {
+ worker.stopTasks(tasks); // trigger stop() for all tasks
+ worker.awaitStopTasks(tasks); // await stopping tasks
+ }
+ // Ensure that all status updates have been pushed to the storage system before rebalancing.
+ // Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
+ // completes.
+ statusBackingStore.flush();
log.info("Finished stopping tasks in preparation for rebalance");
} else {
log.info("Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks");
}
-
}
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 79199a6..fa50fbf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -112,7 +112,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
// tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
// up to date, try to rejoin again, leaving the group and backing off, etc.).
rejoinRequested = false;
- listener.onAssigned(assignmentSnapshot);
+ listener.onAssigned(assignmentSnapshot, generation);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 4b24312..9f05040 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
import org.slf4j.Logger;
@@ -67,9 +66,13 @@ public class WorkerGroupMember {
private boolean stopped = false;
- public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
+ public WorkerGroupMember(DistributedConfig config,
+ String restUrl,
+ KafkaConfigStorage configStorage,
+ WorkerRebalanceListener listener,
+ Time time) {
try {
- this.time = new SystemTime();
+ this.time = time;
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
index 40f55d2..bc833e9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
@@ -29,7 +29,7 @@ public interface WorkerRebalanceListener {
* Invoked when a new assignment is created by joining the Connect worker group. This is invoked for both successful
* and unsuccessful assignments.
*/
- void onAssigned(ConnectProtocol.Assignment assignment);
+ void onAssigned(ConnectProtocol.Assignment assignment, int generation);
/**
* Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks.
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index a544fb0..dbac58f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -50,6 +50,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
+import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Map;
@@ -65,7 +66,6 @@ public class RestServer {
private static final ObjectMapper JSON_SERDE = new ObjectMapper();
private final WorkerConfig config;
- private Herder herder;
private Server jettyServer;
/**
@@ -90,8 +90,6 @@ public class RestServer {
public void start(Herder herder) {
log.info("Starting REST server");
- this.herder = herder;
-
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.register(new JacksonJsonProvider());
@@ -151,7 +149,7 @@ public class RestServer {
* Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty
* server, unless overrides for advertised hostname and/or port are provided via configs.
*/
- public String advertisedUrl() {
+ public URI advertisedUrl() {
UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
if (advertisedHostname != null && !advertisedHostname.isEmpty())
@@ -161,10 +159,9 @@ public class RestServer {
builder.port(advertisedPort);
else
builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG));
- return builder.build().toString();
+ return builder.build();
}
-
/**
* @param url HTTP connection will be established with this url.
* @param method HTTP method ("GET", "POST", "PUT", etc.)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
new file mode 100644
index 0000000..179c0db
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * REST entity describing the status of a connector and of each of its tasks, as returned by
+ * {@code GET /connectors/{connector}/status}.
+ * <p>
+ * Each concrete type here has a {@link JsonCreator} constructor whose {@link JsonProperty} names match the names
+ * its getters serialize under, so a serialized status document can be read back into these objects.
+ */
+public class ConnectorStateInfo {
+
+    private final String name;
+    private final ConnectorState connector;
+    private final List<TaskState> tasks;
+
+    @JsonCreator
+    public ConnectorStateInfo(@JsonProperty("name") String name,
+                              @JsonProperty("connector") ConnectorState connector,
+                              @JsonProperty("tasks") List<TaskState> tasks) {
+        this.name = name;
+        this.connector = connector;
+        this.tasks = tasks;
+    }
+
+    /** @return the connector name */
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    /** @return the status of the connector instance itself */
+    @JsonProperty
+    public ConnectorState connector() {
+        return connector;
+    }
+
+    /** @return the status of each of the connector's tasks */
+    @JsonProperty
+    public List<TaskState> tasks() {
+        return tasks;
+    }
+
+    /**
+     * Fields shared by connector and task status: the state name, the id of the worker reporting it, and an
+     * optional trace, which is left out of the JSON output when null or empty.
+     */
+    public abstract static class AbstractState {
+        private final String state;
+        private final String trace;
+        private final String workerId;
+
+        public AbstractState(String state, String workerId, String trace) {
+            this.state = state;
+            this.workerId = workerId;
+            this.trace = trace;
+        }
+
+        @JsonProperty
+        public String state() {
+            return state;
+        }
+
+        @JsonProperty("worker_id")
+        public String workerId() {
+            return workerId;
+        }
+
+        @JsonProperty
+        @JsonInclude(JsonInclude.Include.NON_EMPTY)
+        public String trace() {
+            return trace;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            AbstractState that = (AbstractState) o;
+            return Objects.equals(state, that.state)
+                    && Objects.equals(workerId, that.workerId)
+                    && Objects.equals(trace, that.trace);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(state, workerId, trace);
+        }
+    }
+
+    /** Status of a connector instance. */
+    public static class ConnectorState extends AbstractState {
+        @JsonCreator
+        public ConnectorState(@JsonProperty("state") String state,
+                              @JsonProperty("worker_id") String workerId,
+                              @JsonProperty("trace") String trace) {
+            super(state, workerId, trace);
+        }
+    }
+
+    /** Status of a single task; instances sort by task id. */
+    public static class TaskState extends AbstractState implements Comparable<TaskState> {
+        private final int id;
+
+        @JsonCreator
+        public TaskState(@JsonProperty("id") int id,
+                         @JsonProperty("state") String state,
+                         @JsonProperty("worker_id") String workerId,
+                         @JsonProperty("trace") String trace) {
+            super(state, workerId, trace);
+            this.id = id;
+        }
+
+        @JsonProperty
+        public int id() {
+            return id;
+        }
+
+        /** Orders tasks by id only; the state fields do not take part in the comparison. */
+        @Override
+        public int compareTo(TaskState that) {
+            return Integer.compare(this.id, that.id);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            // super.equals has already verified o is exactly this class, so the cast is safe
+            return super.equals(o) && id == ((TaskState) o).id;
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * super.hashCode() + id;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index c95b723..d0d940b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -23,9 +23,11 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,8 +100,7 @@ public class ConnectorsResource {
public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
herder.connectorInfo(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference<ConnectorInfo>() {
- });
+ return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null);
}
@GET
@@ -107,8 +108,13 @@ public class ConnectorsResource {
public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
FutureCallback<Map<String, String>> cb = new FutureCallback<>();
herder.connectorConfig(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() {
- });
+ return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null);
+ }
+
+    @GET @Path("/{connector}/status") // answered by this worker's herder directly, without completeOrForwardRequest
+    public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
+        final ConnectorStateInfo status = herder.connectorStatus(connector);
+        return status;
}
@PUT
@@ -145,6 +151,13 @@ public class ConnectorsResource {
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs);
}
+    @GET @Path("/{connector}/tasks/{task}/status") // status of one task, addressed by connector name and task index
+    public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") String connector,
+                                                      @PathParam("task") Integer task) throws Throwable {
+        ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
+        return herder.taskStatus(taskId);
+    }
+
@DELETE
@Path("/{connector}")
public void destroyConnector(final @PathParam("connector") String connector) throws Throwable {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 89847ab..707470f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -20,13 +20,15 @@ package org.apache.kafka.connect.runtime.standalone;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -38,23 +40,33 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Single process, in-memory "herder". Useful for a standalone Kafka Connect process.
*/
-public class StandaloneHerder implements Herder {
+public class StandaloneHerder extends AbstractHerder {
private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
private final Worker worker;
private HashMap<String, ConnectorState> connectors = new HashMap<>();
public StandaloneHerder(Worker worker) {
+ this(worker.workerId(), worker, new MemoryStatusBackingStore());
+ }
+
+ // visible for testing
+ StandaloneHerder(String workerId,
+ Worker worker,
+ StatusBackingStore statusBackingStore) {
+ super(statusBackingStore, workerId);
this.worker = worker;
}
public synchronized void start() {
log.info("Herder starting");
+ startServices();
log.info("Herder started");
}
@@ -78,6 +90,11 @@ public class StandaloneHerder implements Herder {
}
@Override
+ public int generation() {
+ return 0;
+ }
+
+ @Override
public synchronized void connectors(Callback<Collection<String>> callback) {
callback.onCompletion(null, new ArrayList<>(connectors.keySet()));
}
@@ -131,8 +148,10 @@ public class StandaloneHerder implements Herder {
if (config == null) // Deletion, kill tasks as well
removeConnectorTasks(connName);
worker.stopConnector(connName);
- if (config == null)
+ if (config == null) {
connectors.remove(connName);
+ onDeletion(connName);
+ }
} else {
if (config == null) {
// Deletion, must already exist
@@ -194,7 +213,7 @@ public class StandaloneHerder implements Herder {
ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
ConnectorState state = connectors.get(connName);
- worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+ worker.startConnector(connConfig, new HerderConnectorContext(this, connName), this);
if (state == null) {
connectors.put(connName, new ConnectorState(connectorProps, connConfig));
} else {
@@ -219,7 +238,7 @@ public class StandaloneHerder implements Herder {
ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
TaskConfig config = new TaskConfig(taskConfigMap);
try {
- worker.addTask(taskId, config);
+ worker.startTask(taskId, config, this);
} catch (Throwable e) {
log.error("Failed to add task {}: ", taskId, e);
// Swallow this so we can continue updating the rest of the tasks
@@ -230,19 +249,21 @@ public class StandaloneHerder implements Herder {
}
}
+    private Set<ConnectorTaskId> tasksFor(ConnectorState state) { // ids 0..n-1, one per stored task config
+        final int taskCount = state.taskConfigs.size();
+        final Set<ConnectorTaskId> ids = new HashSet<>();
+        for (int taskIndex = 0; taskIndex < taskCount; taskIndex++) ids.add(new ConnectorTaskId(state.name, taskIndex));
+        return ids;
+    }
+
private void removeConnectorTasks(String connName) {
ConnectorState state = connectors.get(connName);
- for (int i = 0; i < state.taskConfigs.size(); i++) {
- ConnectorTaskId taskId = new ConnectorTaskId(connName, i);
- try {
- worker.stopTask(taskId);
- } catch (ConnectException e) {
- log.error("Failed to stop task {}: ", taskId, e);
- // Swallow this so we can continue stopping the rest of the tasks
- // FIXME: Forcibly kill the task?
- }
+ Set<ConnectorTaskId> tasks = tasksFor(state);
+ if (!tasks.isEmpty()) {
+ worker.stopTasks(tasks);
+ worker.awaitStopTasks(tasks);
+ state.taskConfigs = new ArrayList<>();
}
- state.taskConfigs = new ArrayList<>();
}
private void updateConnectorTasks(String connName) {
|