From: guozhang@apache.org
To: commits@kafka.apache.org
Date: Wed, 10 Jun 2020 21:11:37 +0000
Subject: [kafka] branch 2.6 updated: KAFKA-9991: Fix flaky unit tests (#8843)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new dca7146  KAFKA-9991: Fix flaky unit tests (#8843)
dca7146 is described below

commit dca71464b9b69665059582db72efb20ae53ac32d
Author:     Guozhang Wang
AuthorDate: Wed Jun 10 14:07:18 2020 -0700

    KAFKA-9991: Fix flaky unit tests (#8843)

    A previous commit (#8254) changed this test to delete all topics after
    each test, but the topic had been shared among the tests; since then we
    have relied on less-reliable automatic topic creation, which makes the
    tests flaky. This patch uses a different topic for each test and also
    sets a distinct app.id per test.

    Reviewers: Boyang Chen, A. Sophie Blee-Goldman, Matthias J. Sax
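The essence of the fix, as a self-contained sketch (assuming JUnit 4; the class name and the plain string concatenation below are illustrative -- the actual patch builds the application.id with IntegrationTestUtils.safeUniqueTestName(getClass(), testName)):

import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;

// Per-test isolation pattern: derive both the source topic and the
// application.id from the running test's method name, so no two tests
// share a topic, a consumer group, or local state.
public class PerTestIsolationSketch {

    @Rule
    public final TestName testName = new TestName();

    private String sourceTopic;
    private String applicationId;

    @Before
    public void before() {
        // e.g. "source-topic-shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration"
        sourceTopic = "source-topic-" + testName.getMethodName();
        applicationId = "ktable-restore-from-source-" + testName.getMethodName();
    }
}

With a fresh topic created per test, the tests no longer depend on broker-side auto-topic creation, and a unique application.id keeps each test's consumer group offsets and state directory separate.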
---
 .../KTableSourceTopicRestartIntegrationTest.java   | 75 +++++++++++-----------
 1 file changed, 36 insertions(+), 39 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 0a67f41..3b5f296 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.integration;
 
-
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -52,30 +51,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 
 @Category({IntegrationTest.class})
 public class KTableSourceTopicRestartIntegrationTest {
     private static final int NUM_BROKERS = 3;
     private static final String SOURCE_TOPIC = "source-topic";
+    private static final Properties PRODUCER_CONFIG = new Properties();
+    private static final Properties STREAMS_CONFIG = new Properties();
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
     private final Time time = CLUSTER.time;
-    private KafkaStreams streamsOne;
     private final StreamsBuilder streamsBuilder = new StreamsBuilder();
     private final Map<String, String> readKeyValues = new ConcurrentHashMap<>();
 
-    private static final Properties PRODUCER_CONFIG = new Properties();
-    private static final Properties STREAMS_CONFIG = new Properties();
+    private String sourceTopic;
+    private KafkaStreams streams;
 
     private Map<String, String> expectedInitialResultsMap;
     private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
 
     @BeforeClass
-    public static void setUpBeforeAllTests() throws Exception {
-        CLUSTER.createTopic(SOURCE_TOPIC);
-
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-from-source");
+    public static void setUpBeforeAllTests() {
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
@@ -96,8 +93,13 @@ public class KTableSourceTopicRestartIntegrationTest {
     public TestName testName = new TestName();
 
     @Before
-    public void before() {
-        final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC, Materialized.as("store"));
+    public void before() throws Exception {
+        sourceTopic = SOURCE_TOPIC + "-" + testName.getMethodName();
+        CLUSTER.createTopic(sourceTopic);
+
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
+
+        final KTable<String, String> kTable = streamsBuilder.table(sourceTopic, Materialized.as("store"));
         kTable.toStream().foreach(readKeyValues::put);
 
         expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c");
@@ -107,24 +109,23 @@ public class KTableSourceTopicRestartIntegrationTest {
     @After
     public void after() throws Exception {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
-        CLUSTER.deleteAllTopicsAndWait(60000L);
     }
 
     @Test
     public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
         try {
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streamsOne.start();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.start();
 
             produceKeyValues("a", "b", "c");
 
             assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
 
-            streamsOne.close();
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.close();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
 
             // the state restore listener will append one record to the log
-            streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
-            streamsOne.start();
+            streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+            streams.start();
 
             produceKeyValues("f", "g", "h");
 
@@ -133,7 +134,7 @@ public class KTableSourceTopicRestartIntegrationTest {
                 expectedResultsWithDataWrittenDuringRestoreMap,
                 "Table did not get all values after restart");
         } finally {
-            streamsOne.close(Duration.ofSeconds(5));
+            streams.close(Duration.ofSeconds(5));
         }
     }
 
@@ -151,18 +152,18 @@
 
     private void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
         try {
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streamsOne.start();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.start();
 
             produceKeyValues("a", "b", "c");
 
             assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
 
-            streamsOne.close();
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.close();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
 
             // the state restore listener will append one record to the log
-            streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
-            streamsOne.start();
+            streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+            streams.start();
 
             produceKeyValues("f", "g", "h");
 
@@ -171,23 +172,23 @@
                 expectedResultsWithDataWrittenDuringRestoreMap,
                 "Table did not get all values after restart");
         } finally {
-            streamsOne.close(Duration.ofSeconds(5));
+            streams.close(Duration.ofSeconds(5));
         }
     }
 
     @Test
     public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
         try {
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streamsOne.start();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.start();
 
             produceKeyValues("a", "b", "c");
 
             assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
 
-            streamsOne.close();
-            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
-            streamsOne.start();
+            streams.close();
+            streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streams.start();
 
             produceKeyValues("f", "g", "h");
 
@@ -195,7 +196,7 @@
 
             assertNumberValuesRead(readKeyValues, expectedValues, "Table did not get all values after restart");
         } finally {
-            streamsOne.close(Duration.ofSeconds(5));
+            streams.close(Duration.ofSeconds(5));
         }
     }
 
@@ -208,14 +209,14 @@
             errorMessage);
     }
 
-    private void produceKeyValues(final String... keys) throws ExecutionException, InterruptedException {
+    private void produceKeyValues(final String... keys) {
         final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
 
         for (final String key : keys) {
             keyValueList.add(new KeyValue<>(key, key + "1"));
         }
 
-        IntegrationTestUtils.produceKeyValuesSynchronously(SOURCE_TOPIC,
+        IntegrationTestUtils.produceKeyValuesSynchronously(sourceTopic,
                                                            keyValueList,
                                                            PRODUCER_CONFIG,
                                                            time);
@@ -236,11 +237,7 @@ public class KTableSourceTopicRestartIntegrationTest {
                                    final String storeName,
                                    final long startingOffset,
                                    final long endingOffset) {
-            try {
-                produceKeyValues("d");
-            } catch (final ExecutionException | InterruptedException e) {
-                throw new RuntimeException(e);
-            }
+            produceKeyValues("d");
         }
 
         @Override
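For context, the UpdatingSourceTopicOnRestoreStartStateRestoreListener referenced in the tests above hooks into restoration via KafkaStreams#setGlobalStateRestoreListener. A hedged sketch of its shape (the class name here is shortened, and produceKeyValues is stubbed out -- in the real test it delegates to the enclosing test class's producer helper):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// When restoration of the store begins, write one more record ("d") to
// the source topic, so the tests can verify that records produced
// during restoration are still picked up after the restart.
class UpdatingSourceTopicOnRestoreStartListenerSketch implements StateRestoreListener {

    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {
        produceKeyValues("d");
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {
        // not needed by the tests
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) {
        // not needed by the tests
    }

    private void produceKeyValues(final String... keys) {
        // placeholder for the test's synchronous producer helper,
        // IntegrationTestUtils.produceKeyValuesSynchronously(...)
    }
}

Because produceKeyValues no longer declares checked exceptions (see the last hunk above), the listener can call it directly from onRestoreStart without the old try/catch wrapper.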