kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.0 updated: MINOR: Caching layer should forward record timestamp (#5423)
Date Thu, 26 Jul 2018 17:33:21 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 59fca00  MINOR: Caching layer should forward record timestamp (#5423)
59fca00 is described below

commit 59fca0028cd3b114d719e677a159e60c306faef0
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Thu Jul 26 09:31:02 2018 -0700

    MINOR: Caching layer should forward record timestamp (#5423)
    
    Reviewer: Guozhang Wang <guozhang@confluent.io>
---
 .../state/internals/CachingSessionStore.java       |   2 +-
 .../state/internals/CachingWindowStore.java        |   2 +-
 .../KStreamAggregationIntegrationTest.java         | 274 +++++++++------------
 .../integration/utils/IntegrationTestUtils.java    | 118 ++++++---
 4 files changed, 202 insertions(+), 194 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 6950693..c307f6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -147,7 +147,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
                 context.headers(),
                 true,
                 context.offset(),
-                key.window().end(),
+                context.timestamp(),
                 context.partition(),
                 context.topic());
         cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 1f08f51..07120df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -156,7 +156,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
                 context.headers(),
                 true,
                 context.offset(),
-                timestamp,
+                context.timestamp(),
                 context.partition(),
                 context.topic());
         cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 10363f8..a29332c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -36,7 +35,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
@@ -49,16 +48,16 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
@@ -136,24 +135,9 @@ public class KStreamAggregationIntegrationTest {
                 mapper,
                 Serialized.with(Serdes.String(), Serdes.String()));
 
-        reducer = new Reducer<String>() {
-            @Override
-            public String apply(final String value1, final String value2) {
-                return value1 + ":" + value2;
-            }
-        };
-        initializer = new Initializer<Integer>() {
-            @Override
-            public Integer apply() {
-                return 0;
-            }
-        };
-        aggregator = new Aggregator<String, String, Integer>() {
-            @Override
-            public Integer apply(final String aggKey, final String value, final Integer aggregate)
{
-                return aggregate + value.length();
-            }
-        };
+        reducer = (value1, value2) -> value1 + ":" + value2;
+        initializer = () -> 0;
+        aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
     }
 
     @After
@@ -181,12 +165,7 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             10);
 
-        Collections.sort(results, new Comparator<KeyValue<String, String>>()
{
-            @Override
-            public int compare(final KeyValue<String, String> o1, final KeyValue<String,
String> o2) {
-                return KStreamAggregationIntegrationTest.compare(o1, o2);
-            }
-        });
+        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
 
         assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
             KeyValue.pair("A", "A:A"),
@@ -218,7 +197,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondBatchTimestamp);
         produceMessages(secondBatchTimestamp);
 
-        Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
         groupedStream
                 .windowedBy(TimeWindows.of(500L))
                 .reduce(reducer)
@@ -228,34 +207,28 @@ public class KStreamAggregationIntegrationTest {
         startStreams();
 
         final List<KeyValue<Windowed<String>, String>> windowedOutput =
receiveMessages(
-            new TimeWindowedDeserializer<String>(),
+            new TimeWindowedDeserializer<>(),
             new StringDeserializer(),
             String.class,
             15);
 
         // read from ConsoleConsumer
-        String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
-                new TimeWindowedDeserializer<String>(),
-                new StringDeserializer(),
-                String.class,
-                15);
+        final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+            new TimeWindowedDeserializer<String>(),
+            new StringDeserializer(),
+            String.class,
+            15,
+            false);
 
         final Comparator<KeyValue<Windowed<String>, String>>
             comparator =
-            new Comparator<KeyValue<Windowed<String>, String>>() {
-                @Override
-                public int compare(final KeyValue<Windowed<String>, String> o1,
-                                   final KeyValue<Windowed<String>, String> o2)
{
-                    final int keyComparison = o1.key.key().compareTo(o2.key.key());
-                    return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
-                }
-            };
+            Comparator.comparing((KeyValue<Windowed<String>, String> o) ->
o.key.key()).thenComparing(o -> o.value);
 
         Collections.sort(windowedOutput, comparator);
         final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
         final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
 
-        List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
+        final List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
                 new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow,
