kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [2/3] kafka git commit: KAFKA-2371: Add distributed support for Copycat.
Date Fri, 23 Oct 2015 23:37:41 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
new file mode 100644
index 0000000..bd2ba56
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class DistributedHerderConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG;
+
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
+     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    /**
+     * <code>group.id</code>
+     */
+    public static final String GROUP_ID_CONFIG = "group.id";
+    private static final String GROUP_ID_DOC = "A unique string that identifies the Copycat cluster group this worker belongs to.";
+
+    /**
+     * <code>session.timeout.ms</code>
+     */
+    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
+
+    /**
+     * <code>heartbeat.interval.ms</code>
+     */
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+
+    /**
+     * <code>worker.sync.timeout.ms</code>
+     */
+    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms";
+    private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker is out of sync with other workers and needs" +
+            " to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and" +
+            " waiting a backoff period before rejoining.";
+
+    /**
+     * <code>group.unsync.timeout.ms</code>
+     */
+    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms";
+    private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and " +
+            " fails to catch up within worker.sync.timeout.ms, leave the Copycat cluster for this long before rejoining.";
+    public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000;
+
+    static {
+        CONFIG = new ConfigDef()
+                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        ConfigDef.Importance.HIGH,
+                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+                .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
+                .define(SESSION_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        30000,
+                        ConfigDef.Importance.HIGH,
+                        SESSION_TIMEOUT_MS_DOC)
+                .define(HEARTBEAT_INTERVAL_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000,
+                        ConfigDef.Importance.HIGH,
+                        HEARTBEAT_INTERVAL_MS_DOC)
+                .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
+                        ConfigDef.Type.LONG,
+                        5 * 60 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METADATA_MAX_AGE_DOC)
+                .define(CommonClientConfigs.CLIENT_ID_CONFIG,
+                        ConfigDef.Type.STRING,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.CLIENT_ID_DOC)
+                .define(CommonClientConfigs.SEND_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        128 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.SEND_BUFFER_DOC)
+                .define(CommonClientConfigs.RECEIVE_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        32 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.RECEIVE_BUFFER_DOC)
+                .define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        50L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        100L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        30000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                .define(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG,
+                        ConfigDef.Type.INT,
+                        2,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                        ConfigDef.Type.LIST,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
+                .define(SSLConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
+                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
+                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
+                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
+                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+                .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                .define(SaslConfigs.AUTH_TO_LOCAL, ConfigDef.Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, ConfigDef.Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        40 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                        /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
+                .define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        9 * 60 * 1000,
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+                .define(WORKER_SYNC_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000,
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_SYNC_TIMEOUT_MS_DOC)
+                .define(WORKER_UNSYNC_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        WORKER_UNSYNC_BACKOFF_MS_DEFAULT,
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_UNSYNC_BACKOFF_MS_DOC);
+    }
+
+    DistributedHerderConfig(Map<?, ?> props) {
+        super(CONFIG, props);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
new file mode 100644
index 0000000..ce8fba5
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+
+/**
+ * Signals that a requested operation was rejected because only the leader may perform it and this worker is not
+ * currently the leader.
+ */
+public class NotLeaderException extends CopycatException {
+    /**
+     * @param cause the underlying failure
+     */
+    public NotLeaderException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message description of the rejected operation
+     */
+    public NotLeaderException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message description of the rejected operation
+     * @param cause   the underlying failure
+     */
+    public NotLeaderException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
new file mode 100644
index 0000000..c70ed4f
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.CircularIterator;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class manages the coordination process with the Kafka group coordinator on the broker for managing Copycat
+ * assignments to workers.
+ *
+ * <p>Each member advertises the offset of its current config snapshot as join metadata. The member asked to perform
+ * the sync distributes connectors and tasks round-robin over the sorted member ids, using the highest config offset
+ * reported by any member. If that member's own config is behind and cannot catch up immediately, every member
+ * receives a {@code CONFIG_MISMATCH} assignment instead.
+ */
+public final class WorkerCoordinator extends AbstractCoordinator implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(WorkerCoordinator.class);
+
+    // Currently Copycat doesn't support multiple task assignment strategies, so we currently just fill in a default value
+    public static final String DEFAULT_SUBPROTOCOL = "default";
+
+    private final KafkaConfigStorage configStorage;
+    // Null until the first successful join; see onJoin()
+    private CopycatProtocol.Assignment assignmentSnapshot;
+    private final CopycatWorkerCoordinatorMetrics sensors;
+    // Refreshed every time metadata() is called, and possibly again in ensureLeaderConfig()
+    private ClusterConfigState configSnapshot;
+    private final WorkerRebalanceListener listener;
+
+    private boolean rejoinRequested;
+
+    /**
+     * Initialize the coordination manager.
+     *
+     * <p>All arguments up to and including {@code retryBackoffMs} are passed unchanged to
+     * {@link AbstractCoordinator}; the metrics arguments are additionally used to register this class's own gauges.
+     *
+     * @param configStorage source of the config snapshots advertised to the group and used for assignment
+     * @param listener      callback notified when an assignment is received or revoked
+     */
+    public WorkerCoordinator(ConsumerNetworkClient client,
+                             String groupId,
+                             int sessionTimeoutMs,
+                             int heartbeatIntervalMs,
+                             Metrics metrics,
+                             String metricGrpPrefix,
+                             Map<String, String> metricTags,
+                             Time time,
+                             long requestTimeoutMs,
+                             long retryBackoffMs,
+                             KafkaConfigStorage configStorage,
+                             WorkerRebalanceListener listener) {
+        super(client,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                metrics,
+                metricGrpPrefix,
+                metricTags,
+                time,
+                requestTimeoutMs,
+                retryBackoffMs);
+        this.configStorage = configStorage;
+        this.assignmentSnapshot = null;
+        this.sensors = new CopycatWorkerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.listener = listener;
+        this.rejoinRequested = false;
+    }
+
+    /**
+     * Flags that this worker should rejoin the group; the flag is reported through {@link #needRejoin()} and
+     * cleared by the next {@code onJoin}.
+     */
+    public void requestRejoin() {
+        rejoinRequested = true;
+    }
+
+    @Override
+    public String protocolType() {
+        return "copycat";
+    }
+
+    /**
+     * Takes a fresh config snapshot and advertises its offset under the single supported subprotocol.
+     *
+     * @return a one-entry map from {@link #DEFAULT_SUBPROTOCOL} to the serialized config state
+     */
+    @Override
+    public LinkedHashMap<String, ByteBuffer> metadata() {
+        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
+        configSnapshot = configStorage.snapshot();
+        CopycatProtocol.ConfigState configState = new CopycatProtocol.ConfigState(configSnapshot.offset());
+        metadata.put(DEFAULT_SUBPROTOCOL, CopycatProtocol.serializeMetadata(configState));
+        return metadata;
+    }
+
+    /**
+     * Stores the received assignment, clears any pending rejoin request and notifies the listener.
+     */
+    @Override
+    protected void onJoin(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
+        assignmentSnapshot = CopycatProtocol.deserializeAssignment(memberAssignment);
+        // At this point we always consider ourselves to be a member of the cluster, even if there was an assignment
+        // error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned
+        // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
+        // up to date, try to rejoin again, leaving the group and backing off, etc.).
+        rejoinRequested = false;
+        listener.onAssigned(assignmentSnapshot);
+    }
+
+    /**
+     * Computes the serialized assignment for every member of the group.
+     *
+     * @param leaderId          id recorded as the leader in every assignment
+     * @param protocol          the selected subprotocol (not consulted here)
+     * @param allMemberMetadata serialized config state of each member, keyed by member id
+     * @return serialized assignments keyed by member id; all of them carry {@code CONFIG_MISMATCH} when the local
+     *         config is behind the newest offset reported by the group
+     */
+    @Override
+    protected Map<String, ByteBuffer> doSync(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+        log.debug("Performing task assignment");
+
+        Map<String, CopycatProtocol.ConfigState> allConfigs = new HashMap<>();
+        for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet())
+            allConfigs.put(entry.getKey(), CopycatProtocol.deserializeMetadata(entry.getValue()));
+
+        long maxOffset = findMaxMemberConfigOffset(allConfigs);
+        Long leaderOffset = ensureLeaderConfig(maxOffset);
+        if (leaderOffset == null)
+            return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.CONFIG_MISMATCH,
+                    leaderId, maxOffset, new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>());
+        return performTaskAssignment(leaderId, leaderOffset, allConfigs);
+    }
+
+    /**
+     * Returns the largest config offset reported by any member.
+     *
+     * @throws IllegalStateException if {@code allConfigs} is empty, since there is then no offset to report
+     */
+    private long findMaxMemberConfigOffset(Map<String, CopycatProtocol.ConfigState> allConfigs) {
+        // The new config offset is the maximum seen by any member. We always perform assignment using this offset,
+        // even if some members have fallen behind. The config offset used to generate the assignment is included in
+        // the response so members that have fallen behind will not use the assignment until they have caught up.
+        Long maxOffset = null;
+        for (CopycatProtocol.ConfigState memberState : allConfigs.values()) {
+            long memberRootOffset = memberState.offset();
+            if (maxOffset == null)
+                maxOffset = memberRootOffset;
+            else
+                maxOffset = Math.max(maxOffset, memberRootOffset);
+        }
+
+        // Fail with a clear message instead of a NullPointerException from unboxing below
+        if (maxOffset == null)
+            throw new IllegalStateException("Cannot determine the max config offset of a group without members");
+
+        log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
+                maxOffset, configSnapshot.offset());
+        return maxOffset;
+    }
+
+    /**
+     * Checks that the local config snapshot is at least as new as {@code maxOffset}, re-reading it once if needed.
+     *
+     * @return the offset to use for assignment, or {@code null} if the local config is still behind
+     */
+    private Long ensureLeaderConfig(long maxOffset) {
+        // If this leader is behind some other members, we can't do assignment
+        if (configSnapshot.offset() < maxOffset) {
+            // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here.
+            // Alternatively, if this node has already passed the maximum reported by any other member of the group, it
+            // is also safe to use this newer state.
+            ClusterConfigState updatedSnapshot = configStorage.snapshot();
+            if (updatedSnapshot.offset() < maxOffset) {
+                log.info("Was selected to perform assignments, but do not have latest config found in sync request. " +
+                        "Returning an empty configuration to trigger re-sync.");
+                return null;
+            } else {
+                configSnapshot = updatedSnapshot;
+                return configSnapshot.offset();
+            }
+        }
+
+        return maxOffset;
+    }
+
+    /**
+     * Distributes every connector and each of its tasks over the members in round-robin order. Members, connectors
+     * and tasks are each visited in sorted order.
+     */
+    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, CopycatProtocol.ConfigState> allConfigs) {
+        Map<String, List<String>> connectorAssignments = new HashMap<>();
+        Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>();
+
+        // Perform round-robin task assignment
+        CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet()));
+        for (String connectorId : sorted(configSnapshot.connectors())) {
+            String connectorAssignedTo = memberIt.next();
+            log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);
+            List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo);
+            if (memberConnectors == null) {
+                memberConnectors = new ArrayList<>();
+                connectorAssignments.put(connectorAssignedTo, memberConnectors);
+            }
+            memberConnectors.add(connectorId);
+
+            for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) {
+                String taskAssignedTo = memberIt.next();
+                log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
+                List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo);
+                if (memberTasks == null) {
+                    memberTasks = new ArrayList<>();
+                    taskAssignments.put(taskAssignedTo, memberTasks);
+                }
+                memberTasks.add(taskId);
+            }
+        }
+
+        return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.NO_ERROR,
+                leaderId, maxOffset, connectorAssignments, taskAssignments);
+    }
+
+    /**
+     * Builds and serializes one assignment per member. Members missing from either assignment map receive an
+     * empty list for it.
+     */
+    private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members,
+                                                                short error,
+                                                                String leaderId,
+                                                                long maxOffset,
+                                                                Map<String, List<String>> connectorAssignments,
+                                                                Map<String, List<ConnectorTaskId>> taskAssignments) {
+
+        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
+        for (String member : members) {
+            List<String> connectors = connectorAssignments.get(member);
+            if (connectors == null)
+                connectors = Collections.emptyList();
+            List<ConnectorTaskId> tasks = taskAssignments.get(member);
+            if (tasks == null)
+                tasks = Collections.emptyList();
+            CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(error, leaderId, maxOffset, connectors, tasks);
+            log.debug("Assignment: {} -> {}", member, assignment);
+            groupAssignment.put(member, CopycatProtocol.serializeAssignment(assignment));
+        }
+        log.debug("Finished assignment");
+        return groupAssignment;
+    }
+
+    /**
+     * Notifies the listener that the previous assignment is revoked, unless there was none or it had failed.
+     */
+    @Override
+    protected void onLeave(int generation, String memberId) {
+        log.debug("Revoking previous assignment {}", assignmentSnapshot);
+        if (assignmentSnapshot != null && !assignmentSnapshot.failed())
+            listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks());
+    }
+
+    /**
+     * @return true if the base coordinator needs a rejoin, no usable assignment is held, or a rejoin was requested
+     */
+    @Override
+    public boolean needRejoin() {
+        return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
+    }
+
+    /**
+     * Currently a no-op.
+     */
+    @Override
+    public void close() {
+    }
+
+    public String memberId() {
+        return this.memberId;
+    }
+
+    /**
+     * Registers gauges for the number of connectors and tasks in the current assignment.
+     */
+    private class CopycatWorkerCoordinatorMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public CopycatWorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+            Measurable numConnectors = new Measurable() {
+                @Override
+                public double measure(MetricConfig config, long now) {
+                    // No assignment exists before the first join; report zero rather than throwing
+                    CopycatProtocol.Assignment snapshot = assignmentSnapshot;
+                    return snapshot == null ? 0 : snapshot.connectors().size();
+                }
+            };
+
+            Measurable numTasks = new Measurable() {
+                @Override
+                public double measure(MetricConfig config, long now) {
+                    // No assignment exists before the first join; report zero rather than throwing
+                    CopycatProtocol.Assignment snapshot = assignmentSnapshot;
+                    return snapshot == null ? 0 : snapshot.tasks().size();
+                }
+            };
+
+            metrics.addMetric(new MetricName("assigned-connectors",
+                            this.metricGrpName,
+                            "The number of connector instances currently assigned to this consumer",
+                            tags),
+                    numConnectors);
+            metrics.addMetric(new MetricName("assigned-tasks",
+                            this.metricGrpName,
+                            "The number of tasks currently assigned to this consumer",
+                            tags),
+                    numTasks);
+        }
+    }
+
+    /**
+     * Returns a new list holding the given elements in their natural order; the input collection is not modified.
+     */
+    private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) {
+        List<T> res = new ArrayList<>(members);
+        Collections.sort(res);
+        return res;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
new file mode 100644
index 0000000..f8cabaa
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class manages the coordination process with brokers for the copycat cluster group membership. It ties together
+ * the Coordinator, which implements the group member protocol, with all the other pieces needed to drive the connection
+ * to the group coordinator broker. This isolates all the networking to a single thread managed by this class, with
+ * higher level operations in response to group membership events being handled by the herder.
+ */
+public class WorkerGroupMember {
+    private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class);
+
+    // Supplies the numeric suffix for generated client IDs ("copycat-<n>") when no client ID is configured.
+    private static final AtomicInteger COPYCAT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final String JMX_PREFIX = "kafka.copycat";
+
+    private final Time time;
+    private final String clientId;
+    private final ConsumerNetworkClient client;
+    private final Metrics metrics;
+    private final Metadata metadata;
+    private final long retryBackoffMs;
+    private final WorkerCoordinator coordinator;
+
+    private boolean stopped = false;
+
+    /**
+     * Builds the metrics registry, the network client and the {@link WorkerCoordinator} for this worker from the
+     * given configuration. If the configured client ID is empty, a "copycat-<n>" ID is generated. A JMX reporter
+     * with the "kafka.copycat" prefix is always added to the configured metrics reporters.
+     *
+     * @param config source of the client, metrics and group settings
+     * @param configStorage handed to the {@link WorkerCoordinator}
+     * @param listener handed to the {@link WorkerCoordinator}
+     * @throws KafkaException if construction fails; components that were already created are closed first
+     */
+    public WorkerGroupMember(DistributedHerderConfig config, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
+        try {
+            this.time = new SystemTime();
+
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+            String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+            clientId = clientIdConfig.isEmpty() ? "copycat-" + COPYCAT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
+            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+            reporters.add(new JmxReporter(JMX_PREFIX));
+            this.metrics = new Metrics(metricConfig, reporters, time);
+            this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
+            // Seed the metadata with the bootstrap servers so the client has brokers to connect to.
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), 0);
+            String metricGrpPrefix = "copycat";
+            Map<String, String> metricsTags = new LinkedHashMap<>();
+            metricsTags.put("client-id", clientId);
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
+            NetworkClient netClient = new NetworkClient(
+                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
+                    this.metadata,
+                    clientId,
+                    100, // a fixed large enough value will suffice
+                    config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
+                    config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
+                    config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
+                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+            this.coordinator = new WorkerCoordinator(this.client,
+                    config.getString(DistributedHerderConfig.GROUP_ID_CONFIG),
+                    config.getInt(DistributedHerderConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getInt(DistributedHerderConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
+                    metrics,
+                    metricGrpPrefix,
+                    metricsTags,
+                    this.time,
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                    retryBackoffMs,
+                    configStorage,
+                    listener);
+
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+            log.debug("Copycat group member created");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            stop(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct Copycat group member", t);
+        }
+    }
+
+    /**
+     * Stops this group member, closing the coordinator, the metrics and the network client. Does nothing if the
+     * member has already been stopped.
+     *
+     * @throws KafkaException if an error was recorded while closing one of the components
+     */
+    public void stop() {
+        if (stopped) return;
+        stop(false);
+    }
+
+    /**
+     * Asks the coordinator to ensure that the group coordinator is known and that the group is active.
+     */
+    public void ensureActive() {
+        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureActiveGroup();
+    }
+
+    /**
+     * Drives the coordinator and the network client until the timeout expires.
+     *
+     * @param timeout maximum time to spend polling, in milliseconds; must not be negative
+     * @throws IllegalArgumentException if the timeout is negative
+     */
+    public void poll(long timeout) {
+        if (timeout < 0)
+            throw new IllegalArgumentException("Timeout must not be negative");
+
+        // Always make one pass, then keep polling only while part of the timeout is left. Looping on
+        // "remaining >= 0" would repeat zero-timeout polls for as long as the clock had not advanced.
+        long remaining = timeout;
+        do {
+            long start = time.milliseconds();
+            coordinator.ensureCoordinatorKnown();
+            coordinator.ensureActiveGroup();
+            client.poll(remaining);
+            remaining -= time.milliseconds() - start;
+        } while (remaining > 0);
+    }
+
+    /**
+     * Interrupt any running poll() calls, causing a ConsumerWakeupException to be thrown in the thread invoking that method.
+     */
+    public void wakeup() {
+        this.client.wakeup();
+    }
+
+    /**
+     * Get the member ID of this worker in the group of workers.
+     *
+     * This ID is the unique member ID automatically generated.
+     *
+     * @return the member ID
+     */
+    public String memberId() {
+        return coordinator.memberId();
+    }
+
+    /**
+     * Forwards a rejoin request to the coordinator.
+     */
+    public void requestRejoin() {
+        coordinator.requestRejoin();
+    }
+
+    /**
+     * Closes the coordinator, the metrics and the network client, then unregisters the application info.
+     *
+     * @param swallowException if true, a failure recorded while closing is not rethrown; used by the constructor's
+     *                         cleanup path so the original construction error is the one that propagates
+     */
+    private void stop(boolean swallowException) {
+        log.trace("Stopping the Copycat group member.");
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
+        this.stopped = true;
+        ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
+        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
+        ClientUtils.closeQuietly(client, "consumer network client", firstException);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+        if (firstException.get() != null && !swallowException)
+            throw new KafkaException("Failed to stop the Copycat group member", firstException.get());
+        else
+            log.debug("The Copycat group member has stopped.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
new file mode 100644
index 0000000..c9d2ed2
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.util.Collection;
+
+/**
+ * Listener for rebalance events in the worker group.
+ */
+public interface WorkerRebalanceListener {
+    /**
+     * Invoked when a new assignment is created by joining the Copycat worker group. This is invoked for both successful
+     * and unsuccessful assignments.
+     *
+     * @param assignment the assignment created by joining the group
+     */
+    void onAssigned(CopycatProtocol.Assignment assignment);
+
+    /**
+     * Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks.
+     *
+     * @param leader NOTE(review): presumably the member ID of the group leader at the time of revocation -- confirm
+     *               against the caller
+     * @param connectors the connectors whose ownership is being revoked
+     * @param tasks the tasks whose ownership is being revoked
+     */
+    void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
index d5670fd..167ee60 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -17,20 +17,23 @@
 
 package org.apache.kafka.copycat.runtime.standalone;
 
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.util.*;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
 
 /**
  * Single process, in-memory "herder". Useful for a standalone copycat process.
@@ -56,11 +59,8 @@ public class StandaloneHerder implements Herder {
         // There's no coordination/hand-off to do here since this is all standalone. Instead, we
         // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
         // the tasks.
-        for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
-            ConnectorState state = entry.getValue();
-            stopConnector(state);
-        }
-        connectors.clear();
+        for (String connName : new HashSet<>(connectors.keySet()))
+            stopConnector(connName);
 
         log.info("Herder stopped");
     }
@@ -69,11 +69,14 @@ public class StandaloneHerder implements Herder {
     public synchronized void addConnector(Map<String, String> connectorProps,
                                           Callback<String> callback) {
         try {
-            ConnectorState connState = createConnector(connectorProps);
+            ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+            String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+            worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+            connectors.put(connName, new ConnectorState(connConfig));
             if (callback != null)
-                callback.onCompletion(null, connState.name);
+                callback.onCompletion(null, connName);
             // This should always be a new job, create jobs from scratch
-            createConnectorTasks(connState);
+            createConnectorTasks(connName);
         } catch (CopycatException e) {
             if (callback != null)
                 callback.onCompletion(e, null);
@@ -81,9 +84,9 @@ public class StandaloneHerder implements Herder {
     }
 
     @Override
-    public synchronized void deleteConnector(String name, Callback<Void> callback) {
+    public synchronized void deleteConnector(String connName, Callback<Void> callback) {
         try {
-            destroyConnector(name);
+            stopConnector(connName);
             if (callback != null)
                 callback.onCompletion(null, null);
         } catch (CopycatException e) {
@@ -94,114 +97,35 @@ public class StandaloneHerder implements Herder {
 
     @Override
     public synchronized void requestTaskReconfiguration(String connName) {
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
+        if (!worker.connectorNames().contains(connName)) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(state);
-    }
-
-    // Creates and configures the connector. Does not setup any tasks
-    private ConnectorState createConnector(Map<String, String> connectorProps) {
-        ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        log.info("Creating connector {} of type {}", connName, className);
-        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
-        List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
-        Properties configs = connConfig.unusedProperties();
-
-        if (connectors.containsKey(connName)) {
-            log.error("Ignoring request to create connector due to conflicting connector name");
-            throw new CopycatException("Connector with name " + connName + " already exists");
-        }
-
-        final Connector connector;
-        try {
-            connector = instantiateConnector(className);
-        } catch (Throwable t) {
-            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
-            // may be caused by user code
-            throw new CopycatException("Failed to create connector instance", t);
-        }
-        connector.initialize(new HerderConnectorContext(this, connName));
-        try {
-            connector.start(configs);
-        } catch (CopycatException e) {
-            throw new CopycatException("Connector threw an exception while starting", e);
-        }
-        ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
-        connectors.put(connName, state);
-
-        log.info("Finished creating connector {}", connName);
-
-        return state;
-    }
-
-    private static Connector instantiateConnector(String className) {
-        try {
-            return Utils.newInstance(className, Connector.class);
-        } catch (ClassNotFoundException e) {
-            throw new CopycatException("Couldn't instantiate connector class", e);
-        }
-    }
-
-    private void destroyConnector(String connName) {
-        log.info("Destroying connector {}", connName);
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            log.error("Failed to destroy connector {} because it does not exist", connName);
-            throw new CopycatException("Connector does not exist");
-        }
-
-        stopConnector(state);
-        connectors.remove(state.name);
-
-        log.info("Finished destroying connector {}", connName);
+        updateConnectorTasks(connName);
     }
 
     // Stops a connectors tasks, then the connector
-    private void stopConnector(ConnectorState state) {
-        removeConnectorTasks(state);
+    private void stopConnector(String connName) {
+        removeConnectorTasks(connName);
         try {
-            state.connector.stop();
+            worker.stopConnector(connName);
+            connectors.remove(connName);
         } catch (CopycatException e) {
-            log.error("Error shutting down connector {}: ", state.connector, e);
+            log.error("Error shutting down connector {}: ", connName, e);
         }
     }
 
-    private void createConnectorTasks(ConnectorState state) {
-        String taskClassName = state.connector.taskClass().getName();
-
-        log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
-
-        List<Properties> taskConfigs = state.connector.taskConfigs(state.maxTasks);
-
-        // Generate the final configs, including framework provided settings
-        Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskConfigs.get(i);
-            // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
-            // is automatically provided to tasks since it is required by the framework, but this
-            String subscriptionTopics = Utils.join(state.inputTopics, ",");
-            if (state.connector instanceof SinkConnector) {
-                // Make sure we don't modify the original since the connector may reuse it internally
-                Properties configForSink = new Properties();
-                configForSink.putAll(config);
-                configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
-                config = configForSink;
-            }
-            taskProps.put(taskId, config);
-        }
+    private void createConnectorTasks(String connName) {
+        ConnectorState state = connectors.get(connName);
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(connName,
+                state.config.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
+                state.config.getList(ConnectorConfig.TOPICS_CONFIG));
 
-        // And initiate the tasks
-        for (int i = 0; i < taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
-            Properties config = taskProps.get(taskId);
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskEntry : taskConfigs.entrySet()) {
+            ConnectorTaskId taskId = taskEntry.getKey();
+            TaskConfig config = new TaskConfig(taskEntry.getValue());
             try {
-                worker.addTask(taskId, taskClassName, config);
+                worker.addTask(taskId, config);
                 // We only need to store the task IDs so we can clean up.
                 state.tasks.add(taskId);
             } catch (Throwable e) {
@@ -213,7 +137,8 @@ public class StandaloneHerder implements Herder {
         }
     }
 
-    private void removeConnectorTasks(ConnectorState state) {
+    private void removeConnectorTasks(String connName) {
+        ConnectorState state = connectors.get(connName);
         Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
         while (taskIter.hasNext()) {
             ConnectorTaskId taskId = taskIter.next();
@@ -228,25 +153,18 @@ public class StandaloneHerder implements Herder {
         }
     }
 
-    private void updateConnectorTasks(ConnectorState state) {
-        removeConnectorTasks(state);
-        createConnectorTasks(state);
+    private void updateConnectorTasks(String connName) {
+        removeConnectorTasks(connName);
+        createConnectorTasks(connName);
     }
 
 
     private static class ConnectorState {
-        public String name;
-        public Connector connector;
-        public int maxTasks;
-        public List<String> inputTopics;
+        public ConnectorConfig config;
         Set<ConnectorTaskId> tasks;
 
-        public ConnectorState(String name, Connector connector, int maxTasks,
-                              List<String> inputTopics) {
-            this.name = name;
-            this.connector = connector;
-            this.maxTasks = maxTasks;
-            this.inputTopics = inputTopics;
+        public ConnectorState(ConnectorConfig config) {
+            this.config = config;
             this.tasks = new HashSet<>();
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
index 366bf13..fb4f70d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -226,6 +227,7 @@ public class KafkaConfigStorage {
         consumerProps.putAll(configs);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
         configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }
@@ -271,9 +273,14 @@ public class KafkaConfigStorage {
      * @param properties the configuration to write
      */
     public void putConnectorConfig(String connector, Map<String, String> properties) {
-        Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
-        copycatConfig.put("properties", properties);
-        byte[] serializedConfig = converter.fromCopycatData(topic, CONNECTOR_CONFIGURATION_V0, copycatConfig);
+        byte[] serializedConfig;
+        if (properties == null) {
+            serializedConfig = null;
+        } else {
+            Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
+            copycatConfig.put("properties", properties);
+            serializedConfig = converter.fromCopycatData(topic, CONNECTOR_CONFIGURATION_V0, copycatConfig);
+        }
 
         try {
             configLog.send(CONNECTOR_KEY(connector), serializedConfig);
@@ -349,6 +356,14 @@ public class KafkaConfigStorage {
         }
     }
 
+    /**
+     * Delegates to the underlying config log's {@code readToEnd()} and returns the future it produces.
+     * NOTE(review): presumably the future completes once the log has been read up to its current end -- confirm
+     * against KafkaBasedLog.
+     *
+     * @return the future returned by the config log
+     */
+    public Future<Void> readToEnd() {
+        return configLog.readToEnd();
+    }
+
+    /**
+     * Delegates to the underlying config log's {@code readToEnd(Callback)}.
+     * NOTE(review): presumably the callback is invoked once the log has been read up to its current end -- confirm
+     * against KafkaBasedLog.
+     *
+     * @param cb callback handed to the config log
+     */
+    public void readToEnd(Callback<Void> cb) {
+        configLog.readToEnd(cb);
+    }
+
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
         return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
@@ -369,22 +384,29 @@ public class KafkaConfigStorage {
                 log.error("Failed to convert config data to Copycat format: ", e);
                 return;
             }
-            offset = record.offset();
+            // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
+            // *next record*, not the last one consumed.
+            offset = record.offset() + 1;
 
             if (record.key().startsWith(CONNECTOR_PREFIX)) {
                 String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
                 synchronized (lock) {
-                    // Connector configs can be applied and callbacks invoked immediately
-                    if (!(value.value() instanceof Map)) {
-                        log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
-                        return;
-                    }
-                    Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
-                    if (!(newConnectorConfig instanceof Map)) {
-                        log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
-                        return;
+                    if (value.value() == null) {
+                        // Connector deletion will be written as a null value
+                        connectorConfigs.remove(connectorName);
+                    } else {
+                        // Connector configs can be applied and callbacks invoked immediately
+                        if (!(value.value() instanceof Map)) {
+                            log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
+                            return;
+                        }
+                        Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
+                        if (!(newConnectorConfig instanceof Map)) {
+                            log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
+                            return;
+                        }
+                        connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
                     }
-                    connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
                 }
                 if (!starting)
                     connectorConfigCallback.onCompletion(null, connectorName);
@@ -445,8 +467,7 @@ public class KafkaConfigStorage {
 
                     Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
 
-                    Object newTaskCountObj = ((Map<String, Object>) value.value()).get("tasks");
-                    Integer newTaskCount = (Integer) newTaskCountObj;
+                    int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
 
                     // Validate the configs we're supposed to update to ensure we're getting a complete configuration
                     // update of all tasks that are expected based on the number of tasks in the commit message.
@@ -542,5 +563,16 @@ public class KafkaConfigStorage {
                 return false;
         return true;
     }
+
+    // Convert an integer value extracted from a schemaless struct to an int. This handles potentially different
+    // encodings by different Converters. A Long that does not fit in an int is rejected instead of being silently
+    // truncated by the narrowing cast. Any other type (including null) is rejected as well.
+    private static int intValue(Object value) {
+        if (value instanceof Integer)
+            return (Integer) value;
+        if (value instanceof Long) {
+            long longValue = (Long) value;
+            if (longValue < Integer.MIN_VALUE || longValue > Integer.MAX_VALUE)
+                throw new CopycatException("Expected integer value to fit in an int but got " + longValue);
+            return (int) longValue;
+        }
+        throw new CopycatException("Expected integer value to be either Integer or Long");
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
index b5af1fe..b270368 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
@@ -68,11 +68,13 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         producerProps.putAll(configs);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
 
         Map<String, Object> consumerProps = new HashMap<>();
         consumerProps.putAll(configs);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
         offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
index 683c634..e3e498c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
  * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within
  * the connector.
  */
-public class ConnectorTaskId implements Serializable {
+public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> {
     private final String connector;
     private final int task;
 
@@ -68,4 +68,12 @@ public class ConnectorTaskId implements Serializable {
     public String toString() {
         return connector + '-' + task;
     }
+
+    @Override
+    public int compareTo(ConnectorTaskId o) {
+        int connectorCmp = connector.compareTo(o.connector);
+        if (connectorCmp != 0)
+            return connectorCmp;
+        return ((Integer) task).compareTo(o.task);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 4a30992..19e1462 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -19,11 +19,19 @@ package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.ConnectorContext;
+import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.storage.*;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.storage.OffsetBackingStore;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+import org.apache.kafka.copycat.storage.OffsetStorageWriter;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.apache.kafka.copycat.util.MockTime;
 import org.apache.kafka.copycat.util.ThreadedTest;
@@ -36,16 +44,24 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(Worker.class)
 @PowerMockIgnore("javax.management.*")
 public class WorkerTest extends ThreadedTest {
 
-    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private static final String CONNECTOR_ID = "test-connector";
+    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+
     private WorkerConfig config;
     private Worker worker;
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
@@ -65,6 +81,146 @@ public class WorkerTest extends ThreadedTest {
     }
 
     @Test
+    public void testAddRemoveConnector() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        // Create
+        Connector connector = PowerMock.createMock(Connector.class);
+        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+
+        Properties props = new Properties();
+        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+
+        connector.initialize(ctx);
+        EasyMock.expectLastCall();
+        connector.start(props);
+        EasyMock.expectLastCall();
+
+        // Remove
+        connector.stop();
+        EasyMock.expectLastCall();
+
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.addConnector(config, ctx);
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+        try {
+            worker.addConnector(config, ctx);
+            fail("Should have thrown exception when trying to add connector with same name.");
+        } catch (CopycatException e) {
+            // expected
+        }
+        worker.stopConnector(CONNECTOR_ID);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testStopInvalidConnector() {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        worker.stopConnector(CONNECTOR_ID);
+    }
+
+    @Test
+    public void testReconfigureConnectorTasks() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        // Create
+        Connector connector = PowerMock.createMock(Connector.class);
+        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+
+        Properties props = new Properties();
+        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+
+        connector.initialize(ctx);
+        EasyMock.expectLastCall();
+        connector.start(props);
+        EasyMock.expectLastCall();
+
+        // Reconfigure
+        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
+        Properties taskProps = new Properties();
+        taskProps.setProperty("foo", "bar");
+        EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));
+
+        // Remove
+        connector.stop();
+        EasyMock.expectLastCall();
+
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.addConnector(config, ctx);
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+        try {
+            worker.addConnector(config, ctx);
+            fail("Should have thrown exception when trying to add connector with same name.");
+        } catch (CopycatException e) {
+            // expected
+        }
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = worker.reconfigureConnectorTasks(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
+        Properties expectedTaskProps = new Properties();
+        expectedTaskProps.setProperty("foo", "bar");
+        expectedTaskProps.setProperty(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+        expectedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, "foo,bar");
+        assertEquals(2, taskConfigs.size());
+        assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 0)));
+        assertEquals(expectedTaskProps, taskConfigs.get(new ConnectorTaskId(CONNECTOR_ID, 1)));
+        worker.stopConnector(CONNECTOR_ID);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+
+    @Test
     public void testAddRemoveTask() throws Exception {
         offsetBackingStore.configure(EasyMock.anyObject(Map.class));
         EasyMock.expectLastCall();
@@ -78,7 +234,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
 
         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
 
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@@ -91,6 +247,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Properties origProps = new Properties();
+        origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.start(origProps);
         EasyMock.expectLastCall();
 
@@ -108,8 +265,11 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
+        assertEquals(Collections.emptySet(), worker.taskIds());
+        worker.addTask(taskId, new TaskConfig(Utils.propsToStringMap(origProps)));
+        assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
         worker.stopTask(taskId);
+        assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
 
@@ -128,7 +288,7 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
 
-        worker.stopTask(taskId);
+        worker.stopTask(TASK_ID);
     }
 
     @Test
@@ -143,10 +303,10 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
 
         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
 
         PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
+                WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
@@ -156,6 +316,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
         Properties origProps = new Properties();
+        origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.start(origProps);
         EasyMock.expectLastCall();
 
@@ -174,13 +335,35 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.addTask(taskId, TestSourceTask.class.getName(), origProps);
+        worker.addTask(TASK_ID, new TaskConfig(Utils.propsToStringMap(origProps)));
         worker.stop();
 
         PowerMock.verifyAll();
     }
 
 
+    private static class TestConnector extends Connector {
+        @Override
+        public void start(Properties props) {
+
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Properties> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+    }
+
     private static class TestSourceTask extends SourceTask {
         public TestSourceTask() {
         }


Mime
View raw message