kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.1 updated: MINOR: improve QueryableStateIntegrationTest (#5987)
Date Mon, 03 Dec 2018 06:13:37 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 975aeda  MINOR: improve QueryableStateIntegrationTest (#5987)
975aeda is described below

commit 975aeda906218b95aaa8ab8a1ea130f6980df21b
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Sun Dec 2 22:11:04 2018 -0800

    MINOR: improve QueryableStateIntegrationTest (#5987)
    
    Fix test Comparators plus Java8 cleanup
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../integration/QueryableStateIntegrationTest.java | 321 ++++++++-------------
 1 file changed, 114 insertions(+), 207 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 76eec71..06014a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreamsTest;
 import org.apache.kafka.streams.KeyValue;
@@ -37,24 +36,21 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.StreamsMetadata;
-import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
@@ -71,7 +67,6 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -80,6 +75,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
@@ -128,7 +124,7 @@ public class QueryableStateIntegrationTest {
     private Comparator<KeyValue<String, Long>> stringLongComparator;
     private static int testNo = 0;
 
-    private void createTopics() throws InterruptedException {
+    private void createTopics() throws Exception {
         streamOne = streamOne + "-" + testNo;
         streamConcurrent = streamConcurrent + "-" + testNo;
         streamThree = streamThree + "-" + testNo;
@@ -152,7 +148,9 @@ public class QueryableStateIntegrationTest {
         List<String> input = new ArrayList<>();
         final ClassLoader classLoader = getClass().getClassLoader();
         final String fileName = "QueryableStateIntegrationTest" + File.separator + "inputValues.txt";
-        try (final BufferedReader reader = new BufferedReader(new FileReader(classLoader.getResource(fileName).getFile())))
{
+        try (final BufferedReader reader = new BufferedReader(
+            new FileReader(Objects.requireNonNull(classLoader.getResource(fileName)).getFile())))
{
+
             for (String line = reader.readLine(); line != null; line = reader.readLine())
{
                 input.add(line);
             }
@@ -194,23 +192,8 @@ public class QueryableStateIntegrationTest {
         // override this to make the rebalances happen quickly
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
-
-        stringComparator = new Comparator<KeyValue<String, String>>() {
-
-            @Override
-            public int compare(final KeyValue<String, String> o1,
-                               final KeyValue<String, String> o2) {
-                return o1.key.compareTo(o2.key);
-            }
-        };
-        stringLongComparator = new Comparator<KeyValue<String, Long>>() {
-
-            @Override
-            public int compare(final KeyValue<String, Long> o1,
-                               final KeyValue<String, Long> o2) {
-                return o1.key.compareTo(o2.key);
-            }
-        };
+        stringComparator = Comparator.comparing((KeyValue<String, String> o) ->
o.key).thenComparing(o -> o.value);
+        stringLongComparator = Comparator.comparing((KeyValue<String, Long> o) ->
o.key).thenComparingLong(o -> o.value);
         inputValues = getInputValues();
         inputValuesKeys = new HashSet<>();
         for (final String sentence : inputValues) {
@@ -221,14 +204,13 @@ public class QueryableStateIntegrationTest {
     }
 
     @After
-    public void shutdown() throws IOException {
+    public void shutdown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close(ofSeconds(30));
         }
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
-
     /**
      * Creates a typical word count topology
      */
@@ -243,30 +225,20 @@ public class QueryableStateIntegrationTest {
         final KStream<String, String> textLines = builder.stream(inputTopic, Consumed.with(stringSerde,
stringSerde));
 
         final KGroupedStream<String, String> groupedByWord = textLines
-            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                @Override
-                public Iterable<String> apply(final String value) {
-                    return Arrays.asList(value.split("\\W+"));
-                }
-            })
-            .groupBy(MockMapper.<String, String>selectValueMapper());
+            .flatMapValues((ValueMapper<String, Iterable<String>>) value ->
Arrays.asList(value.split("\\W+")))
+            .groupBy(MockMapper.selectValueMapper());
 
         // Create a State Store for the all time word count
         groupedByWord
-            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName
+ "-" + inputTopic))
+            .count(Materialized.as(storeName + "-" + inputTopic))
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         // Create a Windowed State Store that contains the word count for every 1 minute
         groupedByWord
             .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
-            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(windowStoreName
+ "-" + inputTopic))
-            .toStream(new KeyValueMapper<Windowed<String>, Long, String>() {
-                @Override
-                public String apply(final Windowed<String> key, final Long value) {
-                    return key.key();
-                }
-            })
+            .count(Materialized.as(windowStoreName + "-" + inputTopic))
+            .toStream((key, value) -> key.key())
             .to(windowOutputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         return new KafkaStreams(builder.build(), streamsConfiguration);
@@ -292,7 +264,6 @@ public class QueryableStateIntegrationTest {
         @Override
         public void run() {
             myStream.start();
-
         }
 
         public void close() {
@@ -315,72 +286,68 @@ public class QueryableStateIntegrationTest {
         }
     }
 
-    private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams
streams,
+    private void verifyAllKVKeys(final StreamRunnable[] streamRunnables,
+                                 final KafkaStreams streams,
                                  final KafkaStreamsTest.StateListenerStub stateListenerStub,
-                                 final Set<String> keys, final String storeName) throws
InterruptedException {
+                                 final Set<String> keys,
+                                 final String storeName) throws Exception {
         for (final String key : keys) {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    try {
-                        final StreamsMetadata metadata = streams.metadataForKey(storeName,
key, new StringSerializer());
-
-                        if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE))
{
-                            return false;
-                        }
-                        final int index = metadata.hostInfo().port();
-                        final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
-                        final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName,
QueryableStoreTypes.<String, Long>keyValueStore());
-
-                        return store != null && store.get(key) != null;
-                    } catch (final IllegalStateException e) {
-                        // Kafka Streams instance may have closed but rebalance hasn't happened
-                        return false;
-                    } catch (final InvalidStateStoreException e) {
-                        // there must have been at least one rebalance state
-                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING)
>= 1);
+            TestUtils.waitForCondition(() -> {
+                try {
+                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key,
new StringSerializer());
+
+                    if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE))
{
                         return false;
                     }
+                    final int index = metadata.hostInfo().port();
+                    final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                    final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName,
QueryableStoreTypes.keyValueStore());
 
+                    return store != null && store.get(key) != null;
+                } catch (final IllegalStateException e) {
+                    // Kafka Streams instance may have closed but rebalance hasn't happened
+                    return false;
+                } catch (final InvalidStateStoreException e) {
+                    // there must have been at least one rebalance state
+                    assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING)
>= 1);
+                    return false;
                 }
             }, 120000, "waiting for metadata, store and value to be non null");
         }
     }
 
