kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645)
Date Sun, 12 May 2019 22:32:09 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a237f5  KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645)
8a237f5 is described below

commit 8a237f599afa539868a138b5a2534dbf884cb4ec
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Mon May 13 00:31:44 2019 +0200

    KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645)
    
    For session-windows, the result record should have the window-end timestamp as record timestamp.
    
    Rebased to resolve merge conflicts. Removed unused classes TupleForwarder and ForwardingCacheFlushListener (replaced with TimestampedTupleForwarder, SessionTupleForwarder, TimestampedCacheFlushListener, and SessionCacheFlushListener).
    
    Reviewers: John Roesler <john@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../internals/KStreamSessionWindowAggregate.java   |  10 +-
 ...istener.java => SessionCacheFlushListener.java} |   9 +-
 ...leForwarder.java => SessionTupleForwarder.java} |  21 +++--
 .../processor/internals/ProcessorContextImpl.java  |  11 ++-
 .../internals/ProcessorRecordContext.java          |   6 +-
 .../state/internals/AbstractStoreBuilder.java      |   4 +-
 .../state/internals/CachingSessionStore.java       |   1 -
 .../state/internals/SessionStoreBuilder.java       |  19 ++--
 .../KStreamAggregationIntegrationTest.java         | 101 ++++++++++++++-------
 .../kstream/internals/KGroupedStreamImplTest.java  |  95 ++++++++++++-------
 ...KStreamSessionWindowAggregateProcessorTest.java |  88 ++++++++++++++----
 .../internals/SessionCacheFlushListenerTest.java   |  52 +++++++++++
 ...derTest.java => SessionTupleForwarderTest.java} |  34 ++++---
 .../internals/SessionWindowedKStreamImplTest.java  |  93 ++++++++++++++-----
 .../kstream/internals/SuppressScenarioTest.java    |  37 +++++---
 .../state/internals/CachingSessionStoreTest.java   |  91 ++++++++++++++-----
 .../state/internals/SessionStoreBuilderTest.java   |  50 ++++++----
 .../java/org/apache/kafka/test/MockProcessor.java  |   9 ++
 .../kafka/streams/scala/kstream/KTableTest.scala   |  20 +++-
 19 files changed, 531 insertions(+), 220 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 3168393..68dc4a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -78,7 +78,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
     private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor<K, V> {
 
         private SessionStore<K, Agg> store;
-        private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private SessionTupleForwarder<K, Agg> tupleForwarder;
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
         private Sensor lateRecordDropSensor;
@@ -93,7 +93,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
             lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
 
             store = (SessionStore<K, Agg>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+            tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues);
         }
 
         @Override
@@ -109,10 +109,10 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                 return;
             }
 
-            observedStreamTime = Math.max(observedStreamTime, context().timestamp());
+            final long timestamp = context().timestamp();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
             final long closeTime = observedStreamTime - windows.gracePeriodMs();
 
-            final long timestamp = context().timestamp();
             final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
             final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
             SessionWindow mergedWindow = newSessionWindow;
@@ -148,7 +148,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                     context().topic(),
                     context().partition(),
                     context().offset(),
-                    context().timestamp(),
+                    timestamp,
                     mergedWindow.start(),
                     mergedWindow.end(),
                     closeTime,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
