kafka-commits mailing list archives

From gwens...@apache.org
Subject [15/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
Date Mon, 09 Nov 2015 06:11:34 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
new file mode 100644
index 0000000..38c1aeb
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class WorkerCoordinatorTest {
+
+    private static final String LEADER_URL = "leaderUrl:8083";
+    private static final String MEMBER_URL = "memberUrl:8083";
+
+    private String connectorId = "connector";
+    private String connectorId2 = "connector2";
+    private ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId(connectorId, 1);
+    private ConnectorTaskId taskId2 = new ConnectorTaskId(connectorId2, 0);
+
+    private String groupId = "test-group";
+    private int sessionTimeoutMs = 10;
+    private int heartbeatIntervalMs = 2;
+    private long retryBackoffMs = 100;
+    private long requestTimeoutMs = 5000;
+    private MockTime time;
+    private MockClient client;
+    private Cluster cluster = TestUtils.singletonCluster("topic", 1);
+    private Node node = cluster.nodes().get(0);
+    private Metadata metadata;
+    private Metrics metrics;
+    private Map<String, String> metricTags = new LinkedHashMap<>();
+    private ConsumerNetworkClient consumerClient;
+    private MockRebalanceListener rebalanceListener;
+    @Mock private KafkaConfigStorage configStorage;
+    private WorkerCoordinator coordinator;
+
+    private ClusterConfigState configState1;
+    private ClusterConfigState configState2;
+
+    @Before
+    public void setup() {
+        this.time = new MockTime();
+        this.client = new MockClient(time);
+        this.metadata = new Metadata(0, Long.MAX_VALUE);
+        this.metadata.update(cluster, time.milliseconds());
+        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+        this.metrics = new Metrics(time);
+        this.rebalanceListener = new MockRebalanceListener();
+        this.configStorage = PowerMock.createMock(KafkaConfigStorage.class);
+
+        client.setNode(node);
+
+        this.coordinator = new WorkerCoordinator(consumerClient,
+                groupId,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                metrics,
+                "consumer" + groupId,
+                metricTags,
+                time,
+                requestTimeoutMs,
+                retryBackoffMs,
+                LEADER_URL,
+                configStorage,
+                rebalanceListener);
+
+        configState1 = new ClusterConfigState(
+                1L, Collections.singletonMap(connectorId, 1),
+                Collections.singletonMap(connectorId, (Map<String, String>) new HashMap<String, String>()),
+                Collections.singletonMap(taskId0, (Map<String, String>) new HashMap<String, String>()),
+                Collections.<String>emptySet()
+        );
+        Map<String, Integer> configState2ConnectorTaskCounts = new HashMap<>();
+        configState2ConnectorTaskCounts.put(connectorId, 2);
+        configState2ConnectorTaskCounts.put(connectorId2, 1);
+        Map<String, Map<String, String>> configState2ConnectorConfigs = new HashMap<>();
+        configState2ConnectorConfigs.put(connectorId, new HashMap<String, String>());
+        configState2ConnectorConfigs.put(connectorId2, new HashMap<String, String>());
+        Map<ConnectorTaskId, Map<String, String>> configState2TaskConfigs = new HashMap<>();
+        configState2TaskConfigs.put(taskId0, new HashMap<String, String>());
+        configState2TaskConfigs.put(taskId1, new HashMap<String, String>());
+        configState2TaskConfigs.put(taskId2, new HashMap<String, String>());
+        configState2 = new ClusterConfigState(
+                2L, configState2ConnectorTaskCounts,
+                configState2ConnectorConfigs,
+                configState2TaskConfigs,
+                Collections.<String>emptySet()
+        );
+    }
+
+    @After
+    public void teardown() {
+        this.metrics.close();
+    }
+
+    // We only test functionality unique to WorkerCoordinator. Most functionality is already well tested via the tests
+    // that cover AbstractCoordinator & ConsumerCoordinator.
+
+    @Test
+    public void testMetadata() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata();
+        assertEquals(1, serialized.size());
+        ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
+        assertEquals(1, state.offset());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testNormalJoinGroupLeader() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        final String consumerId = "leader";
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // normal join group
+        Map<String, Long> memberConfigOffsets = new HashMap<>();
+        memberConfigOffsets.put("leader", 1L);
+        memberConfigOffsets.put("member", 1L);
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE.code()));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                return sync.memberId().equals(consumerId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().containsKey(consumerId);
+            }
+        }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId),
+                Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        assertFalse(coordinator.needRejoin());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertFalse(rebalanceListener.assignment.failed());
+        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals("leader", rebalanceListener.assignment.leader());
+        assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors());
+        assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testNormalJoinGroupFollower() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        final String memberId = "member";
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // normal join group
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                return sync.memberId().equals(memberId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().isEmpty();
+            }
+        }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
+                Collections.singletonList(taskId0), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        assertFalse(coordinator.needRejoin());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertFalse(rebalanceListener.assignment.failed());
+        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCannotAssign() {
+        // If the selected leader can't read configs up to the maximum offset reported by the group, it will fail to
+        // assign and the join should immediately be retried.
+
+        // When the first round fails, we'll take an updated config snapshot
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
+
+        PowerMock.replayAll();
+
+        final String memberId = "member";
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // config mismatch results in assignment error
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                return sync.memberId().equals(memberId) &&
+                        sync.generationId() == 1 &&
+                        sync.groupAssignment().isEmpty();
+            }
+        };
+        client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
+                Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L,
+                Collections.<String>emptyList(), Collections.singletonList(taskId0), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRejoinGroup() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // join the group once
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
+                Collections.singletonList(taskId0), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertFalse(rebalanceListener.assignment.failed());
+        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks());
+
+        // and join the group again
+        coordinator.requestRejoin();
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId),
+                Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(Collections.emptyList(), rebalanceListener.revokedConnectors);
+        assertEquals(Collections.singletonList(taskId0), rebalanceListener.revokedTasks);
+        assertEquals(2, rebalanceListener.assignedCount);
+        assertFalse(rebalanceListener.assignment.failed());
+        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors());
+        assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testLeaderPerformAssignment1() throws Exception {
+        // Since all the protocol responses are mocked, the other tests validate that doSync runs, but don't validate
+        // its output. So we test the assignment logic directly here.
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        // Prime the current configuration state
+        coordinator.metadata();
+
+        Map<String, ByteBuffer> configs = new HashMap<>();
+        // Mark everyone as in sync with configState1
+        configs.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
+        configs.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+
+        // configState1 has 1 connector, 1 task
+        ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
+        assertEquals(false, leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors());
+        assertEquals(Collections.emptyList(), leaderAssignment.tasks());
+
+        ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member"));
+        assertEquals(false, memberAssignment.failed());
+        assertEquals("leader", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.emptyList(), memberAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testLeaderPerformAssignment2() throws Exception {
+        // Since all the protocol responses are mocked, the other tests validate that doSync runs, but don't validate
+        // its output. So we test the assignment logic directly here.
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
+
+        PowerMock.replayAll();
+
+        // Prime the current configuration state
+        coordinator.metadata();
+
+        Map<String, ByteBuffer> configs = new HashMap<>();
+        // Mark everyone as in sync with configState1
+        configs.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
+        configs.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+
+        // configState2 has 2 connectors, 3 tasks and should trigger round-robin assignment
+        ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
+        assertEquals(false, leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors());
+        assertEquals(Arrays.asList(taskId1, taskId2), leaderAssignment.tasks());
+
+        ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member"));
+        assertEquals(false, memberAssignment.failed());
+        assertEquals("leader", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+
+    private Struct groupMetadataResponse(Node node, short error) {
+        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
+        return response.toStruct();
+    }
+
+    private Struct joinGroupLeaderResponse(int generationId, String memberId,
+                                           Map<String, Long> configOffsets, short error) {
+        Map<String, ByteBuffer> metadata = new HashMap<>();
+        for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
+            // We need a member URL, but it doesn't matter for the purposes of this test. Just set it to the member ID.
+            String memberUrl = configStateEntry.getKey();
+            long configOffset = configStateEntry.getValue();
+            ByteBuffer buf = ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(memberUrl, configOffset));
+            metadata.put(configStateEntry.getKey(), buf);
+        }
+        return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct();
+    }
+
+    private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+        return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId,
+                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+    }
+
+    private Struct syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
+                                     List<ConnectorTaskId> taskIds, short error) {
+        ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds);
+        ByteBuffer buf = ConnectProtocol.serializeAssignment(assignment);
+        return new SyncGroupResponse(error, buf).toStruct();
+    }
+
+
+    private static class MockRebalanceListener implements WorkerRebalanceListener {
+        public ConnectProtocol.Assignment assignment = null;
+
+        public String revokedLeader;
+        public Collection<String> revokedConnectors;
+        public Collection<ConnectorTaskId> revokedTasks;
+
+        public int revokedCount = 0;
+        public int assignedCount = 0;
+
+        @Override
+        public void onAssigned(ConnectProtocol.Assignment assignment) {
+            this.assignment = assignment;
+            assignedCount++;
+        }
+
+        @Override
+        public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
+            this.revokedLeader = leader;
+            this.revokedConnectors = connectors;
+            this.revokedTasks = tasks;
+            revokedCount++;
+        }
+    }
+}
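
The tests above exercise two behaviours of the assignment protocol: the leader refuses to assign when its own config snapshot lags the highest offset reported by any member (testJoinLeaderCannotAssign), and otherwise spreads connectors and tasks across members round-robin (testLeaderPerformAssignment2). A minimal, self-contained sketch of that shape follows; all names here are hypothetical and it makes no claim to reproduce the real WorkerCoordinator's exact ordering:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    class AssignmentSketch {
        static Map<String, List<String>> assign(long leaderSnapshotOffset,
                                                Map<String, Long> memberOffsets,
                                                List<String> work) {
            long maxOffset = -1L;
            for (long offset : memberOffsets.values())
                maxOffset = Math.max(maxOffset, offset);
            // The leader's own snapshot lags some member: signal a CONFIG_MISMATCH-style
            // failure (returned here as null) so everyone rejoins after the leader has
            // re-read the config topic, which is the retry the test walks through.
            if (leaderSnapshotOffset < maxOffset)
                return null;
            // Otherwise spread the work items (connectors, then tasks) round-robin over
            // the members in a deterministic sorted order.
            List<String> members = new ArrayList<>(new TreeMap<>(memberOffsets).keySet());
            Map<String, List<String>> result = new TreeMap<>();
            for (String member : members)
                result.put(member, new ArrayList<String>());
            for (int i = 0; i < work.size(); i++)
                result.get(members.get(i % members.size())).add(work.get(i));
            return result;
        }
    }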

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
new file mode 100644
index 0000000..1feab0d
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RestServer.class)
+@PowerMockIgnore("javax.management.*")
+public class ConnectorsResourceTest {
+    // Note the trailing / and that we do *not* use LEADER_URL to construct our reference values. This checks that we
+    // handle URL construction properly, avoiding //, which would mess up routing in the REST server.
+    private static final String LEADER_URL = "http://leader:8083/";
+    private static final String CONNECTOR_NAME = "test";
+    private static final String CONNECTOR2_NAME = "test2";
+    private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>();
+    static {
+        CONNECTOR_CONFIG.put("name", CONNECTOR_NAME);
+        CONNECTOR_CONFIG.put("sample_config", "test_config");
+    }
+    private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES = Arrays.asList(
+            new ConnectorTaskId(CONNECTOR_NAME, 0),
+            new ConnectorTaskId(CONNECTOR_NAME, 1)
+    );
+    private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>();
+    static {
+        TASK_CONFIGS.add(Collections.singletonMap("config", "value"));
+        TASK_CONFIGS.add(Collections.singletonMap("config", "other_value"));
+    }
+    private static final List<TaskInfo> TASK_INFOS = new ArrayList<>();
+    static {
+        TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), TASK_CONFIGS.get(0)));
+        TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1)));
+    }
+
+
+    @Mock
+    private Herder herder;
+    private ConnectorsResource connectorsResource;
+
+    @Before
+    public void setUp() throws NoSuchMethodException {
+        PowerMock.mockStatic(RestServer.class,
+                RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
+        connectorsResource = new ConnectorsResource(herder);
+    }
+
+    @Test
+    public void testListConnectors() throws Throwable {
+        final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
+        herder.connectors(EasyMock.capture(cb));
+        expectAndCallbackResult(cb, Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
+
+        PowerMock.replayAll();
+
+        Collection<String> connectors = connectorsResource.listConnectors();
+        // Ordering isn't guaranteed, compare sets
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testListConnectorsNotLeader() throws Throwable {
+        final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
+        herder.connectors(EasyMock.capture(cb));
+        expectAndCallbackNotLeaderException(cb);
+        // Should forward request
+        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("GET"),
+                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class)))
+                .andReturn(new RestServer.HttpResponse<>(200, new HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
+
+        PowerMock.replayAll();
+
+        Collection<String> connectors = connectorsResource.listConnectors();
+        // Ordering isn't guaranteed, compare sets
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testListConnectorsNotSynced() throws Throwable {
+        final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
+        herder.connectors(EasyMock.capture(cb));
+        expectAndCallbackException(cb, new ConnectException("not synced"));
+
+        PowerMock.replayAll();
+
+        // throws
+        connectorsResource.listConnectors();
+    }
+
+    @Test
+    public void testCreateConnector() throws Throwable {
+        CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(body);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateConnectorNotLeader() throws Throwable {
+        CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackNotLeaderException(cb);
+        // Should forward request
+        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject()))
+                .andReturn(new RestServer.HttpResponse<>(201, new HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(body);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = AlreadyExistsException.class)
+    public void testCreateConnectorExists() throws Throwable {
+        CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new AlreadyExistsException("already exists"));
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(body);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDeleteConnector() throws Throwable {
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, null);
+
+        PowerMock.replayAll();
+
+        connectorsResource.destroyConnector(CONNECTOR_NAME);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDeleteConnectorNotLeader() throws Throwable {
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+        expectAndCallbackNotLeaderException(cb);
+        // Should forward request
+        EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME, "DELETE", null, null))
+                .andReturn(new RestServer.HttpResponse<>(204, new HashMap<String, List<String>>(), null));
+
+        PowerMock.replayAll();
+
+        connectorsResource.destroyConnector(CONNECTOR_NAME);
+
+        PowerMock.verifyAll();
+    }
+
+    // Not found exceptions should pass through to the caller so they can be mapped to 404 responses
+    @Test(expected = NotFoundException.class)
+    public void testDeleteConnectorNotFound() throws Throwable {
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("not found"));
+
+        PowerMock.replayAll();
+
+        connectorsResource.destroyConnector(CONNECTOR_NAME);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testGetConnector() throws Throwable {
+        final Capture<Callback<ConnectorInfo>> cb = Capture.newInstance();
+        herder.connectorInfo(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES));
+
+        PowerMock.replayAll();
+
+        ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME);
+        assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES), connInfo);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testGetConnectorConfig() throws Throwable {
+        final Capture<Callback<Map<String, String>>> cb = Capture.newInstance();
+        herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, CONNECTOR_CONFIG);
+
+        PowerMock.replayAll();
+
+        Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME);
+        assertEquals(CONNECTOR_CONFIG, connConfig);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = NotFoundException.class)
+    public void testGetConnectorConfigConnectorNotFound() throws Throwable {
+        final Capture<Callback<Map<String, String>>> cb = Capture.newInstance();
+        herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("not found"));
+
+        PowerMock.replayAll();
+
+        connectorsResource.getConnectorConfig(CONNECTOR_NAME);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutConnectorConfig() throws Throwable {
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(CONNECTOR_CONFIG), EasyMock.eq(true), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(false, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+
+        PowerMock.replayAll();
+
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, CONNECTOR_CONFIG);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testGetConnectorTaskConfigs() throws Throwable {
+        final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();
+        herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, TASK_INFOS);
+
+        PowerMock.replayAll();
+
+        List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME);
+        assertEquals(TASK_INFOS, taskInfos);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = NotFoundException.class)
+    public void testGetConnectorTaskConfigsConnectorNotFound() throws Throwable {
+        final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();
+        herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("connector not found"));
+
+        PowerMock.replayAll();
+
+        connectorsResource.getTaskConfigs(CONNECTOR_NAME);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutConnectorTaskConfigs() throws Throwable {
+        final Capture<Callback<Void>> cb = Capture.newInstance();
+        herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, null);
+
+        PowerMock.replayAll();
+
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = NotFoundException.class)
+    public void testPutConnectorTaskConfigsConnectorNotFound() throws Throwable {
+        final Capture<Callback<Void>> cb = Capture.newInstance();
+        herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb));
+        expectAndCallbackException(cb, new NotFoundException("not found"));
+
+        PowerMock.replayAll();
+
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS);
+
+        PowerMock.verifyAll();
+    }
+
+    private <T> void expectAndCallbackResult(final Capture<Callback<T>> cb, final T value) {
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                cb.getValue().onCompletion(null, value);
+                return null;
+            }
+        });
+    }
+
+    private <T> void expectAndCallbackException(final Capture<Callback<T>> cb, final Throwable t) {
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                cb.getValue().onCompletion(t, null);
+                return null;
+            }
+        });
+    }
+
+    private <T> void expectAndCallbackNotLeaderException(final Capture<Callback<T>> cb) {
+        expectAndCallbackException(cb, new NotLeaderException("not leader test", LEADER_URL));
+    }
+}
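
The LEADER_URL comment at the top of this class flags the hazard the forwarding tests guard against: naively concatenating a base URL that ends in / with a path that starts with / produces //, which breaks REST routing. A slash-safe join helper might look like the following sketch (a hypothetical helper, not RestServer's actual implementation):

    class UrlJoinSketch {
        // Join a base URL and a path without ever emitting a double slash.
        static String join(String base, String path) {
            String b = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
            String p = path.startsWith("/") ? path : "/" + path;
            return b + p;
        }

        public static void main(String[] args) {
            // Both print http://leader:8083/connectors
            System.out.println(join("http://leader:8083/", "connectors"));
            System.out.println(join("http://leader:8083", "/connectors"));
        }
    }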

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
new file mode 100644
index 0000000..8ef1d24
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.standalone;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+public class StandaloneHerderTest {
+    private static final String CONNECTOR_NAME = "test";
+    private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
+    private static final String TOPICS_LIST_STR = "topic1,topic2";
+    private static final int DEFAULT_MAX_TASKS = 1;
+
+    private StandaloneHerder herder;
+    @Mock protected Worker worker;
+    private Connector connector;
+    @Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback;
+
+    @Before
+    public void setup() {
+        worker = PowerMock.createMock(Worker.class);
+        herder = new StandaloneHerder(worker);
+    }
+
+    @Test
+    public void testCreateSourceConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateConnectorAlreadyExists() throws Exception {
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        // First addition should succeed
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+
+        // Second should fail
+        createCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateSinkConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSinkConnector.class);
+        expectAdd(CONNECTOR_NAME, BogusSinkConnector.class, BogusSinkTask.class, true);
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class), false, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDestroyConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectDestroy();
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        FutureCallback<Herder.Created<ConnectorInfo>> futureCb = new FutureCallback<>();
+        herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
+        futureCb.get(1000L, TimeUnit.MILLISECONDS);
+
+        // Second deletion should fail since the connector is gone
+        futureCb = new FutureCallback<>();
+        herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
+        try {
+            futureCb.get(1000L, TimeUnit.MILLISECONDS);
+            fail("Should have thrown NotFoundException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof NotFoundException);
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateAndStop() throws Exception {
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
+        expectStop();
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
+        herder.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAccessors() throws Exception {
+        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class);
+
+        Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class);
+        Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class);
+        Callback<Map<String, String>> connectorConfigCb = PowerMock.createMock(Callback.class);
+        Callback<List<TaskInfo>> taskConfigsCb = PowerMock.createMock(Callback.class);
+
+        // Check accessors with empty worker
+        listConnectorsCb.onCompletion(null, Collections.<String>emptyList());
+        EasyMock.expectLastCall();
+        connectorInfoCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.<ConnectorInfo>isNull());
+        EasyMock.expectLastCall();
+        connectorConfigCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.<Map<String, String>>isNull());
+        EasyMock.expectLastCall();
+        taskConfigsCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.<List<TaskInfo>>isNull());
+        EasyMock.expectLastCall();
+
+
+        // Create connector
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+
+        // Validate accessors with 1 connector
+        listConnectorsCb.onCompletion(null, Arrays.asList(CONNECTOR_NAME));
+        EasyMock.expectLastCall();
+        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        connectorInfoCb.onCompletion(null, connInfo);
+        EasyMock.expectLastCall();
+        connectorConfigCb.onCompletion(null, connConfig);
+        EasyMock.expectLastCall();
+
+        TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(BogusSourceTask.class, false));
+        taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo));
+        EasyMock.expectLastCall();
+
+
+        PowerMock.replayAll();
+
+        // All operations are synchronous for StandaloneHerder, so we don't need to actually wait after making each call
+        herder.connectors(listConnectorsCb);
+        herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
+        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
+        herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
+        herder.connectors(listConnectorsCb);
+        herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
+        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
+        herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutConnectorConfig() throws Exception {
+        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class);
+        Map<String, String> newConnConfig = new HashMap<>(connConfig);
+        newConnConfig.put("foo", "bar");
+
+        Callback<Map<String, String>> connectorConfigCb = PowerMock.createMock(Callback.class);
+        Callback<Herder.Created<ConnectorInfo>> putConnectorConfigCb = PowerMock.createMock(Callback.class);
+
+        // Create
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        // Should get first config
+        connectorConfigCb.onCompletion(null, connConfig);
+        EasyMock.expectLastCall();
+        // Update config, which requires stopping and restarting
+        worker.stopConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall();
+        Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture();
+        worker.addConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject());
+        EasyMock.expectLastCall();
+        // Generate same task config, which should result in no additional action to restart tasks
+        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
+                .andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false)));
+        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, newConnInfo));
+        EasyMock.expectLastCall();
+        // Should get new config
+        connectorConfigCb.onCompletion(null, newConnConfig);
+        EasyMock.expectLastCall();
+
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
+        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
+        herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, putConnectorConfigCb);
+        assertEquals("bar", capturedConfig.getValue().originals().get("foo"));
+        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
+
+        PowerMock.verifyAll();
+
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testPutTaskConfigs() {
+        Callback<Void> cb = PowerMock.createMock(Callback.class);
+
+        PowerMock.replayAll();
+
+        herder.putTaskConfigs(CONNECTOR_NAME,
+                Arrays.asList(Collections.singletonMap("config", "value")),
+                cb);
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectAdd(String name, Class<? extends Connector> connClass, Class<? extends Task> taskClass,
+                           boolean sink) throws Exception {
+        Map<String, String> connectorProps = connectorConfig(name, connClass);
+
+        worker.addConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class));
+        PowerMock.expectLastCall();
+
+        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
+        PowerMock.expectLastCall();
+
+        // And we should instantiate the tasks. For a sink task, we should see the input topics added to
+        // the generated task properties
+        Map<String, String> generatedTaskProps = taskConfig(taskClass, sink);
+        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
+                .andReturn(Collections.singletonList(generatedTaskProps));
+
+        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps));
+        PowerMock.expectLastCall();
+    }
+
+    private void expectStop() {
+        worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
+        EasyMock.expectLastCall();
+        worker.stopConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall();
+    }
+
+    private void expectDestroy() {
+        expectStop();
+    }
+
+
+    private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass) {
+        HashMap<String, String> connectorProps = new HashMap<>();
+        connectorProps.put(ConnectorConfig.NAME_CONFIG, name);
+        connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+        connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
+        return connectorProps;
+    }
+
+    private static Map<String, String> taskConfig(Class<? extends Task> taskClass, boolean sink) {
+        HashMap<String, String> generatedTaskProps = new HashMap<>();
+        // Connectors can add any settings, so these are arbitrary
+        generatedTaskProps.put("foo", "bar");
+        generatedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, taskClass.getName());
+        if (sink)
+            generatedTaskProps.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+        return generatedTaskProps;
+    }
+
+    // We need to use a real class here due to some issue with mocking java.lang.Class
+    private abstract class BogusSourceConnector extends SourceConnector {
+    }
+
+    private abstract class BogusSourceTask extends SourceTask {
+    }
+
+    private abstract class BogusSinkConnector extends SinkConnector {
+    }
+
+    private abstract class BogusSinkTask extends SinkTask {
+    }
+
+}
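
testDestroyConnector above blocks on a FutureCallback to observe the herder's asynchronous completion, including the NotFoundException it expects to surface wrapped in an ExecutionException. A minimal sketch of that callback-as-future bridge (hypothetical names; not the real org.apache.kafka.connect.util.FutureCallback):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    class FutureCallbackSketch<T> {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile T value;
        private volatile Throwable error;

        // Invoked once by the herder when the operation finishes.
        public void onCompletion(Throwable t, T result) {
            this.error = t;
            this.value = result;
            done.countDown();
        }

        // Invoked by the test; wraps a failure the way the test's catch block expects.
        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, TimeoutException, ExecutionException {
            if (!done.await(timeout, unit))
                throw new TimeoutException();
            if (error != null)
                throw new ExecutionException(error); // e.g. cause is NotFoundException
            return value;
        }
    }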

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
new file mode 100644
index 0000000..3833e88
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.util.Callback;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileOffsetBackingStoreTest {
+
+    FileOffsetBackingStore store;
+    Map<String, Object> props;
+    File tempFile;
+
+    private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
+
+    static {
+        firstSet.put(buffer("key"), buffer("value"));
+        firstSet.put(null, null);
+    }
+
+    @Before
+    public void setup() throws IOException {
+        store = new FileOffsetBackingStore();
+        tempFile = File.createTempFile("fileoffsetbackingstore", null);
+        props = new HashMap<>();
+        props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
+        store.configure(props);
+        store.start();
+    }
+
+    @After
+    public void teardown() {
+        tempFile.delete();
+    }
+
+    @Test
+    public void testGetSet() throws Exception {
+        Callback<Void> setCallback = expectSuccessfulSetCallback();
+        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
+        PowerMock.replayAll();
+
+        store.set(firstSet, setCallback).get();
+
+        Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
+        assertEquals(buffer("value"), values.get(buffer("key")));
+        assertNull(values.get(buffer("bad")));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSaveRestore() throws Exception {
+        Callback<Void> setCallback = expectSuccessfulSetCallback();
+        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
+        PowerMock.replayAll();
+
+        store.set(firstSet, setCallback).get();
+        store.stop();
+
+        // Restore into a new store to ensure correct reload from scratch
+        FileOffsetBackingStore restore = new FileOffsetBackingStore();
+        restore.configure(props);
+        restore.start();
+        Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key")), getCallback).get();
+        assertEquals(buffer("value"), values.get(buffer("key")));
+
+        PowerMock.verifyAll();
+    }
+
+    private static ByteBuffer buffer(String v) {
+        return ByteBuffer.wrap(v.getBytes());
+    }
+
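+    // Expect exactly one completion of the set callback with a null error and null result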
+    private Callback<Void> expectSuccessfulSetCallback() {
+        @SuppressWarnings("unchecked")
+        Callback<Void> setCallback = PowerMock.createMock(Callback.class);
+        setCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.isNull(Void.class));
+        PowerMock.expectLastCall();
+        return setCallback;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
+        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = PowerMock.createMock(Callback.class);
+        getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
+        PowerMock.expectLastCall();
+        return getCallback;
+    }
+}
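
One aside on the buffer() helper above: String.getBytes() with no argument uses the platform default charset, so the bytes stored and compared can in principle vary by environment. A charset-explicit variant, sketched below assuming Java 7's StandardCharsets is available, keeps the fixture deterministic:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Sketch only: an encoding-explicit equivalent of the test's buffer() helper.
    private static ByteBuffer buffer(String v) {
        return ByteBuffer.wrap(v.getBytes(StandardCharsets.UTF_8));
    }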

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
new file mode 100644
index 0000000..e007f02
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TestFuture;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConfigStorage.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaConfigStorageTest {
+    private static final String TOPIC = "connect-configs";
+    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
+
+    static {
+        DEFAULT_CONFIG_STORAGE_PROPS.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, TOPIC);
+        DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+    }
+
+    private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
+    private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
+    private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
+
+    // Need a) a connector with multiple tasks and b) multiple connectors
+    private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
+            new ConnectorTaskId("connector1", 0),
+            new ConnectorTaskId("connector1", 1),
+            new ConnectorTaskId("connector2", 0)
+    );
+    private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
+
+    // Need some placeholders -- the contents don't matter here, just that they are restored properly
+    private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
+            Collections.singletonMap("config-key-one", "config-value-one"),
+            Collections.singletonMap("config-key-two", "config-value-two"),
+            Collections.singletonMap("config-key-three", "config-value-three")
+    );
+    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList(
+            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
+            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
+    );
+    private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList(
+            new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
+    );
+
+    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
+            = new Struct(KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
+
+    // The exact format doesn't matter here since both conversions are mocked
+    private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
+            "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
+            "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(),
+            "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
+    );
+
+    @Mock
+    private Converter converter;
+    @Mock
+    private Callback<String> connectorReconfiguredCallback;
+    @Mock
+    private Callback<List<ConnectorTaskId>> tasksReconfiguredCallback;
+    @Mock
+    KafkaBasedLog<String, byte[]> storeLog;
+    private KafkaConfigStorage configStorage;
+
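+    // Captures for the arguments the storage passes to its (stubbed) createKafkaBasedLog() factory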
+    private Capture<String> capturedTopic = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+
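+    // Offset assigned to the next record fed back through the captured consumed callback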
+    private long logOffset = 0;
+
+    @Before
+    public void setUp() {
+        configStorage = PowerMock.createPartialMock(KafkaConfigStorage.class, new String[]{"createKafkaBasedLog"},
+                converter, connectorReconfiguredCallback, tasksReconfiguredCallback);
+    }
+
+    @Test
+    public void testStartStop() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        assertEquals(TOPIC, capturedTopic.getValue());
+        assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+        configStorage.start();
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutConnectorConfig() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(0));
+        EasyMock.expectLastCall();
+
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
+        EasyMock.expectLastCall();
+
+        // Config deletion
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, null, null, null);
+        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
+        EasyMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Writing should block until it is written and read back from Kafka
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        configState = configStorage.snapshot();
+        assertEquals(1, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Second should also block and all configs should still be available
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        configState = configStorage.snapshot();
+        assertEquals(2, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Deletion should remove the second one we added
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), null);
+        configState = configStorage.snapshot();
+        assertEquals(3, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutTaskConfigs() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        // Writing task configs should read to the end, write the configs to the log, read to the end again,
+        // write the root record, then do a final read to the end
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(1), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 2); // Starts with 0 tasks, after update has 2
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
+        EasyMock.expectLastCall();
+
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+
+        // Bootstrap as if we had already added the connector, but no tasks had been added yet
+        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+
+        // Writing task configs should block until all the writes have been performed and the root record update
+        // has completed
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        configStorage.putTaskConfigs(taskConfigs);
+
+        // Validate root config by listing all connectors and tasks
+        configState = configStorage.snapshot();
+        assertEquals(3, configState.offset());
+        String connectorName = CONNECTOR_IDS.get(0);
+        assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        // Restoring data should notify only of the latest values after loading is complete. This also validates
+        // that inconsistent state is ignored.
+
+        expectConfigure();
+        // Overwrite each type at least once to ensure we see the latest data after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                // Connector after root update should make it through, task update shouldn't
+                new ConsumerRecord<>(TOPIC, 0, 5, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+                new ConsumerRecord<>(TOPIC, 0, 6, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 7;
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(7, configState.offset()); // Should always be next to be read, even if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+        assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
+        // Test a case where a failure and subsequent compaction have left us in an inconsistent state when reading
+        // the log. We start out by loading an initial configuration where we started to write a task update and
+        // failed before writing the commit, and then compaction cleaned up the earlier record.
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                // This is the record that has been compacted:
+                //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 5, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 6;
+        expectStart(existingRecords, deserialized);
+
+        // One failed attempt to write new task configs
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+
+        // Successful attempt to write new task config
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 1); // Updated to just 1 task
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0)));
+        EasyMock.expectLastCall();
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+        // After reading the log, it should have been in an inconsistent state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
+        assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
+        // Task configs are not exposed while the connector's task set is inconsistent
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());
+
+        // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
+        try {
+            configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
+            fail("Should have failed due to incomplete task set.");
+        } catch (KafkaException e) {
+            // expected
+        }
+
+        // Next, issue a write that has everything that is needed; it should be accepted. Note that in this case
+        // we are going to shrink the number of tasks to 1
+        configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        // Validate updated config
+        configState = configStorage.snapshot();
+        // This is only two ahead of the previous offset because the failed attempt does not write its configs to
+        // the topic. Only the last call, with 1 task config + 1 commit record, actually gets written.
+        assertEquals(8, configState.offset());
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
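+    // Stub the private createKafkaBasedLog() factory on the partial mock so configure() receives the mocked log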
+    private void expectConfigure() throws Exception {
+        PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
+                EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                .andReturn(storeLog);
+    }
+
+    // If non-empty, deserializations should be a LinkedHashMap
+    private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
+                             final Map<byte[], Struct> deserializations) throws Exception {
+        storeLog.start();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
+                    capturedConsumedCallback.getValue().onCompletion(null, rec);
+                return null;
+            }
+        });
+        for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
+            // Note null schema because default settings for internal serialization are schema-less
+            EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
+                    .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
+        }
+    }
+
+    private void expectStop() {
+        storeLog.stop();
+        PowerMock.expectLastCall();
+    }
+
+    // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
+    // from the log. Validate the data that is captured when the conversion is performed matches the specified data
+    // (by checking a single field's value)
+    private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                        final String dataFieldName, final Object dataFieldValue) {
+        final Capture<Struct> capturedRecord = EasyMock.newCapture();
+        if (serialized != null)
+            EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
+                    .andReturn(serialized);
+        storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
+        PowerMock.expectLastCall();
+        EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
+                .andAnswer(new IAnswer<SchemaAndValue>() {
+                    @Override
+                    public SchemaAndValue answer() throws Throwable {
+                        if (dataFieldName != null)
+                            assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
+                        // Note null schema because default settings for internal serialization are schema-less
+                        return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
+                    }
+                });
+    }
+
+    // This map needs to maintain ordering
+    private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
+        EasyMock.expect(storeLog.readToEnd())
+                .andAnswer(new IAnswer<Future<Void>>() {
+                    @Override
+                    public Future<Void> answer() throws Throwable {
+                        TestFuture<Void> future = new TestFuture<Void>();
+                        for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
+                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, entry.getKey(), entry.getValue()));
+                        future.resolveOnGet((Void) null);
+                        return future;
+                    }
+                });
+    }
+
+    private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                           final String dataFieldName, final Object dataFieldValue) {
+        expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
+        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+        recordsToRead.put(configKey, serialized);
+        expectReadToEnd(recordsToRead);
+    }
+
+    // Manually insert a connector into config storage, updating the task configs, connector config, and root config
+    private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
+        Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");
+        for (int i = 0; i < taskConfigs.size(); i++)
+            storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
+
+        Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs");
+        connectorConfigs.put(connectorName, connectorConfig);
+
+        Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size());
+    }
+
+    // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted
+    private Map<String, Object> structToMap(Struct struct) {
+        HashMap<String, Object> result = new HashMap<>();
+        for (Field field : struct.schema().fields())
+            result.put(field.name(), struct.get(field));
+        return result;
+    }
+
+}
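
For readers skimming the fixtures above: the constants encode the config topic's key scheme directly ("connector-<name>", "task-<name>-<n>", "commit-<name>"). A minimal sketch of that scheme, with hypothetical helper names that are not part of the commit:

    // Sketch of the key layout exercised by CONNECTOR_CONFIG_KEYS, TASK_CONFIG_KEYS
    // and COMMIT_TASKS_CONFIG_KEYS above; the helper names are illustrative only.
    static String connectorKey(String name)      { return "connector-" + name; }
    static String taskKey(String name, int task) { return "task-" + name + "-" + task; }
    static String commitKey(String name)         { return "commit-" + name; }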

