kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9991: Fix flaky unit tests (#8843)
Date Wed, 10 Jun 2020 21:11:37 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new dca7146  KAFKA-9991: Fix flaky unit tests (#8843)
dca7146 is described below

commit dca71464b9b69665059582db72efb20ae53ac32d
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Wed Jun 10 14:07:18 2020 -0700

    KAFKA-9991: Fix flaky unit tests (#8843)
    
    A recent commit on this test (#8254) changed it to delete all topics after each
    test, but the source topic was previously shared among the tests. That left us
    relying on the less-reliable automatic topic creation to recreate the topic,
    which made the tests flaky.
    
    I'm now using a different topic for each test, and also setting a distinct
    app.id per test (a sketch of this pattern follows the reviewer list below).
    
    Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
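
For context, a minimal, self-contained sketch of the per-test isolation pattern this patch applies. The class name PerTestTopicIsolationExample is illustrative only; EmbeddedKafkaCluster, CLUSTER.createTopic, and IntegrationTestUtils.safeUniqueTestName are the same helpers that appear in the diff below.

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
    import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
    import org.junit.Before;
    import org.junit.ClassRule;
    import org.junit.Rule;
    import org.junit.rules.TestName;

    // Illustrative class; mirrors the @Before hook introduced in the diff.
    public class PerTestTopicIsolationExample {
        @ClassRule
        public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

        @Rule
        public final TestName testName = new TestName();

        private static final Properties STREAMS_CONFIG = new Properties();
        private String sourceTopic;

        @Before
        public void before() throws Exception {
            // One topic per test method: tests never share a topic and never
            // depend on broker-side auto-creation to bring a deleted topic back.
            sourceTopic = "source-topic-" + testName.getMethodName();
            CLUSTER.createTopic(sourceTopic);

            // One application.id per test as well, so consumer groups and local
            // state directories cannot collide across tests.
            STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG,
                    IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
        }
    }

With every test owning its own topic and app.id, the bulk deleteAllTopicsAndWait call in @After (removed in the diff) is no longer needed.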
---
 .../KTableSourceTopicRestartIntegrationTest.java   | 75 +++++++++++-----------
 1 file changed, 36 insertions(+), 39 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 0a67f41..3b5f296 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.integration;
 
-
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -52,30 +51,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 
 @Category({IntegrationTest.class})
 public class KTableSourceTopicRestartIntegrationTest {
     private static final int NUM_BROKERS = 3;
     private static final String SOURCE_TOPIC = "source-topic";
+    private static final Properties PRODUCER_CONFIG = new Properties();
+    private static final Properties STREAMS_CONFIG = new Properties();
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
     private final Time time = CLUSTER.time;
-    private KafkaStreams streamsOne;
     private final StreamsBuilder streamsBuilder = new StreamsBuilder();
     private final Map<String, String> readKeyValues = new ConcurrentHashMap<>();
 
-    private static final Properties PRODUCER_CONFIG = new Properties();
-    private static final Properties STREAMS_CONFIG = new Properties();
+    private String sourceTopic;
+    private KafkaStreams streams;
     private Map<String, String> expectedInitialResultsMap;
     private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
 
     @BeforeClass
-    public static void setUpBeforeAllTests() throws Exception {
-        CLUSTER.createTopic(SOURCE_TOPIC);
-
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-from-source");
+    public static void setUpBeforeAllTests() {
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
@@ -96,8 +93,13 @@ public class KTableSourceTopicRestartIntegrationTest {
     public TestName testName = new TestName();
 
     @Before
-    public void before() {
-        final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC, Materialized.as("store"));
+    public void before() throws Exception {
+        sourceTopic = SOURCE_TOPIC + "-" + testName.getMethodName();
+        CLUSTER.createTopic(sourceTopic);
+
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
+
+        final KTable<String, String> kTable = streamsBuilder.table(sourceTopic, Materialized.as("store"));
         kTable.toStream().foreach(readKeyValues::put);
 
         expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c");
@@ -107,24 +109,23 @@ public class KTableSourceTopicRestartIntegrationTest {
     @After
     public void after() throws Exception {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
-        CLUSTER.deleteAllTopicsAndWait(60000L);
     }
 
     @Test
     public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
         try {
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streamsOne.start();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.start();
 
             produceKeyValues("a", "b", "c");
 
             assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
 
-            streamsOne.close();
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.close();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
             // the state restore listener will append one record to the log
-            streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
-            streamsOne.start();
+            streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+            streams.start();
 
             produceKeyValues("f", "g", "h");
 
@@ -133,7 +134,7 @@ public class KTableSourceTopicRestartIntegrationTest {
                 expectedResultsWithDataWrittenDuringRestoreMap,
                 "Table did not get all values after restart");
         } finally {
-            streamsOne.close(Duration.ofSeconds(5));
+            streams.close(Duration.ofSeconds(5));
         }
     }
 
@@ -151,18 +152,18 @@ public class KTableSourceTopicRestartIntegrationTest {
 
     private void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
         try {
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streamsOne.start();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.start();
 
             produceKeyValues("a", "b", "c");
 
             assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
 
-            streamsOne.close();
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.close();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
             // the state restore listener will append one record to the log
-            streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
-            streamsOne.start();
+            streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+            streams.start();
 
             produceKeyValues("f", "g", "h");
 
@@ -171,23 +172,23 @@ public class KTableSourceTopicRestartIntegrationTest {
                 expectedResultsWithDataWrittenDuringRestoreMap,
                 "Table did not get all values after restart");
         } finally {
-            streamsOne.close(Duration.ofSeconds(5));
+            streams.close(Duration.ofSeconds(5));
         }
     }
 
     @Test
     public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
         try {
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streamsOne.start();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.start();
 
             produceKeyValues("a", "b", "c");
 
             assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
 
-            streamsOne.close();
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streamsOne.start();
+            streams.close();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.start();
 
             produceKeyValues("f", "g", "h");
 
@@ -195,7 +196,7 @@ public class KTableSourceTopicRestartIntegrationTest {
 
             assertNumberValuesRead(readKeyValues, expectedValues, "Table did not get all values after restart");
         } finally {
-            streamsOne.close(Duration.ofSeconds(5));
+            streams.close(Duration.ofSeconds(5));
         }
     }
 
@@ -208,14 +209,14 @@ public class KTableSourceTopicRestartIntegrationTest {
             errorMessage);
     }
 
-    private void produceKeyValues(final String... keys) throws ExecutionException, InterruptedException {
+    private void produceKeyValues(final String... keys) {
         final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
 
         for (final String key : keys) {
             keyValueList.add(new KeyValue<>(key, key + "1"));
         }
 
-        IntegrationTestUtils.produceKeyValuesSynchronously(SOURCE_TOPIC,
+        IntegrationTestUtils.produceKeyValuesSynchronously(sourceTopic,
                                                            keyValueList,
                                                            PRODUCER_CONFIG,
                                                            time);
@@ -236,11 +237,7 @@ public class KTableSourceTopicRestartIntegrationTest {
                                    final String storeName,
                                    final long startingOffset,
                                    final long endingOffset) {
-            try {
-                produceKeyValues("d");
-            } catch (final ExecutionException | InterruptedException e) {
-                throw new RuntimeException(e);
-            }
+            produceKeyValues("d");
         }
 
         @Override