similarity index 85%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index 6c6b8fd..f40fdfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -16,30 +16,31 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> {
+class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> {
     private final InternalProcessorContext context;
     private final ProcessorNode myNode;
 
-    ForwardingCacheFlushListener(final ProcessorContext context) {
+    SessionCacheFlushListener(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
         myNode = this.context.currentNode();
     }
 
     @Override
-    public void apply(final K key,
+    public void apply(final Windowed<K> key,
                       final V newValue,
                       final V oldValue,
                       final long timestamp) {
         final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp));
+            context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end()));
         } finally {
             context.setCurrentNode(prev);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
similarity index 75%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
index 94b0ebd..bad255a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 /**
@@ -25,29 +28,29 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
  * Forwarding by this class only occurs when caching is not enabled. If caching is enabled,
  * forwarding occurs in the flush listener when the cached store flushes.
  *
- * @param <K> the type of the key
- * @param <V> the type of the value
+ * @param <K>
+ * @param <V>
  */
-class TupleForwarder<K, V> {
+class SessionTupleForwarder<K, V> {
     private final ProcessorContext context;
     private final boolean sendOldValues;
     private final boolean cachingEnabled;
 
     @SuppressWarnings("unchecked")
-    TupleForwarder(final StateStore store,
-                   final ProcessorContext context,
-                   final ForwardingCacheFlushListener<K, V> flushListener,
-                   final boolean sendOldValues) {
+    SessionTupleForwarder(final StateStore store,
+                          final ProcessorContext context,
+                          final CacheFlushListener<Windowed<K>, V> flushListener,
+                          final boolean sendOldValues) {
         this.context = context;
         this.sendOldValues = sendOldValues;
         cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
     }
 
-    public void maybeForward(final K key,
+    public void maybeForward(final Windowed<K> key,
                              final V newValue,
                              final V oldValue) {
         if (!cachingEnabled) {
-            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null));
+            context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null), To.all().withTimestamp(key.window().end()));
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 1df6610..cd2b179 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -160,12 +160,17 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                                final V value,
                                final To to) {
         final ProcessorNode previousNode = currentNode();
-        final long currentTimestamp = recordContext.timestamp();
+        final ProcessorRecordContext previousContext = recordContext;
 
         try {
             toInternal.update(to);
             if (toInternal.hasTimestamp()) {
-                recordContext.setTimestamp(toInternal.timestamp());
+                recordContext = new ProcessorRecordContext(
+                    toInternal.timestamp(),
+                    recordContext.offset(),
+                    recordContext.partition(),
+                    recordContext.topic(),
+                    recordContext.headers());
             }
 
             final String sendTo = toInternal.child();
@@ -183,7 +188,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                 forward(child, key, value);
             }
         } finally {
-            recordContext.setTimestamp(currentTimestamp);
+            recordContext = previousContext;
             setCurrentNode(previousNode);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index cc512ae..1b22482 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -29,7 +29,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 public class ProcessorRecordContext implements RecordContext {
 
-    private long timestamp;
+    private final long timestamp;
     private final long offset;
     private final String topic;
     private final int partition;
@@ -48,10 +48,6 @@ public class ProcessorRecordContext implements RecordContext {
         this.headers = headers;
     }
 
-    public void setTimestamp(final long timestamp) {
-        this.timestamp = timestamp;
-    }
-
     @Override
     public long offset() {
         return offset;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
index 898db9e..fd8267b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
@@ -38,8 +38,8 @@ abstract public class AbstractStoreBuilder<K, V, T extends StateStore> implement
                                 final Serde<K> keySerde,
                                 final Serde<V> valueSerde,
                                 final Time time) {
-        Objects.requireNonNull(name, "name can't be null");
-        Objects.requireNonNull(time, "time can't be null");
+        Objects.requireNonNull(name, "name cannot be null");
+        Objects.requireNonNull(time, "time cannot be null");
         this.name = name;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index adbaf4c..d48f540 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordQueue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
-
 import java.util.Objects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
index a40eab3..51ef319 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 
+import java.util.Objects;
+
 
 public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, SessionStore<K, V>> {
 
@@ -31,26 +33,25 @@ public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, Sessio
                                final Serde<K> keySerde,
                                final Serde<V> valueSerde,
                                final Time time) {
-        super(storeSupplier.name(), keySerde, valueSerde, time);
+        super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time);
         this.storeSupplier = storeSupplier;
     }
 
     @Override
     public SessionStore<K, V> build() {
-        return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
-                                         storeSupplier.metricsScope(),
-                                         keySerde,
-                                         valueSerde,
-                                         time);
+        return new MeteredSessionStore<>(
+            maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+            storeSupplier.metricsScope(),
+            keySerde,
+            valueSerde,
+            time);
     }
 
     private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner) {
         if (!enableCaching) {
             return inner;
         }
-        return new CachingSessionStore(
-            inner,
-            storeSupplier.segmentIntervalMs());
+        return new CachingSessionStore(inner, storeSupplier.segmentIntervalMs());
     }
 
     private SessionStore<Bytes, byte[]> maybeWrapLogging(final SessionStore<Bytes, byte[]> inner) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index a13408e..ea3695e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -57,7 +56,6 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.test.IntegrationTest;
@@ -154,7 +152,7 @@ public class KStreamAggregationIntegrationTest {
     public void shouldReduce() throws Exception {
         produceMessages(mockTime.milliseconds());
         groupedStream
-            .reduce(reducer, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce-by-key"))
+            .reduce(reducer, Materialized.as("reduce-by-key"))
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
@@ -167,7 +165,7 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             10);
 
-        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
+        results.sort(KStreamAggregationIntegrationTest::compare);
 
         assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
             KeyValue.pair("A", "A:A"),
@@ -226,7 +224,7 @@ public class KStreamAggregationIntegrationTest {
             comparator =
             Comparator.comparing((KeyValue<Windowed<String>, String> o) -> o.key.key()).thenComparing(o -> o.value);
 
-        Collections.sort(windowedOutput, comparator);
+        windowedOutput.sort(comparator);
         final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
         final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
 
@@ -267,7 +265,7 @@ public class KStreamAggregationIntegrationTest {
         groupedStream.aggregate(
             initializer,
             aggregator,
-            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregate-by-selected-key"))
+            Materialized.as("aggregate-by-selected-key"))
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
 
@@ -280,7 +278,7 @@ public class KStreamAggregationIntegrationTest {
             new IntegerDeserializer(),
             10);
 
-        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
+        results.sort(KStreamAggregationIntegrationTest::compare);
 
         assertThat(results, is(Arrays.asList(
             KeyValue.pair("A", 1),
@@ -335,7 +333,7 @@ public class KStreamAggregationIntegrationTest {
             comparator =
             Comparator.comparing((KeyValue<Windowed<String>, KeyValue<Integer, Long>> o) -> o.key.key()).thenComparingInt(o -> o.value.key);
 
-        Collections.sort(windowedMessages, comparator);
+        windowedMessages.sort(comparator);
 
         final long firstWindow = firstTimestamp / 500 * 500;
         final long secondWindow = secondTimestamp / 500 * 500;
@@ -381,7 +379,7 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             new LongDeserializer(),
             10);
-        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
+        results.sort(KStreamAggregationIntegrationTest::compare);
 
         assertThat(results, is(Arrays.asList(
             KeyValue.pair("A", 1L),
@@ -436,7 +434,7 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             new LongDeserializer(),
             10);
-        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
+        results.sort(KStreamAggregationIntegrationTest::compare);
 
         final long window = timestamp / 500 * 500;
         assertThat(results, is(Arrays.asList(
@@ -457,13 +455,12 @@ public class KStreamAggregationIntegrationTest {
     @Test
     public void shouldCountSessionWindows() throws Exception {
         final long sessionGap = 5 * 60 * 1000L;
-
-        final long t1 = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
         final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
                                                                         new KeyValue<>("penny", "start"),
                                                                         new KeyValue<>("jo", "pause"),
                                                                         new KeyValue<>("emily", "pause"));
 
+        final long t1 = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                 userSessionsStream,
                 t1Messages,
@@ -473,7 +470,6 @@ public class KStreamAggregationIntegrationTest {
                         StringSerializer.class,
                         new Properties()),
                 t1);
-
         final long t2 = t1 + (sessionGap / 2);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                 userSessionsStream,
@@ -499,7 +495,6 @@ public class KStreamAggregationIntegrationTest {
                         StringSerializer.class,
                         new Properties()),
                 t3);
-
         final long t4 = t3 + (sessionGap / 2);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                 userSessionsStream,
@@ -513,9 +508,21 @@ public class KStreamAggregationIntegrationTest {
                         StringSerializer.class,
                         new Properties()),
                 t4);
+        final long t5 = t4 - 1;
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            userSessionsStream,
+            Collections.singletonList(
+                new KeyValue<>("jo", "late")   // jo has late arrival
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t5);
 
         final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
-        final CountDownLatch latch = new CountDownLatch(11);
+        final CountDownLatch latch = new CountDownLatch(13);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
@@ -543,10 +550,11 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
+
         assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
         assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
         assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(KeyValue.pair(1L, t4)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), equalTo(KeyValue.pair(2L, t4)));
         assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair(2L, t2)));
         assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair(2L, t4)));
         assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3)));
@@ -555,13 +563,12 @@ public class KStreamAggregationIntegrationTest {
     @Test
     public void shouldReduceSessionWindows() throws Exception {
         final long sessionGap = 1000L; // something to do with time
-
-        final long t1 = mockTime.milliseconds();
         final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
                                                                         new KeyValue<>("penny", "start"),
                                                                         new KeyValue<>("jo", "pause"),
                                                                         new KeyValue<>("emily", "pause"));
 
+        final long t1 = mockTime.milliseconds();
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                 userSessionsStream,
                 t1Messages,
@@ -571,7 +578,6 @@ public class KStreamAggregationIntegrationTest {
                         StringSerializer.class,
                         new Properties()),
                 t1);
-
         final long t2 = t1 + (sessionGap / 2);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                 userSessionsStream,
@@ -597,7 +603,6 @@ public class KStreamAggregationIntegrationTest {
                         StringSerializer.class,
                         new Properties()),
                 t3);
-
         final long t4 = t3 + (sessionGap / 2);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                 userSessionsStream,
@@ -611,40 +616,65 @@ public class KStreamAggregationIntegrationTest {
                         StringSerializer.class,
                         new Properties()),
                 t4);
+        final long t5 = t4 - 1;
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            userSessionsStream,
+            Collections.singletonList(
+                new KeyValue<>("jo", "late")   // jo has late arrival
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t5);
 
-        final Map<Windowed<String>, String> results = new HashMap<>();
-        final CountDownLatch latch = new CountDownLatch(11);
+        final Map<Windowed<String>, KeyValue<String, Long>> results = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(13);
         final String userSessionsStore = "UserSessionsStore";
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                 .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
                 .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
                 .toStream()
-                .foreach((key, value) -> {
-                    results.put(key, value);
+            .transform(() -> new Transformer<Windowed<String>, String, KeyValue<Object, Object>>() {
+                private ProcessorContext context;
+
+                @Override
+                public void init(final ProcessorContext context) {
+                    this.context = context;
+                }
+
+                @Override
+                public KeyValue<Object, Object> transform(final Windowed<String> key, final String value) {
+                    results.put(key, KeyValue.pair(value, context.timestamp()));
                     latch.countDown();
-                });
+                    return null;
+                }
+
+                @Override
+                public void close() {}
+            });
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
-        final ReadOnlySessionStore<String, String> sessionStore =
-            kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore());
 
         // verify correct data received
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start"));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo("start"));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo("pause"));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo("resume"));
-        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo("pause:resume"));
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo("pause:resume"));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo("stop"));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair("start", t1)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair("start", t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair("pause", t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), equalTo(KeyValue.pair("resume:late", t4)));
+        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair("pause:resume", t2)));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair("pause:resume", t4)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair("stop", t3)));
 
         // verify can query data via IQ
