kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] 02/02: MINOR: Add unit test for KAFKA-8676 to guard against unrequired task restarts (#7287)
Date Tue, 03 Sep 2019 21:31:08 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b9450a49ec0b7e10eca1e4413bdd6956f1a18677
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Tue Sep 3 13:48:19 2019 -0700

    MINOR: Add unit test for KAFKA-8676 to guard against unrequired task restarts (#7287)
    
    Added unit test for recent fix of `KafkaConfigBackingStore`.
    
    Author: Konstantine Karantasis <konstantine@confluent.io>
    Reviewer: Randall Hauch <rhauch@gmail.com>
---
 .../storage/KafkaConfigBackingStoreTest.java       | 90 ++++++++++++++++++++++
 1 file changed, 90 insertions(+)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 68c447a..8e358e0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -303,6 +303,96 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
+    public void testPutTaskConfigsStartsOnlyReconfiguredTasks() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+
+        // Task configs should read to end, write to the log, read to end, write root, then read to end again
+        expectReadToEnd(new LinkedHashMap<>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 2); // Starts with 0 tasks, after update has 2
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
+        EasyMock.expectLastCall();
+
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+        // Task configs should read to end, write to the log, read to end, write root, then read to end again
+        expectReadToEnd(new LinkedHashMap<>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(2), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(3),
+                "properties", SAMPLE_CONFIGS.get(2));
+        expectReadToEnd(new LinkedHashMap<>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(4),
+                "tasks", 1); // Starts with 2 tasks, after update has 3
+
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(2)));
+        EasyMock.expectLastCall();
+
+        // Records to be read by consumer as it reads to the end of the log
+        serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(2), CONFIGS_SERIALIZED.get(3));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Bootstrap as if we had already added the connector, but no tasks had been added yet
+        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
+        whiteboxAddConnector(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1), Collections.emptyList());
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+
+        // Writing task configs should block until all the writes have been performed and the root record update
+        // has completed
+        List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
+        configStorage.putTaskConfigs("connector1", taskConfigs);
+        taskConfigs = Collections.singletonList(SAMPLE_CONFIGS.get(2));
+        configStorage.putTaskConfigs("connector2", taskConfigs);
+
+        // Validate root config by listing all connectors and tasks
+        configState = configStorage.snapshot();
+        assertEquals(5, configState.offset());
+        String connectorName1 = CONNECTOR_IDS.get(0);
+        String connectorName2 = CONNECTOR_IDS.get(1);
+        assertEquals(Arrays.asList(connectorName1, connectorName2), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName1));
+        assertEquals(Collections.singletonList(TASK_IDS.get(2)), configState.tasks(connectorName2));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(SAMPLE_CONFIGS.get(2), configState.taskConfig(TASK_IDS.get(2)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testPutTaskConfigsZeroTasks() throws Exception {
         expectConfigure();
         expectStart(Collections.emptyList(), Collections.emptyMap());


Mime
View raw message