Long.MAX_VALUE)), "A"),
                 new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow,
Long.MAX_VALUE)), "A"),
                 new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow,
Long.MAX_VALUE)), "A:A"),
@@ -274,13 +247,13 @@ public class KStreamAggregationIntegrationTest {
         );
         assertThat(windowedOutput, is(expectResult));
 
-        Set<String> expectResultString = new HashSet<>(expectResult.size());
-        for (KeyValue<Windowed<String>, String> eachRecord: expectResult) {
+        final Set<String> expectResultString = new HashSet<>(expectResult.size());
+        for (final KeyValue<Windowed<String>, String> eachRecord: expectResult)
{
             expectResultString.add(eachRecord.toString());
         }
 
         // check every message is contained in the expect result
-        String[] allRecords = resultFromConsoleConsumer.split("\n");
+        final String[] allRecords = resultFromConsoleConsumer.split("\n");
         for (String record: allRecords) {
             record = "KeyValue(" + record + ")";
             assertTrue(expectResultString.contains(record));
@@ -306,12 +279,7 @@ public class KStreamAggregationIntegrationTest {
             new IntegerDeserializer(),
             10);
 
-        Collections.sort(results, new Comparator<KeyValue<String, Integer>>()
{
-            @Override
-            public int compare(final KeyValue<String, Integer> o1, final KeyValue<String,
Integer> o2) {
-                return KStreamAggregationIntegrationTest.compare(o1, o2);
-            }
-        });
+        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
 
         assertThat(results, is(Arrays.asList(
             KeyValue.pair("A", 1),
@@ -336,75 +304,68 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondTimestamp);
         produceMessages(secondTimestamp);
 
-        Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
         groupedStream.windowedBy(TimeWindows.of(500L))
                 .aggregate(
                         initializer,
                         aggregator,
-                        Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null,
Serdes.Integer())
+                        Materialized.with(null, Serdes.Integer())
                 )
                 .toStream()
                 .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
 
         startStreams();
 
-        final List<KeyValue<Windowed<String>, Integer>> windowedMessages
= receiveMessages(
-            new TimeWindowedDeserializer<String>(),
+        final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>>
windowedMessages = receiveMessagesWithTimestamp(
+            new TimeWindowedDeserializer<>(),
             new IntegerDeserializer(),
             String.class,
             15);
 
         // read from ConsoleConsumer
-        String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
-                new TimeWindowedDeserializer<String>(),
-                new IntegerDeserializer(),
-                String.class,
-                15);
+        final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+            new TimeWindowedDeserializer<String>(),
+            new IntegerDeserializer(),
+            String.class,
+            15,
+            true);
 
-        final Comparator<KeyValue<Windowed<String>, Integer>>
+        final Comparator<KeyValue<Windowed<String>, KeyValue<Integer, Long>>>
             comparator =
-            new Comparator<KeyValue<Windowed<String>, Integer>>() {
-                @Override
-                public int compare(final KeyValue<Windowed<String>, Integer>
o1,
-                                   final KeyValue<Windowed<String>, Integer>
o2) {
-                    final int keyComparison = o1.key.key().compareTo(o2.key.key());
-                    return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
-                }
-            };
+            Comparator.comparing((KeyValue<Windowed<String>, KeyValue<Integer,
Long>> o) -> o.key.key()).thenComparingInt(o -> o.value.key);
 
         Collections.sort(windowedMessages, comparator);
 
         final long firstWindow = firstTimestamp / 500 * 500;
         final long secondWindow = secondTimestamp / 500 * 500;
 
-        List<KeyValue<Windowed<String>, Integer>> expectResult = Arrays.asList(
-                new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 2),
-                new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 2),
-                new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 2),
-                new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 2),
-                new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 1),
-                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow,
Long.MAX_VALUE)), 2));
+        final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>>
expectResult = Arrays.asList(
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow,
Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)));
 
         assertThat(windowedMessages, is(expectResult));
 