-
-    private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams
streams,
+    private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables,
+                                       final KafkaStreams streams,
                                        final KafkaStreamsTest.StateListenerStub stateListenerStub,
-                                       final Set<String> keys, final String storeName,
-                                       final Long from, final Long to) throws InterruptedException
{
+                                       final Set<String> keys,
+                                       final String storeName,
+                                       final Long from,
+                                       final Long to) throws Exception {
         for (final String key : keys) {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    try {
-                        final StreamsMetadata metadata = streams.metadataForKey(storeName,
key, new StringSerializer());
-                        if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE))
{
-                            return false;
-                        }
-                        final int index = metadata.hostInfo().port();
-                        final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
-                        final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName,
QueryableStoreTypes.<String, Long>windowStore());
-                        return store != null && store.fetch(key, ofEpochMilli(from),
ofEpochMilli(to)) != null;
-                    } catch (final IllegalStateException e) {
-                        // Kafka Streams instance may have closed but rebalance hasn't happened
-                        return false;
-                    } catch (final InvalidStateStoreException e) {
-                        // there must have been at least one rebalance state
-                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING)
>= 1);
+            TestUtils.waitForCondition(() -> {
+                try {
+                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key,
new StringSerializer());
+                    if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE))
{
                         return false;
                     }
-
+                    final int index = metadata.hostInfo().port();
+                    final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                    final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName,
QueryableStoreTypes.windowStore());
+                    return store != null && store.fetch(key, ofEpochMilli(from),
ofEpochMilli(to)) != null;
+                } catch (final IllegalStateException e) {
+                    // Kafka Streams instance may have closed but rebalance hasn't happened
+                    return false;
+                } catch (final InvalidStateStoreException e) {
+                    // there must have been at least one rebalance state
+                    assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING)
>= 1);
+                    return false;
                 }
             }, 120000, "waiting for metadata, store and value to be non null");
         }
     }
 
     @Test
