This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5a0e65e KAFKA-8011: Fix flaky RegexSourceIntegrationTest (#8799) 5a0e65e is described below commit 5a0e65ed394da76ddebf387739f9dec8687a9485 Author: Matthias J. Sax AuthorDate: Fri Jun 5 14:38:08 2020 -0700 KAFKA-8011: Fix flaky RegexSourceIntegrationTest (#8799) Reviewers: A. Sophie Blee-Goldman , Guozhang Wang --- .../processor/internals/RecordDeserializer.java | 4 +- .../integration/RegexSourceIntegrationTest.java | 185 ++++++++++----------- 2 files changed, 95 insertions(+), 94 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index d0a2a80..86b1b44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -63,7 +63,9 @@ class RecordDeserializer { rawRecord.serializedKeySize(), rawRecord.serializedValueSize(), sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()), - sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()), rawRecord.headers()); + sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()), + rawRecord.headers() + ); } catch (final Exception deserializationException) { final DeserializationExceptionHandler.DeserializationHandlerResponse response; try { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 3e205f7..44cc745 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -41,8 +41,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockKeyValueStoreBuilder; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; @@ -52,6 +52,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import java.io.IOException; import java.util.ArrayList; @@ -124,11 +125,13 @@ public class RegexSourceIntegrationTest { properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test", - CLUSTER.bootstrapServers(), - STRING_SERDE_CLASSNAME, - STRING_SERDE_CLASSNAME, - properties); + streamsConfiguration = StreamsTestUtils.getStreamsConfig( + IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new TestName()), + CLUSTER.bootstrapServers(), + STRING_SERDE_CLASSNAME, + STRING_SERDE_CLASSNAME, + properties + ); } @After @@ -142,83 +145,89 @@ public class RegexSourceIntegrationTest { @Test public void testRegexMatchesTopicsAWhenCreated() throws Exception { + try { + final Serde stringSerde = Serdes.String(); - final Serde stringSerde = Serdes.String(); - - final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); - // we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed - // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always - // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test - final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); + final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); + // we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed + // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always + // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test + final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); - CLUSTER.createTopic("TEST-TOPIC-1"); + CLUSTER.createTopic("TEST-TOPIC-1"); - final StreamsBuilder builder = new StreamsBuilder(); + final StreamsBuilder builder = new StreamsBuilder(); - final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); + final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); - pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); - final List assignedTopics = new CopyOnWriteArrayList<>(); - streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { - @Override - public Consumer getConsumer(final Map config) { - return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { - @Override - public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { - super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); - } - }; + pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); + final List assignedTopics = new CopyOnWriteArrayList<>(); + streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { + @Override + public Consumer getConsumer(final Map config) { + return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { + @Override + public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { + super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); + } + }; - } - }); + } + }); - streams.start(); - TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); + streams.start(); + TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); - CLUSTER.createTopic("TEST-TOPIC-2"); + CLUSTER.createTopic("TEST-TOPIC-2"); - TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); + TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); - streams.close(); - CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); + streams.close(); + } finally { + CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); + } } @Test public void testRegexRecordsAreProcessedAfterReassignment() throws Exception { final String topic1 = "TEST-TOPIC-1"; - CLUSTER.createTopic(topic1); - - final StreamsBuilder builder = new StreamsBuilder(); - final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); - pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); - streams = new KafkaStreams(builder.build(), streamsConfiguration); final String topic2 = "TEST-TOPIC-2"; - streams.start(); - CLUSTER.createTopic(topic2); + try { + CLUSTER.createTopic(topic1); + + final StreamsBuilder builder = new StreamsBuilder(); + final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); + pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); + streams = new KafkaStreams(builder.build(), streamsConfiguration); + streams.start(); - final KeyValue record1 = new KeyValue<>("1", "1"); - final KeyValue record2 = new KeyValue<>("2", "2"); - IntegrationTestUtils.produceKeyValuesSynchronously( + CLUSTER.createTopic(topic2); + + final KeyValue record1 = new KeyValue<>("1", "1"); + final KeyValue record2 = new KeyValue<>("2", "2"); + IntegrationTestUtils.produceKeyValuesSynchronously( topic1, Collections.singletonList(record1), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), CLUSTER.time - ); - IntegrationTestUtils.produceKeyValuesSynchronously( + ); + IntegrationTestUtils.produceKeyValuesSynchronously( topic2, Collections.singletonList(record2), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), CLUSTER.time - ); - IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived( + ); + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived( TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class), outputTopic, Arrays.asList(record1, record2) - ); + ); - streams.close(); - CLUSTER.deleteTopicsAndWait(topic1, topic2); + streams.close(); + } finally { + CLUSTER.deleteTopicsAndWait(topic1, topic2); + } } private String createTopic(final int suffix) throws InterruptedException { @@ -229,44 +238,44 @@ public class RegexSourceIntegrationTest { @Test public void testRegexMatchesTopicsAWhenDeleted() throws Exception { - final Serde stringSerde = Serdes.String(); final List expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B"); final List expectedSecondAssignment = Collections.singletonList("TEST-TOPIC-B"); + final List assignedTopics = new CopyOnWriteArrayList<>(); - CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B"); + try { + CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B"); - final StreamsBuilder builder = new StreamsBuilder(); + final StreamsBuilder builder = new StreamsBuilder(); - final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]")); + final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]")); - pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); - - final List assignedTopics = new CopyOnWriteArrayList<>(); - streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { - @Override - public Consumer getConsumer(final Map config) { - return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { - @Override - public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { - super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); - } - }; - } - }); + pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); + streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { + @Override + public Consumer getConsumer(final Map config) { + return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { + @Override + public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { + super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); + } + }; + } + }); - streams.start(); - TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); - CLUSTER.deleteTopic("TEST-TOPIC-A"); + streams.start(); + TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); + } finally { + CLUSTER.deleteTopic("TEST-TOPIC-A"); + } TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); } @Test - public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException { - + public void shouldAddStateStoreToRegexDefinedSource() throws Exception { final ProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); final StoreBuilder> storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false); final long thirtySecondTimeout = 30 * 1000; @@ -277,26 +286,19 @@ public class RegexSourceIntegrationTest { topology.addStateStore(storeBuilder, "my-processor"); streams = new KafkaStreams(topology, streamsConfiguration); + streams.start(); - try { - streams.start(); - - final TestCondition stateStoreNameBoundToSourceTopic = () -> { - final Map> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics(); - final List topicNamesList = stateStoreToSourceTopic.get("testStateStore"); - return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1"); - }; - - TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]"); + final TestCondition stateStoreNameBoundToSourceTopic = () -> { + final Map> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics(); + final List topicNamesList = stateStoreToSourceTopic.get("testStateStore"); + return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1"); + }; - } finally { - streams.close(); - } + TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]"); } @Test public void testShouldReadFromRegexAndNamedTopics() throws Exception { - final String topic1TestMessage = "topic-1 test"; final String topic2TestMessage = "topic-2 test"; final String topicATestMessage = "topic-A test"; @@ -346,7 +348,6 @@ public class RegexSourceIntegrationTest { @Test public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception { - KafkaStreams partitionedStreamsLeader = null; KafkaStreams partitionedStreamsFollower = null; try { @@ -401,12 +402,10 @@ public class RegexSourceIntegrationTest { partitionedStreamsFollower.close(); } } - } @Test public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception { - final String fMessage = "fMessage"; final String fooMessage = "fooMessage"; final Serde stringSerde = Serdes.String();