-        Set<String> expectResultString = new HashSet<>(expectResult.size());
-        for (KeyValue<Windowed<String>, Integer> eachRecord: expectResult) {
-            expectResultString.add(eachRecord.toString());
+        final Set<String> expectResultString = new HashSet<>(expectResult.size());
+        for (final KeyValue<Windowed<String>, KeyValue<Integer, Long>>
eachRecord: expectResult) {
+            expectResultString.add("CreateTime:" + eachRecord.value.value + ", " + eachRecord.key.toString()
+ ", " + eachRecord.value.key);
         }
 
         // check every message is contained in the expect result
-        String[] allRecords = resultFromConsoleConsumer.split("\n");
-        for (String record: allRecords) {
-            record = "KeyValue(" + record + ")";
+        final String[] allRecords = resultFromConsoleConsumer.split("\n");
+        for (final String record: allRecords) {
             assertTrue(expectResultString.contains(record));
         }
 
@@ -419,12 +380,7 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             new LongDeserializer(),
             10);
-        Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
-            @Override
-            public int compare(final KeyValue<String, Long> o1, final KeyValue<String,
Long> o2) {
-                return KStreamAggregationIntegrationTest.compare(o1, o2);
-            }
-        });
+        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
 
         assertThat(results, is(Arrays.asList(
             KeyValue.pair("A", 1L),
@@ -444,7 +400,7 @@ public class KStreamAggregationIntegrationTest {
     public void shouldCount() throws Exception {
         produceMessages(mockTime.milliseconds());
 
-        groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-by-key"))
+        groupedStream.count(Materialized.as("count-by-key"))
                 .toStream()
                 .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
@@ -471,12 +427,7 @@ public class KStreamAggregationIntegrationTest {
         stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
                 .windowedBy(TimeWindows.of(500L))
                 .count()
-                .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>()
{
-                    @Override
-                    public String apply(final Windowed<Integer> windowedKey, final
Long value) {
-                        return windowedKey.key() + "@" + windowedKey.window().start();
-                    }
-                }).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+                .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(outputTopic,
Produced.with(Serdes.String(), Serdes.Long()));
 
         startStreams();
 
@@ -484,12 +435,7 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             new LongDeserializer(),
             10);
-        Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
-            @Override
-            public int compare(final KeyValue<String, Long> o1, final KeyValue<String,
Long> o2) {
-                return KStreamAggregationIntegrationTest.compare(o1, o2);
-            }
-        });
+        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
 
         final long window = timestamp / 500 * 500;
         assertThat(results, is(Arrays.asList(
@@ -568,7 +514,7 @@ public class KStreamAggregationIntegrationTest {
                         new Properties()),
                 t4);
 
-        final Map<Windowed<String>, Long> results = new HashMap<>();
+        final Map<Windowed<String>, KeyValue<Long, Long>> results = new
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(11);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
@@ -576,23 +522,34 @@ public class KStreamAggregationIntegrationTest {
                 .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
                 .count()
                 .toStream()
-                .foreach(new ForeachAction<Windowed<String>, Long>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final Long value)
{
-                        results.put(key, value);
-                        latch.countDown();
-                    }
-                });
+                .transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object,
Object>>() {
+                        private ProcessorContext context;
+
+                        @Override
+                        public void init(final ProcessorContext context) {
+                            this.context = context;
+                        }
+
+                        @Override
+                        public KeyValue<Object, Object> transform(final Windowed<String>
key, final Long value) {
+                            results.put(key, KeyValue.pair(value, context.timestamp()));
+                            latch.countDown();
+                            return null;
+                        }
+
+                        @Override
+                        public void close() {}
+                    });
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))),
equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))),
equalTo(2L));
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))),
equalTo(1L));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L,
t1)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))),
equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L,
t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(KeyValue.pair(1L,
t4)));
+        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))),
equalTo(KeyValue.pair(2L, t2)));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair(2L,
t4)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))),
equalTo(KeyValue.pair(1L, t3)));
     }
 
     @Test
@@ -662,25 +619,17 @@ public class KStreamAggregationIntegrationTest {
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                 .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
-                .reduce(new Reducer<String>() {
-                    @Override
-                    public String apply(final String value1, final String value2) {
-                        return value1 + ":" + value2;
-                    }
-                }, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(userSessionsStore))
+                .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
                 .toStream()
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value)
{
-                        results.put(key, value);
-                        latch.countDown();
-                    }
+                .foreach((key, value) -> {
+                    results.put(key, value);
+                    latch.countDown();
                 });
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
         final ReadOnlySessionStore<String, String> sessionStore
