kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkaranta...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
Date Sun, 07 Jun 2020 20:01:39 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 484ff65  KAFKA-9216: Enforce internal config topic settings for Connect workers during
startup (#8270)
484ff65 is described below

commit 484ff65cd31587ff3f1bf36e178985c91bed9d08
Author: Evelyn Bayes <30654260+Evelyn-Bayes@users.noreply.github.com>
AuthorDate: Mon Jun 8 05:42:00 2020 +1000

    KAFKA-9216: Enforce internal config topic settings for Connect workers during startup
(#8270)
    
    Currently, Kafka Connect creates its config backing topic with a fire and forget approach.
    This is fine unless someone has manually created that topic already with the wrong partition
count.
    
    In such a case Kafka Connect may run for some time. Especially if it's in standalone mode
and once switched to distributed mode it will almost certainly fail.
    
    This commits adds a check when the KafkaConfigBackingStore is starting.
    This check will throw a ConfigException if there is more than one partition in the backing
store.
    
    This exception is then caught upstream and logged by either:
    - DistributedHerder#run
    - ConnectStandalone#main
    
    A unit tests was added in KafkaConfigBackingStoreTest to verify the behaviour.
    
    Author: Evelyn Bayes <evelyn@confluent.io>
    Co-authored-by: Randall Hauch <rhauch@gmail.com>
    
    Reviewer: Konstantine Karantasis <konstantine@confluent.io>
---
 .../connect/storage/KafkaConfigBackingStore.java   | 10 ++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  5 +++
 .../storage/KafkaConfigBackingStoreTest.java       | 36 ++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 60c2665..9512e03 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -265,6 +265,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting
-- just take care that
         // updates can continue to occur in the background
         configLog.start();
+
+        int partitionCount = configLog.partitionCount();
+        if (partitionCount > 1) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required
"
+                    + "to have a single partition in order to guarantee consistency of "
+                    + "connector configurations, but found %d partitions.",
+                    topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
+            throw new ConfigException(msg);
+        }
+
         started = true;
         log.info("Started KafkaConfigBackingStore");
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e301581..69d2588 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -74,6 +74,7 @@ public class KafkaBasedLog<K, V> {
 
     private Time time;
     private final String topic;
+    private int partitionCount;
     private final Map<String, Object> producerConfigs;
     private final Map<String, Object> consumerConfigs;
     private final Callback<ConsumerRecord<K, V>> consumedCallback;
@@ -145,6 +146,7 @@ public class KafkaBasedLog<K, V> {
 
         for (PartitionInfo partition : partitionInfos)
             partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        partitionCount = partitions.size();
         consumer.assign(partitions);
 
         // Always consume from the beginning of all partitions. Necessary to ensure that
we don't use committed offsets
@@ -238,6 +240,9 @@ public class KafkaBasedLog<K, V> {
         producer.send(new ProducerRecord<>(topic, key, value), callback);
     }
 
+    public int partitionCount() {
+        return partitionCount;
+    }
 
     private Producer<K, V> createProducer() {
         // Always require producer acks to all to ensure durable writes
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 8d78219..a46f6fe 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -60,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -161,6 +163,7 @@ public class KafkaConfigBackingStoreTest {
     public void testStartStop() throws Exception {
         expectConfigure();
         expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
         expectStop();
         PowerMock.replayAll();
 
@@ -208,6 +211,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -276,6 +280,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -360,6 +365,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -421,6 +427,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -474,6 +481,7 @@ public class KafkaConfigBackingStoreTest {
 
         // Shouldn't see any callbacks since this is during startup
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -516,6 +524,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -566,6 +575,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -611,6 +621,7 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 5;
 
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -658,6 +669,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 7;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -712,6 +724,7 @@ public class KafkaConfigBackingStoreTest {
 
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -759,6 +772,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
         logOffset = 8;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -806,6 +820,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Successful attempt to write new task config
         expectReadToEnd(new LinkedHashMap<>());
@@ -860,6 +875,22 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception
{
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+
+        expectPartitionCount(2);
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+        ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
+        assertTrue(e.getMessage().contains("required to have a single partition"));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@@ -868,6 +899,11 @@ public class KafkaConfigBackingStoreTest {
                 .andReturn(storeLog);
     }
 
+    private void expectPartitionCount(int partitionCount) {
+        EasyMock.expect(storeLog.partitionCount())
+                .andReturn(partitionCount);
+    }
+
     // If non-empty, deserializations should be a LinkedHashMap
     private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
                              final Map<byte[], Struct> deserializations) {


Mime
View raw message