kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkaranta...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
Date Sun, 07 Jun 2020 20:03:10 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new e90e8ca  KAFKA-9216: Enforce internal config topic settings for Connect workers during
startup (#8270)
e90e8ca is described below

commit e90e8ca72429dc368d29fd877ec27dddd4d32e82
Author: Evelyn Bayes <30654260+Evelyn-Bayes@users.noreply.github.com>
AuthorDate: Mon Jun 8 05:42:00 2020 +1000

    KAFKA-9216: Enforce internal config topic settings for Connect workers during startup
(#8270)
    
    Currently, Kafka Connect creates its config backing topic with a fire and forget approach.
    This is fine unless someone has manually created that topic already with the wrong partition
count.
    
    In such a case Kafka Connect may run for some time. Especially if it's in standalone mode
and once switched to distributed mode it will almost certainly fail.
    
    This commits adds a check when the KafkaConfigBackingStore is starting.
    This check will throw a ConfigException if there is more than one partition in the backing
store.
    
    This exception is then caught upstream and logged by either:
    - DistributedHerder#run
    - ConnectStandalone#main
    
    A unit tests was added in KafkaConfigBackingStoreTest to verify the behaviour.
    
    Author: Evelyn Bayes <evelyn@confluent.io>
    Co-authored-by: Randall Hauch <rhauch@gmail.com>
    
    Reviewer: Konstantine Karantasis <konstantine@confluent.io>
---
 .../connect/storage/KafkaConfigBackingStore.java   | 10 ++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  5 +++
 .../storage/KafkaConfigBackingStoreTest.java       | 36 ++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 658d6c3..4c6429c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -263,6 +263,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting
-- just take care that
         // updates can continue to occur in the background
         configLog.start();
+
+        int partitionCount = configLog.partitionCount();
+        if (partitionCount > 1) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required
"
+                    + "to have a single partition in order to guarantee consistency of "
+                    + "connector configurations, but found %d partitions.",
+                    topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
+            throw new ConfigException(msg);
+        }
+
         started = true;
         log.info("Started KafkaConfigBackingStore");
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e301581..69d2588 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -74,6 +74,7 @@ public class KafkaBasedLog<K, V> {
 
     private Time time;
     private final String topic;
+    private int partitionCount;
     private final Map<String, Object> producerConfigs;
     private final Map<String, Object> consumerConfigs;
     private final Callback<ConsumerRecord<K, V>> consumedCallback;
@@ -145,6 +146,7 @@ public class KafkaBasedLog<K, V> {
 
         for (PartitionInfo partition : partitionInfos)
             partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        partitionCount = partitions.size();
         consumer.assign(partitions);
 
         // Always consume from the beginning of all partitions. Necessary to ensure that
we don't use committed offsets
@@ -238,6 +240,9 @@ public class KafkaBasedLog<K, V> {
         producer.send(new ProducerRecord<>(topic, key, value), callback);
     }
 
+    public int partitionCount() {
+        return partitionCount;
+    }
 
     private Producer<K, V> createProducer() {
         // Always require producer acks to all to ensure durable writes
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 512dcb3..9a55690 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -156,6 +158,7 @@ public class KafkaConfigBackingStoreTest {
     public void testStartStop() throws Exception {
         expectConfigure();
         expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -199,6 +202,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -267,6 +271,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -351,6 +356,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -412,6 +418,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -465,6 +472,7 @@ public class KafkaConfigBackingStoreTest {
 
         // Shouldn't see any callbacks since this is during startup
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -507,6 +515,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -557,6 +566,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -602,6 +612,7 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 5;
 
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -649,6 +660,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 7;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -703,6 +715,7 @@ public class KafkaConfigBackingStoreTest {
 
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -750,6 +763,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
         logOffset = 8;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -797,6 +811,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Successful attempt to write new task config
         expectReadToEnd(new LinkedHashMap<>());
@@ -851,6 +866,22 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception
{
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+
+        expectPartitionCount(2);
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+        ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
+        assertTrue(e.getMessage().contains("required to have a single partition"));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@@ -859,6 +890,11 @@ public class KafkaConfigBackingStoreTest {
                 .andReturn(storeLog);
     }
 
+    private void expectPartitionCount(int partitionCount) {
+        EasyMock.expect(storeLog.partitionCount())
+                .andReturn(partitionCount);
+    }
+
     // If non-empty, deserializations should be a LinkedHashMap
     private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
                              final Map<byte[], Struct> deserializations) {


Mime
View raw message