+        final ReadOnlySessionStore<String, String> sessionStore =
+            kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore());
         final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob");
         assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
         assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
         assertFalse(bob.hasNext());
-
     }
 
     @Test
@@ -727,6 +757,7 @@ public class KStreamAggregationIntegrationTest {
                });
         startStreams();
         assertTrue(latch.await(30, TimeUnit.SECONDS));
+
         assertThat(results.get(new Windowed<>("bob", new UnlimitedWindow(startTime))), equalTo(KeyValue.pair(2L, t4)));
         assertThat(results.get(new Windowed<>("penny", new UnlimitedWindow(startTime))), equalTo(KeyValue.pair(1L, t3)));
         assertThat(results.get(new Windowed<>("jo", new UnlimitedWindow(startTime))), equalTo(KeyValue.pair(1L, t4)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index acdbb39..97d1566 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -38,9 +38,11 @@ import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
@@ -153,23 +155,31 @@ public class KGroupedStreamImplTest {
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME));
     }
 
-    private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
+    private void doAggregateSessionWindows(final MockProcessorSupplier<Windowed<String>, Integer> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
             driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
             driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
             driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
-            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
             driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
         }
-        assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
-        assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
-        assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
+        final Map<Windowed<String>, ValueAndTimestamp<Integer>> result
+            = supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+        assertEquals(
+            ValueAndTimestamp.make(2, 30L),
+            result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
+        assertEquals(
+            ValueAndTimestamp.make(1, 15L),
+            result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
+        assertEquals(
+            ValueAndTimestamp.make(3, 100L),
+            result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
     }
 
     @Test
     public void shouldAggregateSessionWindows() {
-        final Map<Windowed<String>, Integer> results = new HashMap<>();
+        final MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<>();
         final KTable<Windowed<String>, Integer> table = groupedStream
             .windowedBy(SessionWindows.with(ofMillis(30)))
             .aggregate(
@@ -179,15 +189,15 @@ public class KGroupedStreamImplTest {
                 Materialized
                     .<String, Integer, SessionStore<Bytes, byte[]>>as("session-store").
                     withValueSerde(Serdes.Integer()));
-        table.toStream().foreach(results::put);
+        table.toStream().process(supplier);
 
-        doAggregateSessionWindows(results);
+        doAggregateSessionWindows(supplier);
         assertEquals(table.queryableStoreName(), "session-store");
     }
 
     @Test
     public void shouldAggregateSessionWindowsWithInternalStoreName() {
-        final Map<Windowed<String>, Integer> results = new HashMap<>();
+        final MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<>();
         final KTable<Windowed<String>, Integer> table = groupedStream
             .windowedBy(SessionWindows.with(ofMillis(30)))
             .aggregate(
@@ -195,80 +205,97 @@ public class KGroupedStreamImplTest {
                 (aggKey, value, aggregate) -> aggregate + 1,
                 (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
                 Materialized.with(null, Serdes.Integer()));
-        table.toStream().foreach(results::put);
+        table.toStream().process(supplier);
 
-        doAggregateSessionWindows(results);
+        doAggregateSessionWindows(supplier);
     }
 
-    private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
+    private void doCountSessionWindows(final MockProcessorSupplier<Windowed<String>, Long> supplier) {
+
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
             driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15));
             driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30));
             driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70));
