kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3674: Ensure connector target state changes propagated to worker
Date Mon, 09 May 2016 07:12:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fc89083f8 -> 8911660e2


KAFKA-3674: Ensure connector target state changes propagated to worker

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1341 from hachikuji/KAFKA-3674


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8911660e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8911660e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8911660e

Branch: refs/heads/trunk
Commit: 8911660e2e7d9553502974393ad1aa04852c2da2
Parents: fc89083
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon May 9 00:12:30 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon May 9 00:12:30 2016 -0700

----------------------------------------------------------------------
 .../runtime/distributed/DistributedHerder.java  |  22 +-
 .../storage/KafkaConfigBackingStore.java        |  30 ++-
 .../connect/runtime/WorkerSourceTaskTest.java   |   4 +-
 .../distributed/DistributedHerderTest.java      | 204 +++++++++++++++
 .../storage/KafkaConfigBackingStoreTest.java    | 258 ++++++++++++++++++-
 5 files changed, 498 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index a2beff3..afabbeb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -309,15 +309,21 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     private void processTargetStateChanges(Set<String> connectorTargetStateChanges) {
-        if (!connectorTargetStateChanges.isEmpty()) {
-            for (String connector : connectorTargetStateChanges) {
-                if (worker.connectorNames().contains(connector)) {
-                    TargetState targetState = configState.targetState(connector);
-                    worker.setTargetState(connector, targetState);
-                    if (targetState == TargetState.STARTED)
-                        reconfigureConnectorTasksWithRetry(connector);
-                }
+        for (String connector : connectorTargetStateChanges) {
+            TargetState targetState = configState.targetState(connector);
+            if (!configState.connectors().contains(connector)) {
+                log.debug("Received target state change for unknown connector: {}", connector);
+                continue;
             }
+
+            // we must propagate the state change to the worker so that the connector's
+            // tasks can transition to the new target state
+            worker.setTargetState(connector, targetState);
+
+            // additionally, if the worker is running the connector itself, then we need to
+            // request reconfiguration to ensure that config changes while paused take effect
+            if (worker.ownsConnector(connector) && targetState == TargetState.STARTED)
+                reconfigureConnectorTasksWithRetry(connector);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 9412e42..a894f31 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -317,8 +317,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     @Override
     public void removeConnectorConfig(String connector) {
         log.debug("Removing connector configuration for connector {}", connector);
-        updateConnectorConfig(connector, null);
-        configLog.send(TARGET_STATE_KEY(connector), null);
+        try {
+            configLog.send(CONNECTOR_KEY(connector), null);
+            configLog.send(TARGET_STATE_KEY(connector), null);
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to remove connector configuration from Kafka: ", e);
+            throw new ConnectException("Error removing connector configuration from Kafka", e);
+        }
     }
 
     @Override
@@ -437,8 +443,19 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
             if (record.key().startsWith(TARGET_STATE_PREFIX)) {
                 String connectorName = record.key().substring(TARGET_STATE_PREFIX.length());
+                boolean removed = false;
                 synchronized (lock) {
-                    if (value.value() != null) {
+                    if (value.value() == null) {
+                        // When connector configs are removed, we also write tombstones for the target state.
+                        log.debug("Removed target state for connector {} due to null value in topic.", connectorName);
+                        connectorTargetStates.remove(connectorName);
+                        removed = true;
+
+                        // If for some reason we still have configs for the connector, add back the default
+                        // STARTED state to ensure each connector always has a valid target state.
+                        if (connectorConfigs.containsKey(connectorName))
+                            connectorTargetStates.put(connectorName, TargetState.STARTED);
+                    } else {
                         if (!(value.value() instanceof Map)) {
                             log.error("Found target state ({}) in wrong format: {}",  record.key(), value.value().getClass());
                             return;
@@ -461,8 +478,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                     }
                 }
 
-                if (!starting)
+                // Note that we do not notify the update listener if the target state has been removed.
+                // Instead we depend on the removal callback of the connector config itself to notify the worker.
+                if (!starting && !removed)
                     updateListener.onConnectorTargetStateChange(connectorName);
+
             } else if (record.key().startsWith(CONNECTOR_PREFIX)) {
                 String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
                 boolean removed = false;
@@ -487,6 +507,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                         log.debug("Updating configuration for connector " + connectorName + " configuration: " + newConnectorConfig);
                         connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
 
+                        // Set the initial state of the connector to STARTED, which ensures that any connectors
+                        // which were created with 0.9 Connect will be initialized in the STARTED state.
                         if (!connectorTargetStates.containsKey(connectorName))
                             connectorTargetStates.put(connectorName, TargetState.STARTED);
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 0d805da..0768781 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -203,7 +203,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         int priorCount = count.get();
         Thread.sleep(100);
-        assertEquals(priorCount, count.get());
+
+        // since the transition is observed asynchronously, the count could be off by one loop iteration
+        assertTrue(count.get() - priorCount <= 1);
 
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index fbccc55..81e6be8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -130,6 +130,9 @@ public class DistributedHerderTest {
     private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1,
3),
             Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1,
TargetState.STARTED),
             TASK_CONFIGS_MAP, Collections.<String>emptySet());
+    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(1,
Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1,
TargetState.PAUSED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet());
     private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1,
Collections.singletonMap(CONN1, 3),
             Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1,
TargetState.STARTED),
             TASK_CONFIGS_MAP, Collections.<String>emptySet());
@@ -747,6 +750,207 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testConnectorPaused() throws Exception {
+        // ensure that target state changes are propagated to the worker
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join
+        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
+
+        worker.setTargetState(CONN1, TargetState.PAUSED);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused
+        herder.tick(); // worker should apply the state change
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorResumed() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // start with the connector paused
+        expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        PowerMock.expectLastCall();
+
+        // we expect reconfiguration after resuming
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+
+        worker.setTargetState(CONN1, TargetState.STARTED);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to started
+        herder.tick(); // apply state change
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testUnknownConnectorPaused() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join
+        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder),
EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // state change is ignored since we have no target state
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange("unknown-connector");
+        herder.tick(); // continue
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorPausedRunningTaskOnly() throws Exception {
+        // even if we don't own the connector, we should still propagate target state
+        // changes to the worker so that tasks will transition correctly
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());
+
+        // join
+        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder),
EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
+
+        worker.setTargetState(CONN1, TargetState.PAUSED);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused
+        herder.tick(); // apply state change
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorResumedRunningTaskOnly() throws Exception {
+        // even if we don't own the connector, we should still propagate target state
+        // changes to the worker so that tasks will transition correctly
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());
+
+        // join
+        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
+        expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder),
EasyMock.eq(TargetState.PAUSED));
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
+
+        worker.setTargetState(CONN1, TargetState.STARTED);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused
+        herder.tick(); // apply state change
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testTaskConfigAdded() {
         // Task config always requires rebalance
         EasyMock.expect(member.memberId()).andStubReturn("member");

http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 617177e..f5bce8f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
@@ -53,9 +54,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(KafkaConfigBackingStore.class)
@@ -107,6 +110,7 @@ public class KafkaConfigBackingStoreTest {
             new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
             new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
     );
+    private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state",
"PAUSED");
 
     private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
             = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks",