-    public void queryOnRebalance() throws InterruptedException {
+    public void queryOnRebalance() throws Exception {
         final int numThreads = STREAM_TWO_PARTITIONS;
         final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
         final Thread[] streamThreads = new Thread[numThreads];
@@ -388,7 +355,6 @@ public class QueryableStateIntegrationTest {
         final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues,
1);
         producerRunnable.run();
 
-
         // create stream threads
         final String storeName = "word-count-store";
         final String windowStoreName = "windowed-word-count-store";
@@ -434,7 +400,7 @@ public class QueryableStateIntegrationTest {
     }
 
     @Test
-    public void concurrentAccesses() throws InterruptedException {
+    public void concurrentAccesses() throws Exception {
         final int numIterations = 500000;
         final String storeName = "word-count-store";
         final String windowStoreName = "windowed-word-count-store";
@@ -451,16 +417,15 @@ public class QueryableStateIntegrationTest {
             waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
 
             final ReadOnlyKeyValueStore<String, Long>
-                keyValueStore = kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.<String,
Long>keyValueStore());
+                keyValueStore = kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore());
 
             final ReadOnlyWindowStore<String, Long> windowStore =
-                kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.<String,
Long>windowStore());
-
+                kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.windowStore());
 
             final Map<String, Long> expectedWindowState = new HashMap<>();
             final Map<String, Long> expectedCount = new HashMap<>();
             while (producerRunnable.getCurrIteration() < numIterations) {
-                verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]),
expectedWindowState,
+                verifyGreaterOrEqual(inputValuesKeys.toArray(new String[0]), expectedWindowState,
                     expectedCount, windowStore, keyValueStore, true);
             }
         } finally {
@@ -505,15 +470,10 @@ public class QueryableStateIntegrationTest {
                 LongSerializer.class,
                 new Properties()),
             mockTime);
-        final Predicate<String, Long> filterPredicate = new Predicate<String, Long>()
{
-            @Override
-            public boolean test(final String key, final Long value) {
-                return key.contains("kafka");
-            }
-        };
+        final Predicate<String, Long> filterPredicate = (key, value) -> key.contains("kafka");
         final KTable<String, Long> t1 = builder.table(streamOne);
-        final KTable<String, Long> t2 = t1.filter(filterPredicate, Materialized.<String,
Long, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
-        t1.filterNot(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("queryFilterNot"));
+        final KTable<String, Long> t2 = t1.filter(filterPredicate, Materialized.as("queryFilter"));
+        t1.filterNot(filterPredicate, Materialized.as("queryFilterNot"));
         t2.toStream().to(outputTopic);
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -522,9 +482,9 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopic, 2);
 
         final ReadOnlyKeyValueStore<String, Long>
-            myFilterStore = kafkaStreams.store("queryFilter", QueryableStoreTypes.<String,
Long>keyValueStore());
+            myFilterStore = kafkaStreams.store("queryFilter", QueryableStoreTypes.keyValueStore());
         final ReadOnlyKeyValueStore<String, Long>
-            myFilterNotStore = kafkaStreams.store("queryFilterNot", QueryableStoreTypes.<String,
Long>keyValueStore());
+            myFilterNotStore = kafkaStreams.store("queryFilterNot", QueryableStoreTypes.keyValueStore());
 
         for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
             TestUtils.waitForCondition(() -> expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)),
@@ -575,12 +535,7 @@ public class QueryableStateIntegrationTest {
             mockTime);
 
         final KTable<String, String> t1 = builder.table(streamOne);
-        t1.mapValues(new ValueMapper<String, Long>() {
-            @Override
-            public Long apply(final String value) {
-                return Long.valueOf(value);
-            }
-        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+        t1.mapValues((ValueMapper<String, Long>) Long::valueOf, Materialized.<String,
Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
@@ -591,7 +546,7 @@ public class QueryableStateIntegrationTest {
 
         final ReadOnlyKeyValueStore<String, Long>
             myMapStore = kafkaStreams.store("queryMapValues",
-            QueryableStoreTypes.<String, Long>keyValueStore());
+            QueryableStoreTypes.keyValueStore());
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
         }
@@ -623,20 +578,10 @@ public class QueryableStateIntegrationTest {
                 new Properties()),
             mockTime);
 
-        final Predicate<String, String> filterPredicate = new Predicate<String,
String>() {
-            @Override
-            public boolean test(final String key, final String value) {
-                return key.contains("kafka");
-            }
-        };
+        final Predicate<String, String> filterPredicate = (key, value) -> key.contains("kafka");
         final KTable<String, String> t1 = builder.table(streamOne);
-        final KTable<String, String> t2 = t1.filter(filterPredicate, Materialized.<String,
String, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
-        final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>()
{
-            @Override
-            public Long apply(final String value) {
-                return Long.valueOf(value);
-            }
-        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
+        final KTable<String, String> t2 = t1.filter(filterPredicate, Materialized.as("queryFilter"));
+        final KTable<String, Long> t3 = t2.mapValues((ValueMapper<String, Long>)
Long::valueOf, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
         t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -646,7 +591,7 @@ public class QueryableStateIntegrationTest {
 
         final ReadOnlyKeyValueStore<String, Long>
             myMapStore = kafkaStreams.store("queryMapValues",
-            QueryableStoreTypes.<String, Long>keyValueStore());
+            QueryableStoreTypes.keyValueStore());
         for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
             assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value);
         }
@@ -671,7 +616,6 @@ public class QueryableStateIntegrationTest {
             new KeyValue<>(keys[3], "go"),
             new KeyValue<>(keys[4], "kafka")));
 
-
         final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator);
         for (final String key : keys) {
             expectedCount.add(new KeyValue<>(key, 1L));
@@ -692,24 +636,24 @@ public class QueryableStateIntegrationTest {
         // Non Windowed
         final String storeName = "my-count";
         s1.groupByKey()
-            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName))
+            .count(Materialized.as(storeName))
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         final String windowStoreName = "windowed-count";
         s1.groupByKey()
             .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
