kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Update test to wait for final value to reduce flakiness updated test method for multiple keys (#5517)
Date Fri, 17 Aug 2018 19:14:41 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7c4e672  MINOR: Update test to wait for final value to reduce flakiness updated test method for multiple keys (#5517)
7c4e672 is described below

commit 7c4e6724699bb6fc65112b5513848c733a03019e
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Fri Aug 17 15:14:29 2018 -0400

    MINOR: Update test to wait for final value to reduce flakiness updated test method for
multiple keys (#5517)
    
    Updated two integration tests to use IntegrationTestUtils#waitUntilFinalKeyValueRecordsReceived
to eliminate flaky test results.
    
    Also, I updated IntegrationTestUtils#waitUntilFinalKeyValueRecordsReceived method to support
having results with the same key present with different values.
    
    For testing, I ran the current suite of streams tests.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../RepartitionOptimizingIntegrationTest.java      | 13 ++-----
 ...artitionWithMergeOptimizingIntegrationTest.java |  9 +----
 .../integration/utils/IntegrationTestUtils.java    | 43 +++++++++++-----------
 3 files changed, 27 insertions(+), 38 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
index e192c70..5eebf04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
@@ -195,25 +195,20 @@ public class RepartitionOptimizingIntegrationTest {
         streams.start();
 
         final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A",
3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L));
-        final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1,
COUNT_TOPIC, expectedCountKeyValues.size());
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC,
expectedCountKeyValues);
 
         final List<KeyValue<String, Integer>> expectedAggKeyValues = Arrays.asList(KeyValue.pair("A",
9), KeyValue.pair("B", 9), KeyValue.pair("C", 9));
-        final List<KeyValue<String, Integer>> receivedAggKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2,
AGGREGATION_TOPIC, expectedAggKeyValues.size());
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, AGGREGATION_TOPIC,
expectedAggKeyValues);
 
         final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(KeyValue.pair("A",
"foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz"));
-        final List<KeyValue<String, Integer>> receivedReduceKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3,
REDUCE_TOPIC, expectedAggKeyValues.size());
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, REDUCE_TOPIC,
expectedReduceKeyValues);
 
         final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(KeyValue.pair("A",
"foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3"));
-        final List<KeyValue<String, Integer>> receivedJoinKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3,
JOINED_TOPIC, expectedJoinKeyValues.size());
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, JOINED_TOPIC,
expectedJoinKeyValues);
 
 
         final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO",
"BAR", "BAZ");
 
-        assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
-        assertThat(receivedAggKeyValues, equalTo(expectedAggKeyValues));
-        assertThat(receivedReduceKeyValues, equalTo(expectedReduceKeyValues));
-        assertThat(receivedJoinKeyValues, equalTo(expectedJoinKeyValues));
-
         assertThat(3, equalTo(processorValueCollector.size()));
         assertThat(processorValueCollector, equalTo(expectedCollectedProcessorValues));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
index 58d903a..af1f5f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
@@ -51,9 +51,7 @@ import java.util.regex.Pattern;
 
 import kafka.utils.MockTime;
 
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 @Category({IntegrationTest.class})
 public class RepartitionWithMergeOptimizingIntegrationTest {
@@ -165,13 +163,10 @@ public class RepartitionWithMergeOptimizingIntegrationTest {
         streams.start();
 
         final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A",
6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L));
-        final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1,
COUNT_TOPIC, expectedCountKeyValues);
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC,
expectedCountKeyValues);
 
         final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair("A",
"6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6"));
-        final List<KeyValue<String, String>> receivedCountStringKeyValues = IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2,
COUNT_STRING_TOPIC, expectedStringCountKeyValues);
-
-        assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
-        assertThat(receivedCountStringKeyValues, equalTo(expectedStringCountKeyValues));
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC,
expectedStringCountKeyValues);
 
         streams.close(5, TimeUnit.SECONDS);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 1a78ed3..d9602f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
-import kafka.api.Request;
-import kafka.server.KafkaServer;
-import kafka.server.MetadataCache;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -41,7 +38,6 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import scala.Option;
 
 import java.io.File;
 import java.io.IOException;
@@ -50,12 +46,16 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import kafka.api.Request;
+import kafka.server.KafkaServer;
+import kafka.server.MetadataCache;
+import scala.Option;
 
 /**
  * Utility functions to make integration testing more convenient.
@@ -364,26 +364,25 @@ public class IntegrationTestUtils {
                                                                                     final long waitTime) throws InterruptedException {
         final List<KeyValue<K, V>> accumData = new ArrayList<>();
         try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
-            final TestCondition valuesRead = new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    final List<KeyValue<K, V>> readData =
-                            readKeyValues(topic, consumer, waitTime, expectedRecords.size());
-                    accumData.addAll(readData);
+            final TestCondition valuesRead = () -> {
+                final List<KeyValue<K, V>> readData =
+                        readKeyValues(topic, consumer, waitTime, expectedRecords.size());
+                accumData.addAll(readData);
 
-                    final Map<K, V> finalData = new HashMap<>();
+                final int accumLastIndex = accumData.size() - 1;
+                final int expectedLastIndex = expectedRecords.size() - 1;
 
-                    for (final KeyValue<K, V> keyValue : accumData) {
-                        finalData.put(keyValue.key, keyValue.value);
-                    }
+                // filter out all intermediate records we don't want
+                final List<KeyValue<K, V>> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
 
-                    for (final KeyValue<K, V> keyValue : expectedRecords) {
-                        if (!keyValue.value.equals(finalData.get(keyValue.key)))
-                            return false;
-                    }
+                // need this check as filtering above could have removed the last record from accumData,
+                // but it did not equal the last expected record
+                final boolean lastRecordsMatch = accumData.get(accumLastIndex).equals(expectedRecords.get(expectedLastIndex));
+
+                // returns true only if the remaining records in both lists are the same and in the same order
+                // and the last record received matches the last expected record
+                return accumulatedActual.equals(expectedRecords) && lastRecordsMatch;
 
-                    return true;
-                }
             };
             final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic;
             TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);


Mime
View raw message