kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-4263: Fix flaky test QueryableStateIntegrationTest.concurrentAccess
Date: Wed, 20 Dec 2017 23:22:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 760d86a97 -> 9cacb92d1


KAFKA-4263: Fix flaky test QueryableStateIntegrationTest.concurrentAccess

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #4342 from mjsax/kafka-4263-concurrentAccess


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9cacb92d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9cacb92d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9cacb92d

Branch: refs/heads/trunk
Commit: 9cacb92d13468803d219f8331c6e4402444abc1a
Parents: 760d86a
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Dec 20 15:22:06 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Dec 20 15:22:06 2017 -0800

----------------------------------------------------------------------
 .../QueryableStateIntegrationTest.java          | 166 ++++++++++++-------
 .../integration/utils/IntegrationTestUtils.java |  11 +-
 2 files changed, 109 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
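
For readers following the diff: the core mechanical change is a migration off the deprecated KGroupedStream#count(String) and to(Serde, Serde, String) overloads onto the Materialized/Produced/windowedBy DSL added in Kafka 1.0. A minimal sketch of the resulting word-count pattern, with hypothetical topic and store names, default String serdes assumed via config, and a Java 8 lambda where the test itself uses an anonymous KeyValueMapper:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KGroupedStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.WindowStore;

    public class CountTopologySketch {
        public static StreamsBuilder build() {
            final StreamsBuilder builder = new StreamsBuilder();
            // "words-input" is a hypothetical topic; keys are assumed to already be the words.
            final KGroupedStream<String, String> grouped =
                builder.<String, String>stream("words-input").groupByKey();

            // All-time count, materialized into a named, queryable key-value store.
            grouped
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store"))
                .toStream()
                .to("count-output", Produced.with(Serdes.String(), Serdes.Long()));

            // Windowed count; the Windowed<String> key is unwrapped before writing downstream.
            grouped
                .windowedBy(TimeWindows.of(60_000L))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-count-store"))
                .toStream((final Windowed<String> key, final Long count) -> key.key())
                .to("windowed-count-output", Produced.with(Serdes.String(), Serdes.Long()));

            return builder;
        }
    }

The windowed branch maps each Windowed<String> key back to its plain key so downstream consumers see one record per word, which is what the new outputTopicConcurrentWindowed assertions count.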


http://git-wip-us.apache.org/repos/asf/kafka/blob/9cacb92d/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index e9df495..51c5ce2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -40,17 +40,21 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.StreamsMetadata;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockKeyValueMapper;
@@ -99,6 +103,7 @@ public class QueryableStateIntegrationTest {
     private String streamConcurrent = "stream-concurrent";
     private String outputTopic = "output";
     private String outputTopicConcurrent = "output-concurrent";
+    private String outputTopicConcurrentWindowed = "output-concurrent-windowed";
     private String outputTopicThree = "output-three";
     // sufficiently large window size such that everything falls into 1 window
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
@@ -106,6 +111,7 @@ public class QueryableStateIntegrationTest {
     private static final int NUM_REPLICAS = NUM_BROKERS;
     private Properties streamsConfiguration;
     private List<String> inputValues;
+    private int numberOfWordsPerIteration = 0;
     private Set<String> inputValuesKeys;
     private KafkaStreams kafkaStreams;
     private Comparator<KeyValue<String, String>> stringComparator;
@@ -118,12 +124,13 @@ public class QueryableStateIntegrationTest {
         streamThree = streamThree + "-" + testNo;
         outputTopic = outputTopic + "-" + testNo;
         outputTopicConcurrent = outputTopicConcurrent + "-" + testNo;
+        outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + testNo;
         outputTopicThree = outputTopicThree + "-" + testNo;
         streamTwo = streamTwo + "-" + testNo;
         CLUSTER.createTopics(streamOne, streamConcurrent);
         CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
         CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
-        CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicThree);
+        CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed, outputTopicThree);
     }
 
     @Before
@@ -176,7 +183,9 @@ public class QueryableStateIntegrationTest {
             "king of the world");
         inputValuesKeys = new HashSet<>();
         for (final String sentence : inputValues) {
-            Collections.addAll(inputValuesKeys, sentence.split("\\W+"));
+            final String[] words = sentence.split("\\W+");
+            numberOfWordsPerIteration += words.length;
+            Collections.addAll(inputValuesKeys, words);
         }
     }
 
@@ -192,7 +201,12 @@ public class QueryableStateIntegrationTest {
     /**
      * Creates a typical word count topology
      */
-    private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
+    private KafkaStreams createCountStream(final String inputTopic,
+                                           final String outputTopic,
+                                           final String windowOutputTopic,
+                                           final String storeName,
+                                           final String windowStoreName,
+                                           final Properties streamsConfiguration) {
         final StreamsBuilder builder = new StreamsBuilder();
         final Serde<String> stringSerde = Serdes.String();
         final KStream<String, String> textLines = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
@@ -207,10 +221,22 @@ public class QueryableStateIntegrationTest {
             .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());
 
         // Create a State Store for the all time word count
-        groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic);
+        groupedByWord
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName + "-" + inputTopic))
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         // Create a Windowed State Store that contains the word count for every 1 minute
-        groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);
+        groupedByWord
+            .windowedBy(TimeWindows.of(WINDOW_SIZE))
+            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(windowStoreName + "-" + inputTopic))
+            .toStream(new KeyValueMapper<Windowed<String>, Long, String>() {
+                @Override
+                public String apply(final Windowed<String> key, final Long value) {
+                    return key.key();
+                }
+            })
+            .to(windowOutputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         return new KafkaStreams(builder.build(), streamsConfiguration);
     }
@@ -220,10 +246,15 @@ public class QueryableStateIntegrationTest {
         private boolean closed = false;
         private final KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub();
 
-        StreamRunnable(final String inputTopic, final String outputTopic, final int queryPort) {
+        StreamRunnable(final String inputTopic,
+                       final String outputTopic,
+                       final String outputTopicWindowed,
+                       final String storeName,
+                       final String windowStoreName,
+                       final int queryPort) {
             final Properties props = (Properties) streamsConfiguration.clone();
             props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort);
-            myStream = createCountStream(inputTopic, outputTopic, props);
+            myStream = createCountStream(inputTopic, outputTopic, outputTopicWindowed, storeName, windowStoreName, props);
             myStream.setStateListener(stateListener);
         }
 
@@ -329,8 +360,10 @@ public class QueryableStateIntegrationTest {
 
 
         // create stream threads
+        final String storeName = "word-count-store";
+        final String windowStoreName = "windowed-word-count-store";
         for (int i = 0; i < numThreads; i++) {
-            streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, i);
+            streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, i);
             streamThreads[i] = new Thread(streamRunnables[i]);
             streamThreads[i].start();
         }
@@ -340,9 +373,9 @@ public class QueryableStateIntegrationTest {
 
             for (int i = 0; i < numThreads; i++) {
                verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
-                    "word-count-store-" + streamThree);
+                    storeName + "-" + streamThree);
                verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
-                                      "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
+                    windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
                 assertEquals(streamRunnables[i].getStream().state(), KafkaStreams.State.RUNNING);
             }
 
@@ -355,9 +388,9 @@ public class QueryableStateIntegrationTest {
 
             // query from the remaining thread
            verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
-                "word-count-store-" + streamThree);
+                storeName + "-" + streamThree);
            verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
-                                  "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
+                windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
             assertEquals(streamRunnables[0].getStream().state(), KafkaStreams.State.RUNNING);
         } finally {
             for (int i = 0; i < numThreads; i++) {
@@ -372,35 +405,34 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void concurrentAccesses() throws InterruptedException {
-
         final int numIterations = 500000;
+        final String storeName = "word-count-store";
+        final String windowStoreName = "windowed-word-count-store";
 
        final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
         final Thread producerThread = new Thread(producerRunnable);
-        kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, streamsConfiguration);
+        kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, outputTopicConcurrentWindowed, storeName, windowStoreName, streamsConfiguration);
 
         kafkaStreams.start();
         producerThread.start();
 
         try {
-            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);
+            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration);
+            waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
 
             final ReadOnlyKeyValueStore<String, Long>
-                keyValueStore = kafkaStreams.store("word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
+                keyValueStore = kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
 
             final ReadOnlyWindowStore<String, Long> windowStore =
-                kafkaStreams.store("windowed-word-count-store-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
+                kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
 
 
             final Map<String, Long> expectedWindowState = new HashMap<>();
             final Map<String, Long> expectedCount = new HashMap<>();
             while (producerRunnable.getCurrIteration() < numIterations) {
                verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
-                    expectedCount, windowStore, keyValueStore, false);
+                    expectedCount, windowStore, keyValueStore, true);
             }
-            // finally check if all keys are there
-            verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
-                expectedCount, windowStore, keyValueStore, true);
         } finally {
             producerRunnable.shutdown();
             producerThread.interrupt();
@@ -424,16 +456,15 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
-        final Set<KeyValue<String, Long>> batch1 = new HashSet<>();
-        batch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[0], 1L),
-            new KeyValue<>(keys[1], 1L),
-            new KeyValue<>(keys[2], 3L),
-            new KeyValue<>(keys[3], 5L),
-            new KeyValue<>(keys[4], 2L)));
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
-        expectedBatch1.addAll(Collections.singleton(
-            new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, Long>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], 1L),
+                new KeyValue<>(keys[1], 1L),
+                new KeyValue<>(keys[2], 3L),
+                new KeyValue<>(keys[3], 5L),
+                new KeyValue<>(keys[4], 2L))
+        );
+        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -453,7 +484,7 @@ public class QueryableStateIntegrationTest {
         final KTable<String, Long> t1 = builder.table(streamOne);
        final KTable<String, Long> t2 = t1.filter(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
        t1.filterNot(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilterNot"));
-        t2.to(outputTopic);
+        t2.toStream().to(outputTopic);
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
@@ -490,13 +521,14 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
-        final Set<KeyValue<String, String>> batch1 = new HashSet<>();
-        batch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[0], "1"),
-            new KeyValue<>(keys[1], "1"),
-            new KeyValue<>(keys[2], "3"),
-            new KeyValue<>(keys[3], "5"),
-            new KeyValue<>(keys[4], "2")));
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], "1"),
+                new KeyValue<>(keys[1], "1"),
+                new KeyValue<>(keys[2], "3"),
+                new KeyValue<>(keys[3], "5"),
+                new KeyValue<>(keys[4], "2"))
+        );
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -515,7 +547,7 @@ public class QueryableStateIntegrationTest {
                 return Long.valueOf(value);
             }
         }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
-        t2.to(Serdes.String(), Serdes.Long(), outputTopic);
+        t2.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
@@ -536,16 +568,15 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
-        final Set<KeyValue<String, String>> batch1 = new HashSet<>();
-        batch1.addAll(Arrays.asList(
-            new KeyValue<>(keys[0], "1"),
-            new KeyValue<>(keys[1], "1"),
-            new KeyValue<>(keys[2], "3"),
-            new KeyValue<>(keys[3], "5"),
-            new KeyValue<>(keys[4], "2")));
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>();
-        expectedBatch1.addAll(Collections.singleton(
-            new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+            Arrays.asList(
+                new KeyValue<>(keys[0], "1"),
+                new KeyValue<>(keys[1], "1"),
+                new KeyValue<>(keys[2], "3"),
+                new KeyValue<>(keys[3], "5"),
+                new KeyValue<>(keys[4], "2"))
+        );
+        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -571,7 +602,7 @@ public class QueryableStateIntegrationTest {
                 return Long.valueOf(value);
             }
         }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
-        t3.to(Serdes.String(), Serdes.Long(), outputTopic);
+        t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
@@ -624,19 +655,26 @@ public class QueryableStateIntegrationTest {
         final KStream<String, String> s1 = builder.stream(streamOne);
 
         // Non Windowed
-        s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), outputTopic);
-
-        s1.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count");
+        final String storeName = "my-count";
+        s1.groupByKey()
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName))
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
+        final String windowStoreName = "windowed-count";
+        s1.groupByKey()
+            .windowedBy(TimeWindows.of(WINDOW_SIZE))
+            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(windowStoreName));
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long>
-            myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
+            myCount = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
 
         final ReadOnlyWindowStore<String, Long> windowStore =
-            kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());
+            kafkaStreams.store(windowStoreName, QueryableStoreTypes.<String, Long>windowStore());
         verifyCanGetByKey(keys,
             expectedCount,
             expectedCount,
@@ -652,7 +690,7 @@ public class QueryableStateIntegrationTest {
         final KStream<String, String> stream = builder.stream(streamThree);
 
         final String storeName = "count-by-key";
-        stream.groupByKey().count(storeName);
+        stream.groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName));
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
@@ -738,7 +776,8 @@ public class QueryableStateIntegrationTest {
                     }
                     return value1 + value2;
                 }
-            }, storeName)
+            }, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))
+            .toStream()
             .to(outputTopic);
 
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
@@ -889,22 +928,21 @@ public class QueryableStateIntegrationTest {
                                       final Map<String, Long> expectedCount,
                                       final ReadOnlyWindowStore<String, Long> windowStore,
                                       final ReadOnlyKeyValueStore<String, Long> keyValueStore,
-                                      final boolean failIfKeyNotFound)
-        throws InterruptedException {
+                                      final boolean failIfKeyNotFound) {
         final Map<String, Long> windowState = new HashMap<>();
         final Map<String, Long> countState = new HashMap<>();
 
         for (final String key : keys) {
             final Map<String, Long> map = fetchMap(windowStore, key);
            if (map.equals(Collections.<String, Long>emptyMap()) && failIfKeyNotFound) {
-                fail("Key not found " + key);
+                fail("Key in windowed-store not found " + key);
             }
             windowState.putAll(map);
             final Long value = keyValueStore.get(key);
             if (value != null) {
                 countState.put(key, value);
             } else if (failIfKeyNotFound) {
-                fail("Key not found " + key);
+                fail("Key in key-value-store not found " + key);
             }
         }
 
@@ -983,7 +1021,7 @@ public class QueryableStateIntegrationTest {
             this.numIterations = numIterations;
         }
 
-        private synchronized void incrementInteration() {
+        private synchronized void incrementIteration() {
             currIteration++;
         }
 
@@ -1011,7 +1049,7 @@ public class QueryableStateIntegrationTest {
                     for (final String value : inputValues) {
                         producer.send(new ProducerRecord<String, String>(topic, value));
                     }
-                    incrementInteration();
+                    incrementIteration();
                 }
             }
         }
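
The de-flaking itself is twofold: concurrentAccesses() now waits until one full iteration of input words (numberOfWordsPerIteration) has reached both output topics before it queries the stores, and verifyGreaterOrEqual() runs with failIfKeyNotFound set to true on every pass of the loop rather than only in a separate final check. Both guards reduce to a poll-until-condition idiom; below is a hypothetical sketch of that idiom, not the project's actual test utility:

    import java.util.concurrent.Callable;

    public final class WaitUtil {
        // Hypothetical helper, not Kafka's actual TestUtils API: poll a
        // condition until it holds or a deadline passes, then fail loudly.
        public static void waitForCondition(final Callable<Boolean> condition,
                                            final long timeoutMs,
                                            final String failureMessage) throws Exception {
            final long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                if (condition.call()) {
                    return;
                }
                Thread.sleep(100L); // modest poll interval, mirroring the 100 ms used in readKeyValues below
            }
            throw new AssertionError(failureMessage);
        }
    }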

http://git-wip-us.apache.org/repos/asf/kafka/blob/9cacb92d/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index f50417d..e8cd59e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -173,7 +173,6 @@ public class IntegrationTestUtils {
      * @param expectedNumRecords Minimum number of expected records
      * @param waitTime           Upper bound in waiting time in milliseconds
      * @return All the records consumed, or null if no records are consumed
-     * @throws InterruptedException
      * @throws AssertionError       if the given wait time elapses
      */
    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
@@ -212,7 +211,6 @@ public class IntegrationTestUtils {
      * @param expectedNumRecords Minimum number of expected records
      * @param waitTime           Upper bound in waiting time in milliseconds
      * @return All the records consumed, or null if no records are consumed
-     * @throws InterruptedException
      * @throws AssertionError       if the given wait time elapses
      */
    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
@@ -320,7 +318,10 @@ public class IntegrationTestUtils {
      * @param maxMessages    Maximum number of messages to read via the consumer.
      * @return The values retrieved via the consumer.
      */
-    private static <V> List<V> readValues(final String topic, final Consumer<Object, V> consumer, final long waitTime, final int maxMessages) {
+    private static <V> List<V> readValues(final String topic,
+                                          final Consumer<Object, V> consumer,
+                                          final long waitTime,
+                                          final int maxMessages) {
         final List<V> returnList = new ArrayList<>();
        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumer, waitTime, maxMessages);
         for (final KeyValue<?, V> kv : kvs) {
@@ -340,7 +341,9 @@ public class IntegrationTestUtils {
      * @return The KeyValue elements retrieved via the consumer
      */
    private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
-        final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
+                                                             final Consumer<K, V> consumer,
+                                                             final long waitTime,
+                                                             final int maxMessages) {
         final List<KeyValue<K, V>> consumedValues;
         consumer.subscribe(Collections.singletonList(topic));
         final int pollIntervalMs = 100;
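
For reference, the interactive queries this test exercises against the materialized stores follow the pattern sketched below; the store names reuse the hypothetical ones from the earlier sketch, and a started KafkaStreams instance that has reached state RUNNING is assumed:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class QuerySketch {
        // Assumes `streams` has been started and is in state RUNNING.
        public static void query(final KafkaStreams streams, final String word) {
            final ReadOnlyKeyValueStore<String, Long> counts =
                streams.store("count-store", QueryableStoreTypes.<String, Long>keyValueStore());
            System.out.println(word + " -> " + counts.get(word));

            final ReadOnlyWindowStore<String, Long> windowed =
                streams.store("windowed-count-store", QueryableStoreTypes.<String, Long>windowStore());
            // fetch() yields one entry per window: window start timestamp -> count.
            try (final WindowStoreIterator<Long> it =
                     windowed.fetch(word, 0L, System.currentTimeMillis())) {
                while (it.hasNext()) {
                    System.out.println(word + " @ " + it.next());
                }
            }
        }
    }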

