kafka-commits mailing list archives

From: mj...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-8011: Fix flaky RegexSourceIntegrationTest (#8799)
Date: Fri, 05 Jun 2020 21:38:41 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5a0e65e  KAFKA-8011: Fix flaky RegexSourceIntegrationTest (#8799)
5a0e65e is described below

commit 5a0e65ed394da76ddebf387739f9dec8687a9485
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Fri Jun 5 14:38:08 2020 -0700

    KAFKA-8011: Fix flaky RegexSourceIntegrationTest (#8799)
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../processor/internals/RecordDeserializer.java    |   4 +-
 .../integration/RegexSourceIntegrationTest.java    | 185 ++++++++++-----------
 2 files changed, 95 insertions(+), 94 deletions(-)
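
In short, the patch dehardens the test in two ways, both visible in the diff below:
each run now gets a unique Streams application.id via
IntegrationTestUtils.safeUniqueTestName(...) instead of the shared
"regex-source-integration-test" name, and each test body is wrapped in try/finally so
that topics created by a test are deleted even when a waitForCondition(...) times out.
A minimal sketch of that cleanup pattern, reusing the harness the test already has
(CLUSTER, streams); the test logic itself is elided:

    @Test
    public void exampleCleanupPattern() throws Exception {
        try {
            // create resources inside the try block ...
            CLUSTER.createTopic("TEST-TOPIC-1");
            // ... start streams, await conditions, then close ...
            streams.close();
        } finally {
            // ... and tear them down unconditionally, so a timed-out
            // assertion cannot leave topics behind for the next test
            CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1");
        }
    }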

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index d0a2a80..86b1b44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -63,7 +63,9 @@ class RecordDeserializer {
                 rawRecord.serializedKeySize(),
                 rawRecord.serializedValueSize(),
                 sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
-                sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()), rawRecord.headers());
+                sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()),
+                rawRecord.headers()
+            );
         } catch (final Exception deserializationException) {
             final DeserializationExceptionHandler.DeserializationHandlerResponse response;
             try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 3e205f7..44cc745 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -41,8 +41,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -52,6 +52,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -124,11 +125,13 @@ public class RegexSourceIntegrationTest {
         properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
-                                                                 CLUSTER.bootstrapServers(),
-                                                                 STRING_SERDE_CLASSNAME,
-                                                                 STRING_SERDE_CLASSNAME,
-                                                                 properties);
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+            IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new TestName()),
+            CLUSTER.bootstrapServers(),
+            STRING_SERDE_CLASSNAME,
+            STRING_SERDE_CLASSNAME,
+            properties
+        );
     }
 
     @After
@@ -142,83 +145,89 @@ public class RegexSourceIntegrationTest {
 
     @Test
     public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+        try {
+            final Serde<String> stringSerde = Serdes.String();
 
-        final Serde<String> stringSerde = Serdes.String();
-
-        final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
-        // we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed
-        // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always
-        // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test
-        final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+            final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
+            // we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed
+            // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always
+            // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test
+            final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-        CLUSTER.createTopic("TEST-TOPIC-1");
+            CLUSTER.createTopic("TEST-TOPIC-1");
 
-        final StreamsBuilder builder = new StreamsBuilder();
+            final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+            final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
-        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
-            @Override
-            public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
-                return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
-                    @Override
-                    public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
-                        super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
-                    }
-                };
+            pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
+            final List<String> assignedTopics = new CopyOnWriteArrayList<>();
+            streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
+                @Override
+                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
+                    return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+                        @Override
+                        public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
+                            super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
+                        }
+                    };
 
-            }
-        });
+                }
+            });
 
-        streams.start();
-        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+            streams.start();
+            TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
-        CLUSTER.createTopic("TEST-TOPIC-2");
+            CLUSTER.createTopic("TEST-TOPIC-2");
 
-        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+            TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
-        streams.close();
-        CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+            streams.close();
+        } finally {
+            CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+        }
     }
 
     @Test
     public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
         final String topic1 = "TEST-TOPIC-1";
-        CLUSTER.createTopic(topic1);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-        pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
-        streams = new KafkaStreams(builder.build(), streamsConfiguration);
         final String topic2 = "TEST-TOPIC-2";
-        streams.start();
 
-        CLUSTER.createTopic(topic2);
+        try {
+            CLUSTER.createTopic(topic1);
+
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+            pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+            streams = new KafkaStreams(builder.build(), streamsConfiguration);
+            streams.start();
 
-        final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
-        final KeyValue<String, String> record2 = new KeyValue<>("2", "2");
-        IntegrationTestUtils.produceKeyValuesSynchronously(
+            CLUSTER.createTopic(topic2);
+
+            final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
+            final KeyValue<String, String> record2 = new KeyValue<>("2", "2");
+            IntegrationTestUtils.produceKeyValuesSynchronously(
                 topic1,
                 Collections.singletonList(record1),
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
                 CLUSTER.time
-        );
-        IntegrationTestUtils.produceKeyValuesSynchronously(
+            );
+            IntegrationTestUtils.produceKeyValuesSynchronously(
                 topic2,
                 Collections.singletonList(record2),
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
                 CLUSTER.time
-        );
-        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+            );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class),
                 outputTopic,
                 Arrays.asList(record1, record2)
-        );
+            );
 
-        streams.close();
-        CLUSTER.deleteTopicsAndWait(topic1, topic2);
+            streams.close();
+        } finally {
+            CLUSTER.deleteTopicsAndWait(topic1, topic2);
+        }
     }
 
     private String createTopic(final int suffix) throws InterruptedException {
@@ -229,44 +238,44 @@ public class RegexSourceIntegrationTest {
 
     @Test
     public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
-
         final Serde<String> stringSerde = Serdes.String();
         final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
         final List<String> expectedSecondAssignment = Collections.singletonList("TEST-TOPIC-B");
+        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
 
-        CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
+        try {
+            CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
 
-        final StreamsBuilder builder = new StreamsBuilder();
+            final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
+            final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
 
-        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
-
-        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
-            @Override
-            public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
-                return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
-                    @Override
-                    public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
-                        super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
-                    }
-                };
-            }
-        });
+            pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
+            streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
+                @Override
+                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
+                    return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+                        @Override
+                        public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
+                            super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
+                        }
+                    };
+                }
+            });
 
-        streams.start();
-        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
-        CLUSTER.deleteTopic("TEST-TOPIC-A");
+            streams.start();
+            TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+        } finally {
+            CLUSTER.deleteTopic("TEST-TOPIC-A");
+        }
 
         TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
     }
 
     @Test
-    public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
-
+    public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
         final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         final StoreBuilder<KeyValueStore<Object, Object>> storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);
         final long thirtySecondTimeout = 30 * 1000;
@@ -277,26 +286,19 @@ public class RegexSourceIntegrationTest {
         topology.addStateStore(storeBuilder, "my-processor");
 
         streams = new KafkaStreams(topology, streamsConfiguration);
+        streams.start();
 
-        try {
-            streams.start();
-
-            final TestCondition stateStoreNameBoundToSourceTopic = () -> {
-                final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics();
-                final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
-                return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
-            };
-
-            TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
+        final TestCondition stateStoreNameBoundToSourceTopic = () -> {
+            final Map<String, List<String>> stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics();
+            final List<String> topicNamesList = stateStoreToSourceTopic.get("testStateStore");
+            return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1");
+        };
 
-        } finally {
-            streams.close();
-        }
+        TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
     }
 
     @Test
     public void testShouldReadFromRegexAndNamedTopics() throws Exception {
-
         final String topic1TestMessage = "topic-1 test";
         final String topic2TestMessage = "topic-2 test";
         final String topicATestMessage = "topic-A test";
@@ -346,7 +348,6 @@ public class RegexSourceIntegrationTest {
 
     @Test
     public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
-
         KafkaStreams partitionedStreamsLeader = null;
         KafkaStreams partitionedStreamsFollower = null;
         try {
@@ -401,12 +402,10 @@ public class RegexSourceIntegrationTest {
                 partitionedStreamsFollower.close();
             }
         }
-
     }
 
     @Test
     public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
-
         final String fMessage = "fMessage";
         final String fooMessage = "fooMessage";
         final Serde<String> stringSerde = Serdes.String();
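
For context on the safeUniqueTestName change: Kafka Streams uses the application.id as
the consumer group.id and as the prefix for its internal topic names, so test methods
that shared the fixed "regex-source-integration-test" id also shared committed offsets
and group membership on the embedded cluster. A fragment (not a full test) sketching
the isolation a per-test name buys, using the StreamsConfig constants and the helpers
the diff introduces:

    // Fragment: a per-test application.id yields a per-test consumer group,
    // so no test can inherit another's offsets, assignment, or internal topics.
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,
        IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new TestName()));
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());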