-            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
             driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90));
         }
-        assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
-        assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
-        assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
+        final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+        assertEquals(
+            ValueAndTimestamp.make(2L, 30L),
+            result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
+        assertEquals(
+            ValueAndTimestamp.make(1L, 15L),
+            result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
+        assertEquals(
+            ValueAndTimestamp.make(3L, 100L),
+            result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
     }
 
     @Test
     public void shouldCountSessionWindows() {
-        final Map<Windowed<String>, Long> results = new HashMap<>();
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
         final KTable<Windowed<String>, Long> table = groupedStream
             .windowedBy(SessionWindows.with(ofMillis(30)))
             .count(Materialized.as("session-store"));
-        table.toStream().foreach(results::put);
-        doCountSessionWindows(results);
+        table.toStream().process(supplier);
+        doCountSessionWindows(supplier);
         assertEquals(table.queryableStoreName(), "session-store");
     }
 
     @Test
     public void shouldCountSessionWindowsWithInternalStoreName() {
-        final Map<Windowed<String>, Long> results = new HashMap<>();
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
         final KTable<Windowed<String>, Long> table = groupedStream
             .windowedBy(SessionWindows.with(ofMillis(30)))
             .count();
-        table.toStream().foreach(results::put);
-        doCountSessionWindows(results);
+        table.toStream().process(supplier);
+        doCountSessionWindows(supplier);
         assertNull(table.queryableStoreName());
     }
 
-    private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
+    private void doReduceSessionWindows(final MockProcessorSupplier<Windowed<String>, String> supplier) {
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10));
             driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15));
             driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30));
             driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70));
-            driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90));
-            driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 100));
+            driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 90));
         }
-        assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
-        assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
-        assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
+        final Map<Windowed<String>, ValueAndTimestamp<String>> result =
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+        assertEquals(
+            ValueAndTimestamp.make("A:B", 30L),
+            result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
+        assertEquals(
+            ValueAndTimestamp.make("Z", 15L),
+            result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
+        assertEquals(
+            ValueAndTimestamp.make("A:B:C", 100L),
+            result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
     }
 
     @Test
     public void shouldReduceSessionWindows() {
-        final Map<Windowed<String>, String> results = new HashMap<>();
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         final KTable<Windowed<String>, String> table = groupedStream
             .windowedBy(SessionWindows.with(ofMillis(30)))
             .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as("session-store"));
-        table.toStream().foreach(results::put);
-        doReduceSessionWindows(results);
+        table.toStream().process(supplier);
+        doReduceSessionWindows(supplier);
         assertEquals(table.queryableStoreName(), "session-store");
     }
 
     @Test
     public void shouldReduceSessionWindowsWithInternalStoreName() {
-        final Map<Windowed<String>, String> results = new HashMap<>();
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         final KTable<Windowed<String>, String> table = groupedStream
             .windowedBy(SessionWindows.with(ofMillis(30)))
             .reduce((value1, value2) -> value1 + ":" + value2);
-        table.toStream().foreach(results::put);
-        doReduceSessionWindows(results);
+        table.toStream().process(supplier);
+        doReduceSessionWindows(supplier);
         assertNull(table.queryableStoreName());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index f53d9f3..2ea7700 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -31,6 +32,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ToInternal;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -67,6 +69,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private static final long GAP_MS = 5 * 60 * 1000L;
     private static final String STORE_NAME = "session-store";
 
+    private final ToInternal toInternal = new ToInternal();
     private final Initializer<Long> initializer = () -> 0L;
     private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1;
     private final Merger<String, Long> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
@@ -78,7 +81,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
             aggregator,
             sessionMerger);
 
-    private final List<KeyValue> results = new ArrayList<>();
+    private final List<KeyValueTimestamp> results = new ArrayList<>();
     private final Processor<String, String> processor = sessionAggregator.get();
     private SessionStore<String, Long> sessionStore;
     private InternalMockProcessorContext context;
@@ -100,7 +103,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
         ) {
             @Override
             public <K, V> void forward(final K key, final V value, final To to) {
-                results.add(KeyValue.pair(key, value));
+                toInternal.update(to);
+                results.add(new KeyValueTimestamp<>(key, value, toInternal.timestamp()));
             }
         };
 
@@ -195,9 +199,18 @@ public class KStreamSessionWindowAggregateProcessorTest {
         sessionStore.flush();
         assertEquals(
             Arrays.asList(
-                KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(0, 0)), new Change<>(1L, null)),
-                KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)),
-                KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(time, time)), new Change<>(3L, null))
+                new KeyValueTimestamp<>(
+                    new Windowed<>(sessionId, new SessionWindow(0, 0)),
+                    new Change<>(1L, null),
+                    0L),
+                new KeyValueTimestamp<>(
+                    new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)),
+                    new Change<>(2L, null),
+                    GAP_MS + 1),
+                new KeyValueTimestamp<>(
+                    new Windowed<>(sessionId, new SessionWindow(time, time)),
+                    new Change<>(3L, null),
+                    time)
             ),
             results
         );