-                = kafkaStreams.store(userSessionsStore, QueryableStoreTypes.<String, String>sessionStore());
+                = kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore());
 
         // verify correct data received
         assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start"));
@@ -732,16 +681,14 @@ public class KStreamAggregationIntegrationTest {
     }
 
     private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
keyDeserializer,
-                                                        final Deserializer<V> valueDeserializer,
-                                                        final int numMessages)
+                                                                        final Deserializer<V>
valueDeserializer,
+                                                                        final int numMessages)
         throws InterruptedException {
         return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
     }
 
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
-                                                                keyDeserializer,
-                                                        final Deserializer<V>
-                                                                valueDeserializer,
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
keyDeserializer,
+                                                        final Deserializer<V> valueDeserializer,
                                                         final Class innerClass,
                                                         final int numMessages) throws InterruptedException
{
         final Properties consumerProperties = new Properties();
@@ -761,21 +708,44 @@ public class KStreamAggregationIntegrationTest {
                 60 * 1000);
     }
 
+    private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final
Deserializer<K> keyDeserializer,
+                                                                                     final
Deserializer<V> valueDeserializer,
+                                                                                     final
Class innerClass,
+                                                                                     final
int numMessages) throws InterruptedException {
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-"
+ testNo);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof
SessionWindowedDeserializer) {
+            consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
+                Serdes.serdeFrom(innerClass).getClass().getName());
+        }
+        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
+            consumerProperties,
+            outputTopic,
+            numMessages,
+            60 * 1000);
+    }
+
     private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer<K>
keyDeserializer,
                                                                       final Deserializer<V>
valueDeserializer,
                                                                       final Class innerClass,
