kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: MINOR: Create a new topic for each test for flaky RegexSourceIntegrationTest (#6853)
Date Fri, 12 Jul 2019 19:18:37 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ae4a975  MINOR: Create a new topic for each test for flaky RegexSourceIntegrationTest
(#6853)
ae4a975 is described below

commit ae4a97543e990407096d9a1e63c77bab8c18649c
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Fri Jul 12 15:18:17 2019 -0400

    MINOR: Create a new topic for each test for flaky RegexSourceIntegrationTest (#6853)
    
    The RegexSourceIntegrationTest has some flakiness as it deletes and re-creates the same
output topic before each test. This PR reduces the chance for errors by creating a unique
output topic for each test.
    
    Reviewers:  Matthias J. Sax <mjsax@apache.org>,  Boyang Chen <boyang@confluent.io>
---
 .../integration/RegexSourceIntegrationTest.java    | 42 +++++++++++++---------
 1 file changed, 25 insertions(+), 17 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index f74487b..10e0650 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -62,6 +62,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -90,11 +91,12 @@ public class RegexSourceIntegrationTest {
     private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
     private static final String PARTITIONED_TOPIC_2 = "partitioned-2";
 
-    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
     private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
     private KafkaStreams streams;
+    private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger(0);
+    private String outputTopic;
 
 
     @BeforeClass
@@ -107,16 +109,14 @@ public class RegexSourceIntegrationTest {
             TOPIC_Y,
             TOPIC_Z,
             FA_TOPIC,
-            FOO_TOPIC,
-            DEFAULT_OUTPUT_TOPIC);
+            FOO_TOPIC);
         CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
         CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
     }
 
     @Before
-    public void setUp() throws Exception {
-        CLUSTER.deleteAndRecreateTopics(DEFAULT_OUTPUT_TOPIC);
-
+    public void setUp() throws InterruptedException {
+        outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
         final Properties properties = new Properties();
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
@@ -141,6 +141,7 @@ public class RegexSourceIntegrationTest {
 
     @Test
     public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+
         final Serde<String> stringSerde = Serdes.String();
         final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
         final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1",
"TEST-TOPIC-2");
@@ -151,7 +152,7 @@ public class RegexSourceIntegrationTest {
 
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
         final List<String> assignedTopics = new CopyOnWriteArrayList<>();
         streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier()
{
             @Override
@@ -175,6 +176,12 @@ public class RegexSourceIntegrationTest {
 
     }
 
+    private String createTopic(final int suffix) throws InterruptedException {
+        final String outputTopic = "outputTopic_" + suffix;
+        CLUSTER.createTopic(outputTopic);
+        return outputTopic;
+    }
+
     @Test
     public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
 
@@ -188,7 +195,7 @@ public class RegexSourceIntegrationTest {
 
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
 
-        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
         final List<String> assignedTopics = new CopyOnWriteArrayList<>();
         streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier()
{
@@ -262,9 +269,9 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
         final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(TOPIC_Y,
TOPIC_Z));
 
-        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
-        pattern2Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
-        namedTopicsStream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
+        pattern2Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
+        namedTopicsStream.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
         streams = new KafkaStreams(builder.build(), streamsConfiguration);
         streams.start();
@@ -281,7 +288,7 @@ public class RegexSourceIntegrationTest {
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
StringDeserializer.class, StringDeserializer.class);
 
         final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage,
topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
-        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
DEFAULT_OUTPUT_TOPIC, 6);
+        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
outputTopic, 6);
         final List<String> actualValues = new ArrayList<>(6);
 
         for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
@@ -308,8 +315,8 @@ public class RegexSourceIntegrationTest {
             final KStream<String, String> partitionedStreamFollower = builderFollower.stream(Pattern.compile("partitioned-\\d"));
 
 
-            partitionedStreamLeader.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
-            partitionedStreamFollower.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde,
stringSerde));
+            partitionedStreamLeader.to(outputTopic, Produced.with(stringSerde, stringSerde));
+            partitionedStreamFollower.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
             final List<String> leaderAssignment = new ArrayList<>();
             final List<String> followerAssignment = new ArrayList<>();
@@ -355,6 +362,7 @@ public class RegexSourceIntegrationTest {
 
     @Test
     public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
+
         final String fMessage = "fMessage";
         final String fooMessage = "fooMessage";
         final Serde<String> stringSerde = Serdes.String();
@@ -365,8 +373,8 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
         final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
 
-        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
-        pattern2Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
+        pattern2Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
         final AtomicBoolean expectError = new AtomicBoolean(false);
 
@@ -385,7 +393,7 @@ public class RegexSourceIntegrationTest {
 
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
StringDeserializer.class, StringDeserializer.class);
         try {
-            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC,
2, 5000);
+            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic,
2, 5000);
             throw new IllegalStateException("This should not happen: an assertion error should
have been thrown before this.");
         } catch (final AssertionError e) {
             // this is fine


Mime
View raw message