@@ -244,13 +257,34 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         assertEquals(
             Arrays.asList(
-                KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)),
-                KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)),
-                KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null)),
-                KeyValue.pair(new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)), new Change<>(2L, null)),
-                KeyValue.pair(new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)),
-                KeyValue.pair(new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)),
-                KeyValue.pair(new Windowed<>("c", new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null))
+                new KeyValueTimestamp<>(
+                    new Windowed<>("a", new SessionWindow(0, 0)),
+                    new Change<>(1L, null),
+                    0L),
+                new KeyValueTimestamp<>(
+                    new Windowed<>("b", new SessionWindow(0, 0)),
+                    new Change<>(1L, null),
+                    0L),
+                new KeyValueTimestamp<>(
+                    new Windowed<>("c", new SessionWindow(0, 0)),
+                    new Change<>(1L, null),
+                    0L),
+                new KeyValueTimestamp<>(
+                    new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)),
+                    new Change<>(2L, null),
+                    GAP_MS / 2),
+                new KeyValueTimestamp<>(
+                    new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)),
+                    new Change<>(1L, null),
+                    GAP_MS + 1),
+                new KeyValueTimestamp<>(
+                    new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)),
+                    new Change<>(2L, null),
+                    GAP_MS + 1 + GAP_MS / 2),
+                new KeyValueTimestamp<>(new Windowed<>(
+                    "c",
+                    new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null),
+                    GAP_MS + 1 + GAP_MS / 2)
             ),
             results
         );
@@ -283,9 +317,18 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         assertEquals(
             Arrays.asList(
-                KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)),
-                KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)),
-                KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null))
+                new KeyValueTimestamp<>(
+                    new Windowed<>("a", new SessionWindow(0, 0)),
+                    new Change<>(1L, null),
+                    0L),
+                new KeyValueTimestamp<>(
+                    new Windowed<>("b", new SessionWindow(0, 0)),
+                    new Change<>(1L, null),
+                    0L),
+                new KeyValueTimestamp<>(
+                    new Windowed<>("c", new SessionWindow(0, 0)),
+                    new Change<>(1L, null),
+                    0L)
             ),
             results
         );