2);
@@ -181,15 +185,10 @@ public class KafkaConfigBackingStoreTest {
         EasyMock.expectLastCall();
 
         // Config deletion
-        expectConvertWriteAndRead(
-                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0,
null, null, null);
+        expectConnectorRemoval(CONNECTOR_CONFIG_KEYS.get(1), TARGET_STATE_KEYS.get(1));
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
-        // Target state deletion
-        storeLog.send(TARGET_STATE_KEYS.get(1), null);
-        PowerMock.expectLastCall();
-
         expectStop();
 
         PowerMock.replayAll();
@@ -220,9 +219,10 @@ public class KafkaConfigBackingStoreTest {
         // Deletion should remove the second one we added
         configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1));
         configState = configStorage.snapshot();
-        assertEquals(3, configState.offset());
+        assertEquals(4, configState.offset());
         assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
         assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+        assertNull(configState.targetState(CONNECTOR_IDS.get(1)));
 
         configStorage.stop();
 
@@ -346,6 +346,176 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
+    public void testRestoreTargetState() throws Exception {
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED);
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector with initial state paused
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(5, configState.offset()); // Should always be next to be read, even
if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testBackgroundUpdateTargetState() throws Exception {
+        // verify that we handle target state changes correctly when they come up through
the log
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_PAUSED);
+
+        configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
+        EasyMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector with initial state paused
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.refresh(0, TimeUnit.SECONDS);
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testBackgroundConnectorDeletion() throws Exception {
+        // verify that we handle connector deletions correctly when they come up through
the log
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
+        serializedData.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedData.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1));
+
+        Map<String, Struct> deserializedData = new HashMap<>();
+        deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null);
+        deserializedData.put(TARGET_STATE_KEYS.get(0), null);
+
+        expectRead(serializedData, deserializedData);
+
+        configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
+        EasyMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector with initial state paused
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.refresh(0, TimeUnit.SECONDS);
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestoreTargetStateUnexpectedDeletion() throws Exception {
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // The target state deletion should reset the state to STARTED
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(5, configState.offset()); // Should always be next to be read, even
if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testRestore() throws Exception {
         // Restoring data should notify only of the latest values after loading is complete.
This also validates
         // that inconsistent state is ignored.
@@ -385,6 +555,7 @@ public class KafkaConfigBackingStoreTest {
         ClusterConfigState configState = configStorage.snapshot();
         assertEquals(7, configState.offset()); // Should always be next to be read, even
if uncommitted
         assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
         // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
         assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
         // Should see 2 tasks for that connector. Only config updates before the root key
update should be reflected
@@ -400,6 +571,51 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
+    public void testRestoreConnectorDeletion() throws Exception {
+        // Restoring a log in which the connector config and target state were later deleted
+        // (null values) should leave no connectors in the snapshot.
+
+        expectConfigure();
+        // Write the connector and task configs, then delete the connector config and target state
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+        deserialized.put(CONFIGS_SERIALIZED.get(4), null);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+
+        logOffset = 6;
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // The connector config and target state were deleted, so no connectors should remain
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be next to be read, even
if uncommitted
+        assertTrue(configState.connectors().isEmpty());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testRestoreZeroTasks() throws Exception {
         // Restoring data should notify only of the latest values after loading is complete.
This also validates
         // that inconsistent state is ignored.
@@ -558,6 +774,22 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.expectLastCall();
     }
 
+    private void expectRead(LinkedHashMap<String, byte[]> serializedValues,
+                            Map<String, Struct> deserializedValues) {
+        expectReadToEnd(serializedValues);
+        for (Map.Entry<String, Struct> deserializedValueEntry : deserializedValues.entrySet())
{
+            byte[] serializedValue = serializedValues.get(deserializedValueEntry.getKey());
+            EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serializedValue)))
+                    .andReturn(new SchemaAndValue(null, structToMap(deserializedValueEntry.getValue())));
+        }
+    }
+
+    private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue)
{
+        LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
+        serializedData.put(key, serializedValue);
+        expectRead(serializedData, Collections.singletonMap(key, deserializedValue));
+    }
+
     // Expect a conversion & write to the underlying log, followed by a subsequent read
when the data is consumed back
     // from the log. Validate the data that is captured when the conversion is performed
matches the specified data
     // (by checking a single field's value)
@@ -596,6 +828,15 @@ public class KafkaConfigBackingStoreTest {
                 });
     }
 
+    private void expectConnectorRemoval(String configKey, String targetStateKey) {
+        expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0,
null, null, null);
+        expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null,
null, null);
+
+        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+        recordsToRead.put(configKey, null);
+        recordsToRead.put(targetStateKey, null);
+        expectReadToEnd(recordsToRead);
+    }
 
     private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema,
final byte[] serialized,
                                            final String dataFieldName, final Object dataFieldValue)
{
@@ -619,6 +860,9 @@ public class KafkaConfigBackingStoreTest {
 
     // Generates a Map representation of Struct. Only does shallow traversal, so nested structs
are not converted
     private Map<String, Object> structToMap(Struct struct) {
+        if (struct == null)
+            return null;
+
         HashMap<String, Object> result = new HashMap<>();
         for (Field field : struct.schema().fields())
             result.put(field.name(), struct.get(field));


Mime
View raw message