-                                                                      final int numMessages)
{
-        ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
-        PrintStream originalStream = System.out;
-        try (PrintStream newStream = new PrintStream(newConsole)) {
+                                                                      final int numMessages,
+                                                                      final boolean printTimestamp)
{
+        final ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
+        final PrintStream originalStream = System.out;
+        try (final PrintStream newStream = new PrintStream(newConsole)) {
             System.setOut(newStream);
 
-            String keySeparator = ", ";
+            final String keySeparator = ", ";
             // manually construct the console consumer argument array
-            String[] args = new String[] {
+            final String[] args = new String[] {
                 "--bootstrap-server", CLUSTER.bootstrapServers(),
                 "--from-beginning",
                 "--property", "print.key=true",
+                "--property", "print.timestamp=" + printTimestamp,
                 "--topic", outputTopic,
                 "--max-messages", String.valueOf(numMessages),
                 "--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 2ab6639..35521c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -164,7 +164,7 @@ public class IntegrationTestUtils {
                                                                          final Long timestamp,
                                                                          final boolean enabledTransactions)
         throws ExecutionException, InterruptedException {
-        try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+        try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
             if (enabledTransactions) {
                 producer.initTransactions();
                 producer.beginTransaction();
@@ -282,14 +282,39 @@ public class IntegrationTestUtils {
                                                                                   final long waitTime) throws InterruptedException {
         final List<KeyValue<K, V>> accumData = new ArrayList<>();
         try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
-            final TestCondition valuesRead = new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    final List<KeyValue<K, V>> readData =
-                        readKeyValues(topic, consumer, waitTime, expectedNumRecords);
-                    accumData.addAll(readData);
-                    return accumData.size() >= expectedNumRecords;
-                }
+            final TestCondition valuesRead = () -> {
+                final List<KeyValue<K, V>> readData =
+                    readKeyValues(topic, consumer, waitTime, expectedNumRecords);
+                accumData.addAll(readData);
+                return accumData.size() >= expectedNumRecords;
+            };
+            final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
+    /**
+     * Wait until enough data (key-value records) has been consumed.
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Topic to consume from
+     * @param expectedNumRecords Minimum number of expected records
+     * @param waitTime           Upper bound in waiting time in milliseconds
+     * @return All the records consumed, or null if no records are consumed
+     * @throws AssertionError       if the given wait time elapses
+     */
+    public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
+                                                                                                               final String topic,
+                                                                                                               final int expectedNumRecords,
+                                                                                                               final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = () -> {
+                final List<KeyValue<K, KeyValue<V, Long>>> readData =
+                    readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
+                accumData.addAll(readData);
+                return accumData.size() >= expectedNumRecords;
             };
             final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
             TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
@@ -303,14 +328,11 @@ public class IntegrationTestUtils {
                                                                                 final long waitTime) throws InterruptedException {
         final List<ConsumerRecord<K, V>> accumData = new ArrayList<>();
         try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
-            final TestCondition valuesRead = new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    final List<ConsumerRecord<K, V>> readData =
-                        readRecords(topic, consumer, waitTime, expectedNumRecords);
-                    accumData.addAll(readData);
-                    return accumData.size() >= expectedNumRecords;
-                }
+            final TestCondition valuesRead = () -> {
+                final List<ConsumerRecord<K, V>> readData =
+                    readRecords(topic, consumer, waitTime, expectedNumRecords);
+                accumData.addAll(readData);
+                return accumData.size() >= expectedNumRecords;
             };
             final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
             TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
@@ -341,14 +363,11 @@ public class IntegrationTestUtils {
                                                                 final long waitTime) throws InterruptedException {
         final List<V> accumData = new ArrayList<>();
         try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
-            final TestCondition valuesRead = new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    final List<V> readData =
-                        readValues(topic, consumer, waitTime, expectedNumRecords);
-                    accumData.addAll(readData);
-                    return accumData.size() >= expectedNumRecords;
-                }
+            final TestCondition valuesRead = () -> {
+                final List<V> readData =
+                    readValues(topic, consumer, waitTime, expectedNumRecords);
+                accumData.addAll(readData);
+                return accumData.size() >= expectedNumRecords;
             };
             final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
             TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
@@ -373,23 +392,20 @@ public class IntegrationTestUtils {
                                                      final String topic,
                                                      final int partition,
                                                      final long timeout) throws InterruptedException {
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                for (final KafkaServer server : servers) {
-                    final MetadataCache metadataCache = server.apis().metadataCache();
-                    final Option<UpdateMetadataRequest.PartitionState> partitionInfo =
-                            metadataCache.getPartitionInfo(topic, partition);
-                    if (partitionInfo.isEmpty()) {
-                        return false;
-                    }
-                    final UpdateMetadataRequest.PartitionState metadataPartitionState = partitionInfo.get();
-                    if (!Request.isValidBrokerId(metadataPartitionState.basePartitionState.leader)) {
-                        return false;
-                    }
+        TestUtils.waitForCondition(() -> {
+            for (final KafkaServer server : servers) {
+                final MetadataCache metadataCache = server.apis().metadataCache();
+                final Option<UpdateMetadataRequest.PartitionState> partitionInfo =
+                        metadataCache.getPartitionInfo(topic, partition);
+                if (partitionInfo.isEmpty()) {
+                    return false;
+                }
+                final UpdateMetadataRequest.PartitionState metadataPartitionState = partitionInfo.get();
+                if (!Request.isValidBrokerId(metadataPartitionState.basePartitionState.leader)) {
+                    return false;
                 }
-                return true;
             }
+            return true;
         }, timeout, "metadata for topic=" + topic + " partition=" + partition + " not propagated to all brokers");
 
     }
@@ -474,6 +490,28 @@ public class IntegrationTestUtils {
         return consumedValues;
     }
 
+    /**
+     * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
+     * are already configured in the consumer).
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumer       Kafka consumer
+     * @param waitTime       Maximum wait time in milliseconds
+     * @param maxMessages    Maximum number of messages to read via the consumer
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
+                                                                                          final Consumer<K, V> consumer,
+                                                                                          final long waitTime,
+                                                                                          final int maxMessages) {
+        final List<KeyValue<K, KeyValue<V, Long>>> consumedValues = new ArrayList<>();
+        final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages);
+        for (final ConsumerRecord<K, V> record : records) {
+            consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
+        }
+        return consumedValues;
+    }
+
     private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
                                                                  final Consumer<K, V> consumer,
                                                                  final long waitTime,


Mime
View raw message