@@ -302,9 +345,18 @@ public class KStreamSessionWindowAggregateProcessorTest {
         processor.process("a", "1");
         assertEquals(
             Arrays.asList(
-                KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)),
-                KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(null, null)),
-                KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 5)), new Change<>(2L, null))
+                new KeyValueTimestamp<>(
+                    new Windowed<>("a", new SessionWindow(0, 0)),
+                    new Change<>(1L, null),
+                    0L),
+                new KeyValueTimestamp<>(
+                    new Windowed<>("a", new SessionWindow(0, 0)),
+                    new Change<>(null, null),
+                    0L),
+                new KeyValueTimestamp<>(
+                    new Windowed<>("a", new SessionWindow(0, 5)),
+                    new Change<>(2L, null),
+                    5L)
             ),
             results
         );
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
new file mode 100644
index 0000000..b25febf
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class SessionCacheFlushListenerTest {
+    @Test
+    public void shouldForwardKeyNewValueOldValueAndTimestamp() {
+        final InternalProcessorContext context = mock(InternalProcessorContext.class);
+        expect(context.currentNode()).andReturn(null).anyTimes();
+        context.setCurrentNode(null);
+        context.setCurrentNode(null);
+        context.forward(
+            new Windowed<>("key", new SessionWindow(21L, 73L)),
+            new Change<>("newValue", "oldValue"),
+            To.all().withTimestamp(73L));
+        expectLastCall();
+        replay(context);
+
+        new SessionCacheFlushListener<>(context).apply(
+            new Windowed<>("key", new SessionWindow(21L, 73L)),
+            "newValue",
+            "oldValue",
+            42L);
+
+        verify(context);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
similarity index 67%
rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
index f62e826..e99c684 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
-public class TupleForwarderTest {
+public class SessionTupleForwarderTest {
 
     @Test
     public void shouldSetFlushListenerOnWrappedStateStore() {
@@ -36,13 +38,13 @@ public class TupleForwarderTest {
     }
 
     private void setFlushListener(final boolean sendOldValues) {
-        final WrappedStateStore<StateStore, Object, Object> store = mock(WrappedStateStore.class);
-        final ForwardingCacheFlushListener<Object, Object> flushListener = mock(ForwardingCacheFlushListener.class);
+        final WrappedStateStore<StateStore, Windowed<Object>, Object> store = mock(WrappedStateStore.class);
+        final SessionCacheFlushListener<Object, Object> flushListener = mock(SessionCacheFlushListener.class);
 
         expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false);
         replay(store);
 
-        new TupleForwarder<>(store, null, flushListener, sendOldValues);
+        new SessionTupleForwarder<>(store, null, flushListener, sendOldValues);
 
         verify(store);
     }
@@ -53,21 +55,27 @@ public class TupleForwarderTest {
         shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
     }
 
-    private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) {
+    private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValued) {
         final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class);
         final ProcessorContext context = mock(ProcessorContext.class);
 
-        expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
-        if (sendOldValues) {
-            context.forward("key", new Change<>("newValue",  "oldValue"));
+        expect(store.setFlushListener(null, sendOldValued)).andReturn(false);
+        if (sendOldValued) {
+            context.forward(
+                new Windowed<>("key", new SessionWindow(21L, 42L)),
+                new Change<>("value", "oldValue"),
+                To.all().withTimestamp(42L));
         } else {
-            context.forward("key", new Change<>("newValue", null));
+            context.forward(
+                new Windowed<>("key", new SessionWindow(21L, 42L)),
+                new Change<>("value", null),
+                To.all().withTimestamp(42L));
         }
         expectLastCall();
         replay(store, context);
 
-        new TupleForwarder<>(store, context, null, sendOldValues)
-            .maybeForward("key", "newValue", "oldValue");
+        new SessionTupleForwarder<>(store, context, null, sendOldValued)
+            .maybeForward(new Windowed<>("key", new SessionWindow(21L, 42L)), "value", "oldValue");
 
         verify(store, context);
     }
@@ -80,8 +88,8 @@ public class TupleForwarderTest {
         expect(store.setFlushListener(null, false)).andReturn(true);
         replay(store, context);
 
-        new TupleForwarder<>(store, context, null, false)
-            .maybeForward("key", "newValue", "oldValue");
+        new SessionTupleForwarder<>(store, context, null, false)
+            .maybeForward(new Windowed<>("key", new SessionWindow(21L, 42L)), "value", "oldValue");
 
         verify(store, context);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index b2c4ec8..d1e5448 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
@@ -32,16 +33,17 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -67,50 +69,93 @@ public class SessionWindowedKStreamImplTest {
     }
 
     @Test
-    public void shouldCountSessionWindowed() {
-        final Map<Windowed<String>, Long> results = new HashMap<>();
+    public void shouldCountSessionWindowedWithCachingDisabled() {
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        shouldCountSessionWindowed();
+    }
+
+    @Test
+    public void shouldCountSessionWindowedWithCachingEnabled() {
+        shouldCountSessionWindowed();
+    }
+
+    private void shouldCountSessionWindowed() {
+        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
         stream.count()
-                .toStream()
-                .foreach(results::put);
+            .toStream()
+            .process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             processData(driver);
         }
-        assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L));
+
+        final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+
+        assertThat(result.size(), equalTo(3));
+        assertThat(
+            result.get(new Windowed<>("1", new SessionWindow(10L, 15L))),
+            equalTo(ValueAndTimestamp.make(2L, 15L)));
+        assertThat(
+            result.get(new Windowed<>("2", new SessionWindow(599L, 600L))),
+            equalTo(ValueAndTimestamp.make(2L, 600L)));
+        assertThat(
+            result.get(new Windowed<>("1", new SessionWindow(600L, 600L))),
+            equalTo(ValueAndTimestamp.make(1L, 600L)));
     }
 
     @Test
     public void shouldReduceWindowed() {
-        final Map<Windowed<String>, String> results = new HashMap<>();
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         stream.reduce(MockReducer.STRING_ADDER)
-                .toStream()
-                .foreach(results::put);
+            .toStream()
+            .process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             processData(driver);
         }
-        assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2"));
-        assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1"));
-        assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3"));
+
+        final Map<Windowed<String>, ValueAndTimestamp<String>> result =
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+
+        assertThat(result.size(), equalTo(3));
+        assertThat(
+            result.get(new Windowed<>("1", new SessionWindow(10, 15))),
+            equalTo(ValueAndTimestamp.make("1+2", 15L)));
+        assertThat(
+            result.get(new Windowed<>("2", new SessionWindow(599L, 600))),
+            equalTo(ValueAndTimestamp.make("1+2", 600L)));
+        assertThat(
+            result.get(new Windowed<>("1", new SessionWindow(600, 600))),
+            equalTo(ValueAndTimestamp.make("3", 600L)));
     }
 
     @Test
     public void shouldAggregateSessionWindowed() {
-        final Map<Windowed<String>, String> results = new HashMap<>();
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
         stream.aggregate(MockInitializer.STRING_INIT,
                          MockAggregator.TOSTRING_ADDER,
                          sessionMerger,
                          Materialized.with(Serdes.String(), Serdes.String()))
-                .toStream()
-                .foreach(results::put);
+            .toStream()
+            .process(supplier);
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             processData(driver);
         }
-        assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("0+0+1+2"));
-        assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1"));
-        assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3"));
+
+        final Map<Windowed<String>, ValueAndTimestamp<String>> result =
+            supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
+
+        assertThat(result.size(), equalTo(3));
+        assertThat(
+            result.get(new Windowed<>("1", new SessionWindow(10, 15))),
+            equalTo(ValueAndTimestamp.make("0+0+1+2", 15L)));
+        assertThat(
+            result.get(new Windowed<>("2", new SessionWindow(599, 600))),
+            equalTo(ValueAndTimestamp.make("0+0+1+2", 600L)));
+        assertThat(
+            result.get(new Windowed<>("1", new SessionWindow(600, 600))),
+            equalTo(ValueAndTimestamp.make("0+3", 600L)));
     }
 
     @Test