-            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(windowStoreName));
+            .count(Materialized.as(windowStoreName));
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long>
-            myCount = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+            myCount = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
 
         final ReadOnlyWindowStore<String, Long> windowStore =
-            kafkaStreams.store(windowStoreName, QueryableStoreTypes.<String, Long>windowStore());
+            kafkaStreams.store(windowStoreName, QueryableStoreTypes.windowStore());
         verifyCanGetByKey(keys,
             expectedCount,
             expectedCount,
@@ -725,7 +669,7 @@ public class QueryableStateIntegrationTest {
         final KStream<String, String> stream = builder.stream(streamThree);
 
         final String storeName = "count-by-key";
-        stream.groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as(storeName));
+        stream.groupByKey().count(Materialized.as(storeName));
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
@@ -743,14 +687,9 @@ public class QueryableStateIntegrationTest {
         final int maxWaitMs = 30000;
         TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store
" + storeName);
 
-        final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName,
QueryableStoreTypes.<String, Long>keyValueStore());
+        final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName,
QueryableStoreTypes.keyValueStore());
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return new Long(8).equals(store.get("hello"));
-            }
-        }, maxWaitMs, "wait for count to be 8");
+        TestUtils.waitForCondition(() -> new Long(8).equals(store.get("hello")), maxWaitMs,
"wait for count to be 8");
 
         // close stream
         kafkaStreams.close();
@@ -760,15 +699,12 @@ public class QueryableStateIntegrationTest {
         kafkaStreams.start();
 
         // make sure we never get any value other than 8 for hello
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                try {
-                    assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String,
Long>keyValueStore()).get("hello"));
-                    return true;
-                } catch (final InvalidStateStoreException ise) {
-                    return false;
-                }
+        TestUtils.waitForCondition(() -> {
+            try {
+                assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String,
Long>keyValueStore()).get("hello"));
+                return true;
+            } catch (final InvalidStateStoreException ise) {
+                return false;
             }
         }, maxWaitMs, "waiting for store " + storeName);
 
@@ -780,6 +716,7 @@ public class QueryableStateIntegrationTest {
         WaitForStore(final String storeName) {
             this.storeName = storeName;
         }
+
         @Override
         public boolean conditionMet() {
             try {
@@ -801,28 +738,20 @@ public class QueryableStateIntegrationTest {
         final KStream<String, String> input = builder.stream(streamOne);
         input
             .groupByKey()
-            .reduce(new Reducer<String>() {
-                @Override
-                public String apply(final String value1, final String value2) {
-                    if (value1.length() > 1) {
-                        if (beforeFailure.compareAndSet(true, false)) {
-                            throw new RuntimeException("Injected test exception");
-                        }
+            .reduce((value1, value2) -> {
+                if (value1.length() > 1) {
+                    if (beforeFailure.compareAndSet(true, false)) {
+                        throw new RuntimeException("Injected test exception");
                     }
-                    return value1 + value2;
                 }
-            }, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))
+                return value1 + value2;
+            }, Materialized.as(storeName))
             .toStream()
             .to(outputTopic);
 
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                failed.set(true);
-            }
-        });
+        kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
         kafkaStreams.start();
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
@@ -838,14 +767,12 @@ public class QueryableStateIntegrationTest {
         final int maxWaitMs = 30000;
         TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store
" + storeName);
 
-        final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName,
QueryableStoreTypes.<String, String>keyValueStore());
+        final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName,
QueryableStoreTypes.keyValueStore());
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "12".equals(store.get("a")) && "34".equals(store.get("b"));
-            }
-        }, maxWaitMs, "wait for agg to be <a,12> and <b,34>");
+        TestUtils.waitForCondition(
+            () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
+            maxWaitMs,
+            "wait for agg to be <a,12> and <b,34>");
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -857,44 +784,31 @@ public class QueryableStateIntegrationTest {
                 new Properties()),
             mockTime);
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return failed.get();
-            }
-        }, 30000, "wait for thread to fail");
+        TestUtils.waitForCondition(failed::get, 30000, "wait for thread to fail");
 
         TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store
