This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new f924fb3  KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
f924fb3 is described below

commit f924fb3125dc1b7ff354b83dbdda2156269bc292
Author: Evelyn Bayes <30654260+Evelyn-Bayes@users.noreply.github.com>
AuthorDate: Mon Jun 8 05:42:00 2020 +1000

    KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)

    Currently, Kafka Connect creates its config backing topic with a
    fire-and-forget approach. This is fine unless someone has already
    created that topic manually with the wrong partition count, in which
    case Kafka Connect may run for some time before problems surface,
    especially in standalone mode; once switched to distributed mode it
    will almost certainly fail.

    This commit adds a check when the KafkaConfigBackingStore is starting.
    The check throws a ConfigException if the backing store topic has more
    than one partition. This exception is then caught upstream and logged
    by either:
    - DistributedHerder#run
    - ConnectStandalone#main

    A unit test was added in KafkaConfigBackingStoreTest to verify the
    behaviour.

    Author: Evelyn Bayes
    Co-authored-by: Randall Hauch
    Reviewer: Konstantine Karantasis
---
 .../connect/storage/KafkaConfigBackingStore.java   | 10 ++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  5 +++
 .../storage/KafkaConfigBackingStoreTest.java       | 36 ++++++++++++++++++++++
 3 files changed, 51 insertions(+)
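
Note: the startup check introduced below can be anticipated by inspecting the
config topic directly. The following standalone sketch (not part of this
commit) uses the Kafka AdminClient to report a topic's partition count; the
broker address localhost:9092 and the topic name "connect-configs" are
illustrative assumptions, since the real topic is whatever the worker's
config.storage.topic property names:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class VerifyConnectConfigTopic {
        public static void main(String[] args) throws Exception {
            // Assumption: a broker is reachable at localhost:9092.
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Assumption: config.storage.topic is set to "connect-configs".
            String topic = "connect-configs";
            try (AdminClient admin = AdminClient.create(props)) {
                TopicDescription description = admin.describeTopics(Collections.singleton(topic))
                        .all().get().get(topic);
                int partitions = description.partitions().size();
                // More than one partition means KafkaConfigBackingStore.start()
                // will now fail with a ConfigException.
                System.out.printf("Topic '%s' has %d partition(s)%n", topic, partitions);
            }
        }
    }

If this reports more than one partition, the topic has to be recreated:
Kafka can grow a topic's partition count but never shrink it.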
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index b6b4ebf..a51a064 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -249,6 +249,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
         // updates can continue to occur in the background
         configLog.start();
+
+        int partitionCount = configLog.partitionCount();
+        if (partitionCount > 1) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                    + "to have a single partition in order to guarantee consistency of "
+                    + "connector configurations, but found %d partitions.",
+                    topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
+            throw new ConfigException(msg);
+        }
+
         started = true;
         log.info("Started KafkaConfigBackingStore");
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e301581..69d2588 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -74,6 +74,7 @@ public class KafkaBasedLog<K, V> {
     private Time time;
     private final String topic;
+    private int partitionCount;
     private final Map<String, Object> producerConfigs;
     private final Map<String, Object> consumerConfigs;
     private final Callback<ConsumerRecord<K, V>> consumedCallback;
@@ -145,6 +146,7 @@ public class KafkaBasedLog<K, V> {
         for (PartitionInfo partition : partitionInfos)
             partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        partitionCount = partitions.size();
         consumer.assign(partitions);
 
         // Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
@@ -238,6 +240,9 @@ public class KafkaBasedLog<K, V> {
         producer.send(new ProducerRecord<>(topic, key, value), callback);
     }
 
+    public int partitionCount() {
+        return partitionCount;
+    }
     private Producer<K, V> createProducer() {
         // Always require producer acks to all to ensure durable writes
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 0a82a09..81ebdac 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -156,6 +158,7 @@ public class KafkaConfigBackingStoreTest {
     public void testStartStop() throws Exception {
         expectConfigure();
         expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -199,6 +202,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -267,6 +271,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -351,6 +356,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -412,6 +418,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -465,6 +472,7 @@ public class KafkaConfigBackingStoreTest {
 
         // Shouldn't see any callbacks since this is during startup
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -507,6 +515,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -557,6 +566,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -602,6 +612,7 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 5;
 
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -649,6 +660,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 7;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -703,6 +715,7 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 6;
 
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -750,6 +763,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
         logOffset = 8;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -797,6 +811,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Successful attempt to write new task config
         expectReadToEnd(new LinkedHashMap<String, byte[]>());
@@ -851,6 +866,22 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+
+        expectPartitionCount(2);
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+        ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
+        assertTrue(e.getMessage().contains("required to have a single partition"));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@@ -859,6 +890,11 @@ public class KafkaConfigBackingStoreTest {
                 .andReturn(storeLog);
     }
 
+    private void expectPartitionCount(int partitionCount) {
+        EasyMock.expect(storeLog.partitionCount())
+                .andReturn(partitionCount);
+    }
+
     // If non-empty, deserializations should be a LinkedHashMap
     private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
                              final Map<byte[], Struct> deserializations) {
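
Note: the failure can also be avoided altogether by pre-creating the config
topic with exactly one partition before a worker first starts. Below is a
minimal sketch under the same assumptions as above (broker address and topic
name are illustrative); the replication factor of 3 and the compacted cleanup
policy are common production choices, not values enforced by this commit:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class CreateConnectConfigTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Exactly one partition, as the new startup check requires;
                // log compaction keeps the latest configuration per key.
                NewTopic configTopic = new NewTopic("connect-configs", 1, (short) 3)
                        .configs(Collections.singletonMap(
                                TopicConfig.CLEANUP_POLICY_CONFIG,
                                TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Collections.singleton(configTopic)).all().get();
            }
        }
    }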