@@ -126,7 +171,7 @@ public class SessionWindowedKStreamImplTest {
                 equalTo(Arrays.asList(
                     KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
                     KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
-                    KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+                    KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
         }
     }
 
@@ -144,7 +189,7 @@ public class SessionWindowedKStreamImplTest {
                 equalTo(Arrays.asList(
                     KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
                     KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
-                    KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+                    KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
         }
     }
 
@@ -165,7 +210,7 @@ public class SessionWindowedKStreamImplTest {
                 equalTo(Arrays.asList(
                     KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
                     KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
-                    KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+                    KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
         }
     }
 
@@ -247,6 +292,6 @@ public class SessionWindowedKStreamImplTest {
         driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15));
         driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 600));
         driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 600));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 599));
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 24f222b..a7151cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -103,7 +103,8 @@ public class SuppressScenarioTest {
         final Topology topology = builder.build();
 
 
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
@@ -179,7 +180,8 @@ public class SuppressScenarioTest {
             .toStream()
             .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
         final Topology topology = builder.build();
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
             driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
@@ -248,7 +250,8 @@ public class SuppressScenarioTest {
             .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
         final Topology topology = builder.build();
         System.out.println(topology.describe());
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
             driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
@@ -311,7 +314,8 @@ public class SuppressScenarioTest {
             .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
         final Topology topology = builder.build();
         System.out.println(topology.describe());
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
             driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
@@ -370,7 +374,8 @@ public class SuppressScenarioTest {
             .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
         final Topology topology = builder.build();
         System.out.println(topology.describe());
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
@@ -420,7 +425,8 @@ public class SuppressScenarioTest {
             .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
         final Topology topology = builder.build();
         System.out.println(topology.describe());
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
@@ -475,7 +481,8 @@ public class SuppressScenarioTest {
             .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
         final Topology topology = builder.build();
         System.out.println(topology.describe());
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        final ConsumerRecordFactory<String, String> recordFactory =
+            new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             // first window
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
@@ -492,10 +499,10 @@ public class SuppressScenarioTest {
                 drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
                     new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L),
-                    new KeyValueTimestamp<>("[k1@0/0]", null, 5L),
+                    new KeyValueTimestamp<>("[k1@0/0]", null, 0L),
                     new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
-                    new KeyValueTimestamp<>("[k1@0/5]", null, 1L),
-                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+                    new KeyValueTimestamp<>("[k1@0/5]", null, 5L),
+                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
                     new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
                     new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
                 )
@@ -503,14 +510,15 @@ public class SuppressScenarioTest {
             verify(
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
-                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
                     new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
                 )
             );
         }
     }
 
-    private static <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
+    private static <K, V> void verify(final List<ProducerRecord<K, V>> results,
+                                      final List<KeyValueTimestamp<K, V>> expectedResults) {
         if (results.size() != expectedResults.size()) {
             throw new AssertionError(printRecords(results) + " != " + expectedResults);
         }
@@ -525,7 +533,10 @@ public class SuppressScenarioTest {
         }
     }
 
-    private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+    private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver,
+                                                                          final String topic,
+                                                                          final Deserializer<K> keyDeserializer,
+                                                                          final Deserializer<V> valueDeserializer) {
         final List<ProducerRecord<K, V>> result = new LinkedList<>();
         for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
              next != null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 8c7325c..128cdc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -34,20 +36,18 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
+import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
@@ -74,8 +74,7 @@ public class CachingSessionStoreTest {
     private CachingSessionStore cachingStore;
     private ThreadCache cache;
 
-    @Before
-    public void setUp() {
+    public CachingSessionStoreTest() {
         final SessionKeySchema schema = new SessionKeySchema();
         final RocksDBSegmentedBytesStore root =
             new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema);
@@ -140,7 +139,7 @@ public class CachingSessionStoreTest {
 
     @Test
     public void shouldFetchAllSessionsWithSameRecordKey() {
-        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(
+        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = asList(
             KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
             KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
             KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
@@ -247,8 +246,8 @@ public class CachingSessionStoreTest {
         final Windowed<Bytes> b = new Windowed<>(keyA, new SessionWindow(1, 2));
         final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(2, 4));
         final Windowed<String> bDeserialized = new Windowed<>("a", new SessionWindow(1, 2));
-        final CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> flushListener =
-            new CachingKeyValueStoreTest.CacheFlushListenerStub<>(
+        final CacheFlushListenerStub<Windowed<String>, String> flushListener =
+            new CacheFlushListenerStub<>(
                 new SessionWindowedDeserializer<>(new StringDeserializer()),
                 new StringDeserializer());
         cachingStore.setFlushListener(flushListener, true);
@@ -257,7 +256,11 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
 
         assertEquals(
-            Collections.singletonMap(bDeserialized, new Change<>("1", null)),
+            Collections.singletonList(
+                new KeyValueTimestamp<>(
+                    bDeserialized,
+                    new Change<>("1", null),
+                    DEFAULT_TIMESTAMP)),
             flushListener.forwarded
         );
         flushListener.forwarded.clear();
@@ -266,7 +269,11 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
 
         assertEquals(
-            Collections.singletonMap(aDeserialized, new Change<>("1", null)),
+            Collections.singletonList(
+                new KeyValueTimestamp<>(
+                    aDeserialized,
+                    new Change<>("1", null),
+                    DEFAULT_TIMESTAMP)),
             flushListener.forwarded
         );
         flushListener.forwarded.clear();
@@ -275,7 +282,11 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
 
         assertEquals(
-            Collections.singletonMap(aDeserialized, new Change<>("2", "1")),
+            Collections.singletonList(
+                new KeyValueTimestamp<>(
+                    aDeserialized,
+                    new Change<>("2", "1"),
+                    DEFAULT_TIMESTAMP)),
             flushListener.forwarded
         );
         flushListener.forwarded.clear();
@@ -284,7 +295,11 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
 
         assertEquals(
-            Collections.singletonMap(aDeserialized, new Change<>(null, "2")),
+            Collections.singletonList(
+                new KeyValueTimestamp<>(
+                    aDeserialized,
+                    new Change<>(null, "2"),
+                    DEFAULT_TIMESTAMP)),
             flushListener.forwarded
         );
         flushListener.forwarded.clear();
@@ -295,7 +310,7 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
 
         assertEquals(
-            Collections.emptyMap(),
+            Collections.emptyList(),
             flushListener.forwarded
         );
         flushListener.forwarded.clear();
@@ -305,8 +320,8 @@ public class CachingSessionStoreTest {
     public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
         final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
-        final CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> flushListener =
-            new CachingKeyValueStoreTest.CacheFlushListenerStub<>(
+        final CacheFlushListenerStub<Windowed<String>, String> flushListener =
+            new CacheFlushListenerStub<>(
                 new SessionWindowedDeserializer<>(new StringDeserializer()),
                 new StringDeserializer());
         cachingStore.setFlushListener(flushListener, false);
@@ -321,11 +336,18 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
 
         assertEquals(
-            mkMap(
-                mkEntry(aDeserialized, new Change<>("1", null)),
-                mkEntry(aDeserialized, new Change<>("2", null)),
-                mkEntry(aDeserialized, new Change<>(null, null))
-            ),
+            asList(new KeyValueTimestamp<>(
+                    aDeserialized,
+                    new Change<>("1", null),
+                    DEFAULT_TIMESTAMP),
+                new KeyValueTimestamp<>(
+                    aDeserialized,
+                    new Change<>("2", null),
+                    DEFAULT_TIMESTAMP),
+                new KeyValueTimestamp<>(
+                    aDeserialized,
+                    new Change<>(null, null),
+                    DEFAULT_TIMESTAMP)),
             flushListener.forwarded
         );
         flushListener.forwarded.clear();
@@ -336,7 +358,7 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
 
         assertEquals(
-            Collections.emptyMap(),
+            Collections.emptyList(),
             flushListener.forwarded
         );
         flushListener.forwarded.clear();
@@ -466,4 +488,29 @@ public class CachingSessionStoreTest {
         allSessions.add(KeyValue.pair(key, value));
     }
 
+    public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> {
+        final Deserializer<K> keyDeserializer;
+        final Deserializer<V> valueDesializer;
+        final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList<>();
+
+        CacheFlushListenerStub(final Deserializer<K> keyDeserializer,
+                               final Deserializer<V> valueDesializer) {
+            this.keyDeserializer = keyDeserializer;
+            this.valueDesializer = valueDesializer;
+        }
+
+        @Override
+        public void apply(final byte[] key,
+                          final byte[] newValue,
+                          final byte[] oldValue,
+                          final long timestamp) {
+            forwarded.add(
+                new KeyValueTimestamp<>(
+                    keyDeserializer.deserialize(null, key),
+                    new Change<>(
+                        valueDesializer.deserialize(null, newValue),
+                        valueDesializer.deserialize(null, oldValue)),
+                    timestamp));
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
index eb0cc55..3e3c1a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
-import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.easymock.MockType;
@@ -34,8 +33,13 @@ import org.junit.runner.RunWith;
 
 import java.util.Collections;
 
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThrows;
 
 @RunWith(EasyMockRunner.class)
 public class SessionStoreBuilderTest {
@@ -49,15 +53,15 @@ public class SessionStoreBuilderTest {
     @Before
     public void setUp() throws Exception {
 
-        EasyMock.expect(supplier.get()).andReturn(inner);
-        EasyMock.expect(supplier.name()).andReturn("name");
-        EasyMock.replay(supplier);
+        expect(supplier.get()).andReturn(inner);
+        expect(supplier.name()).andReturn("name");
+        replay(supplier);
 
-        builder = new SessionStoreBuilder<>(supplier,
-                                            Serdes.String(),
-                                            Serdes.String(),
-                                            new MockTime()
-        );
+        builder = new SessionStoreBuilder<>(
+            supplier,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime());
     }
 
     @Test
@@ -113,29 +117,37 @@ public class SessionStoreBuilderTest {
         assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerIfInnerIsNull() {
-        new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+        final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()));
+        assertThat(e.getMessage(), equalTo("supplier cannot be null"));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerIfKeySerdeIsNull() {
-        new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+        final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()));
+        assertThat(e.getMessage(), equalTo("name cannot be null"));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerIfValueSerdeIsNull() {
-        new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+        final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime()));
+        assertThat(e.getMessage(), equalTo("name cannot be null"));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerIfTimeIsNull() {
-        new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+        reset(supplier);
+        expect(supplier.name()).andReturn("name");
+        replay(supplier);
+        final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null));
+        assertThat(e.getMessage(), equalTo("time cannot be null"));
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowNullPointerIfMetricsScopeIsNull() {
-        new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
+        final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()));
+        assertThat(e.getMessage(), equalTo("name cannot be null"));
     }
 
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index e000b82..0ff5c8a 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -20,9 +20,12 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -32,6 +35,7 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
     public final ArrayList<String> processed = new ArrayList<>();
     public final ArrayList<K> processedKeys = new ArrayList<>();
     public final ArrayList<V> processedValues = new ArrayList<>();
+    public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>();
 
     public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
     public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>();
@@ -77,6 +81,11 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
     public void process(final K key, final V value) {
         processedKeys.add(key);
         processedValues.add(value);
+        if (value != null) {
+            lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp()));
+        } else {
+            lastValueAndTimestampPerKey.remove(key);
+        }
         processed.add(
             (key == null ? "null" : key) +
             ":" + (value == null ? "null" : value) +
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index f2de3de..c0110a1 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -289,22 +289,34 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
       Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
     }
     {
-      // late event for first window, included since grade period hasn't passed
+      // out-of-order event for first window, included since grace period hasn't passed
       testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 2L)
       Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
     }
     {
+      // add to second window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 13L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // add out-of-order to second window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 10L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
       // push stream time forward to flush other events through
       testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 30L)
-      // too-late event should get dropped from the stream
+      // late event should get dropped from the stream
       testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 3L)
       // should now have to results
       val r1 = testDriver.readRecord[String, Long](sinkTopic)
       r1.key shouldBe "0:2:k1"
       r1.value shouldBe 3L
+      r1.timestamp shouldBe 2L
       val r2 = testDriver.readRecord[String, Long](sinkTopic)
-      r2.key shouldBe "8:8:k1"
-      r2.value shouldBe 1
+      r2.key shouldBe "8:13:k1"
+      r2.value shouldBe 3L
+      r2.timestamp shouldBe 13L
     }
     testDriver.readRecord[String, Long](sinkTopic) shouldBe null
 


Mime
View raw message