" + storeName);
 
-        final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName,
QueryableStoreTypes.<String, String>keyValueStore());
+        final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName,
QueryableStoreTypes.keyValueStore());
 
         try {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return
-                        ("125".equals(store2.get("a"))
-                        || "1225".equals(store2.get("a"))
-                        || "12125".equals(store2.get("a")))
-                        &&
-                        ("34".equals(store2.get("b"))
-                        || "344".equals(store2.get("b"))
-                        || "3434".equals(store2.get("b")));
-                }
-            }, maxWaitMs, "wait for agg to be <a,125>||<a,1225>||<a,12125>
and <b,34>||<b,344>||<b,3434>");
+            TestUtils.waitForCondition(() ->
+                ("125".equals(store2.get("a"))
+                || "1225".equals(store2.get("a"))
+                || "12125".equals(store2.get("a")))
+                &&
+                ("34".equals(store2.get("b"))
+                || "344".equals(store2.get("b"))
+                || "3434".equals(store2.get("b"))), maxWaitMs, "wait for agg to be <a,125>||<a,1225>||<a,12125>
and <b,34>||<b,344>||<b,3434>");
         } catch (final Throwable t) {
             throw new RuntimeException("Store content is a: " + store2.get("a") + "; b: "
+ store2.get("b"), t);
         }
-
     }
 
     private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
                                    final ReadOnlyKeyValueStore<String, Long> myCount)
{
         final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
         final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(stringLongComparator);
-        final Set<KeyValue<String, Long>>
-            expectedRangeResults =
-            new TreeSet<>(stringLongComparator);
+        final Set<KeyValue<String, Long>> expectedRangeResults = new TreeSet<>(stringLongComparator);
 
         expectedRangeResults.addAll(Arrays.asList(
             new KeyValue<>("hello", 1L),
@@ -923,8 +837,7 @@ public class QueryableStateIntegrationTest {
                                    final Set<KeyValue<String, Long>> expectedWindowState,
                                    final Set<KeyValue<String, Long>> expectedCount,
                                    final ReadOnlyWindowStore<String, Long> windowStore,
-                                   final ReadOnlyKeyValueStore<String, Long> myCount)
-        throws InterruptedException {
+                                   final ReadOnlyKeyValueStore<String, Long> myCount)
throws Exception {
         final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
         final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);
 
@@ -1001,15 +914,13 @@ public class QueryableStateIntegrationTest {
 
     }
 
-    private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs)
throws InterruptedException {
+    private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs)
throws Exception {
         final Properties config = new Properties();
         config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
         config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            StringDeserializer.class.getName());
-        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-            LongDeserializer.class.getName());
+        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
         IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
             config,
             topic,
@@ -1019,7 +930,6 @@ public class QueryableStateIntegrationTest {
 
     private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String,
Long> store,
                                               final String key) {
-
         final WindowStoreIterator<Long> fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
         if (fetch.hasNext()) {
             final KeyValue<Long, Long> next = fetch.next();
@@ -1030,7 +940,6 @@ public class QueryableStateIntegrationTest {
 
     private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long>
store,
                                        final String key) {
-
         final WindowStoreIterator<Long> fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
         if (fetch.hasNext()) {
             final KeyValue<Long, Long> next = fetch.next();
@@ -1039,7 +948,6 @@ public class QueryableStateIntegrationTest {
         return Collections.emptyMap();
     }
 
-
     /**
      * A class that periodically produces records in a separate thread
      */
@@ -1077,11 +985,11 @@ public class QueryableStateIntegrationTest {
             producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
             try (final KafkaProducer<String, String> producer =
-                         new KafkaProducer<>(producerConfig, new StringSerializer(),
new StringSerializer())) {
+                     new KafkaProducer<>(producerConfig, new StringSerializer(), new
StringSerializer())) {
 
                 while (getCurrIteration() < numIterations && !shutdown) {
                     for (final String value : inputValues) {
-                        producer.send(new ProducerRecord<String, String>(topic, value));
+                        producer.send(new ProducerRecord<>(topic, value));
                     }
                     incrementIteration();
                 }
@@ -1089,5 +997,4 @@ public class QueryableStateIntegrationTest {
         }
     }
 
-
 }


Mime
View raw message