kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6978: make window retention time strict (#5218)
Date Sat, 23 Jun 2018 00:08:05 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 954be11  KAFKA-6978: make window retention time strict (#5218)
954be11 is described below

commit 954be11bf2d3dc9fa11a69830d2ef5ff580ff533
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Fri Jun 22 19:08:00 2018 -0500

    KAFKA-6978: make window retention time strict (#5218)
    
    Enforce window retention times strictly:
    
    * records for windows that are expired get dropped
    * queries for timestamps old enough to be expired immediately answered with null
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kstream/internals/KStreamWindowAggregate.java  |  39 ++++--
 .../kstream/internals/KStreamWindowReduce.java     | 129 ++------------------
 .../internals/GlobalProcessorContextImpl.java      |   2 +-
 .../internals/InternalProcessorContext.java        |   2 +-
 .../internals/InternalTopologyBuilder.java         | 108 ++++++-----------
 .../processor/internals/PartitionGroup.java        |  66 ++++++-----
 .../processor/internals/ProcessorContextImpl.java  |  11 +-
 .../processor/internals/StandbyContextImpl.java    |   2 +-
 .../streams/processor/internals/StreamTask.java    |   1 -
 .../processor/internals/TimestampSupplier.java     |  21 ++++
 .../apache/kafka/streams/state/WindowStore.java    |   2 +-
 .../state/internals/RocksDBWindowStore.java        |   2 +-
 .../internals/KStreamWindowAggregateTest.java      | 131 +++++++++++++++------
 .../kstream/internals/KStreamWindowReduceTest.java |  59 ++++++++--
 .../internals/AbstractProcessorContextTest.java    |   2 +-
 .../processor/internals/PartitionGroupTest.java    |  39 +++++-
 .../kafka/test/InternalMockProcessorContext.java   |  16 +--
 .../apache/kafka/test/NoOpProcessorContext.java    |   4 +-
 .../kafka/streams/TopologyTestDriverTest.java      |  17 +--
 19 files changed, 332 insertions(+), 321 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index a774762..4be1880 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.slf4j.Logger;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 
 public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
-    private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowAggregate.class);
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final String storeName;
     private final Windows<W> windows;
@@ -66,11 +67,14 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
         private WindowStore<K, T> windowStore;
         private TupleForwarder<Windowed<K>, T> tupleForwarder;
         private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            this.internalProcessorContext = (InternalProcessorContext) context;
+
             metrics = (StreamsMetricsImpl) context.metrics();
 
             windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
@@ -82,7 +86,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
             // if the key is null, we do not need proceed aggregating the record
             // the record with the table
             if (key == null) {
-                LOG.warn(
+                log.warn(
                     "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     value, context().topic(), context().partition(), context().offset()
                 );
@@ -92,21 +96,32 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
             // first get the matching windows
             final long timestamp = context().timestamp();
+            final long expiryTime = internalProcessorContext.streamTime() - windows.maintainMs();
+
             final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
 
             // try update the window, and create the new window for the rest of unmatched window that do not exist yet
             for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
-                T oldAgg = windowStore.fetch(key, entry.getKey());
-
-                if (oldAgg == null) {
-                    oldAgg = initializer.apply();
+                final Long windowStart = entry.getKey();
+                if (windowStart > expiryTime) {
+                    T oldAgg = windowStore.fetch(key, windowStart);
+
+                    if (oldAgg == null) {
+                        oldAgg = initializer.apply();
+                    }
+
+                    final T newAgg = aggregator.apply(key, value, oldAgg);
+
+                    // update the store with the new value
+                    windowStore.put(key, newAgg, windowStart);
+                    tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
+                } else {
+                    log.warn(
+                        "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{}] expiration=[{}]",
+                        key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, expiryTime
+                    );
+                    metrics.skippedRecordsSensor().record();
                 }
-
-                final T newAgg = aggregator.apply(key, value, oldAgg);
-
-                // update the store with the new value
-                windowStore.put(key, newAgg, entry.getKey());
-                tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 9db861d..babe3eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -18,132 +18,17 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.WindowStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
-    private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowReduce.class);
-
-    private final String storeName;
-    private final Windows<W> windows;
-    private final Reducer<V> reducer;
-
-    private boolean sendOldValues = false;
 
+class KStreamWindowReduce<K, V, W extends Window> extends KStreamWindowAggregate<K, V, V, W> {
     KStreamWindowReduce(final Windows<W> windows,
                         final String storeName,
                         final Reducer<V> reducer) {
-        this.windows = windows;
-        this.storeName = storeName;
-        this.reducer = reducer;
-    }
-
-    @Override
-    public Processor<K, V> get() {
-        return new KStreamWindowReduceProcessor();
-    }
-
-    @Override
-    public void enableSendingOldValues() {
-        sendOldValues = true;
-    }
-
-    private class KStreamWindowReduceProcessor extends AbstractProcessor<K, V> {
-
-        private WindowStore<K, V> windowStore;
-        private TupleForwarder<Windowed<K>, V> tupleForwarder;
-        private StreamsMetricsImpl metrics;
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(final ProcessorContext context) {
-            super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
-            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
-        }
-
-        @Override
-        public void process(final K key, final V value) {
-            // if the key is null, we do not need proceed aggregating
-            // the record with the table
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
-                );
-                metrics.skippedRecordsSensor().record();
-                return;
-            }
-
-            // first get the matching windows
-            final long timestamp = context().timestamp();
-            final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
-
-            // try update the window, and create the new window for the rest of unmatched window that do not exist yet
-            for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
-                final V oldAgg = windowStore.fetch(key, entry.getKey());
-
-                final V newAgg;
-                if (oldAgg == null) {
-                    newAgg = value;
-                } else {
-                    newAgg = reducer.apply(oldAgg, value);
-                }
-
-                // update the store with the new value
-                windowStore.put(key, newAgg, entry.getKey());
-                tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
-            }
-        }
-    }
-
-    @Override
-    public KTableValueGetterSupplier<Windowed<K>, V> view() {
-
-        return new KTableValueGetterSupplier<Windowed<K>, V>() {
-
-            public KTableValueGetter<Windowed<K>, V> get() {
-                return new KStreamWindowReduceValueGetter();
-            }
-
-            @Override
-            public String[] storeNames() {
-                return new String[]{storeName};
-            }
-        };
-    }
-
-    private class KStreamWindowReduceValueGetter implements KTableValueGetter<Windowed<K>, V> {
-
-        private WindowStore<K, V> windowStore;
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(final ProcessorContext context) {
-            windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public V get(final Windowed<K> windowedKey) {
-            final K key = windowedKey.key();
-            final W window = (W) windowedKey.window();
-
-            return windowStore.fetch(key, window.start());
-        }
-
-        @Override
-        public void close() {
-        }
+        super(
+            windows,
+            storeName,
+            () -> null,
+            (key, newValue, oldValue) -> oldValue == null ? newValue : reducer.apply(oldValue, newValue)
+        );
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index c469be9..30599e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -97,7 +97,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     }
 
     @Override
-    public Long streamTime() {
+    public long streamTime() {
         throw new RuntimeException("Stream time is not implemented for the global processor context.");
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index cfc1970..6db7a3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -64,5 +64,5 @@ public interface InternalProcessorContext extends ProcessorContext {
      */
     void uninitialize();
 
-    Long streamTime();
+    long streamTime();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 5b4b4d73..ed51754 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -123,83 +123,51 @@ public class InternalTopologyBuilder {
     private Map<Integer, Set<String>> nodeGroups = null;
 
     // TODO: this is only temporary for 2.0 and should be removed
-    public final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();
+    private final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();
 
-    public interface StateStoreFactory {
-        Set<String> users();
-        boolean loggingEnabled();
-        StateStore build();
-        String name();
-        boolean isWindowStore();
-        Map<String, String> logConfig();
-        long retentionPeriod();
-    }
-
-    private static abstract class AbstractStateStoreFactory implements StateStoreFactory {
+    public static class StateStoreFactory {
+        private final StoreBuilder builder;
         private final Set<String> users = new HashSet<>();
-        private final String name;
-        private final boolean loggingEnabled;
-        private final boolean windowStore;
-        private final Map<String, String> logConfig;
-
-        AbstractStateStoreFactory(final String name,
-                                  final boolean loggingEnabled,
-                                  final boolean windowStore,
-                                  final Map<String, String> logConfig) {
-            this.name = name;
-            this.loggingEnabled = loggingEnabled;
-            this.windowStore = windowStore;
-            this.logConfig = logConfig;
-        }
 
-        @Override
-        public Set<String> users() {
-            return users;
+        private StateStoreFactory(final StoreBuilder<?> builder) {
+            this.builder = builder;
         }
 
-        @Override
-        public boolean loggingEnabled() {
-            return loggingEnabled;
+        public StateStore build() {
+            return builder.build();
         }
 
-        @Override
-        public String name() {
-            return name;
+        long retentionPeriod() {
+            if (isWindowStore()) {
+                return ((WindowStoreBuilder) builder).retentionPeriod();
+            } else {
+                throw new IllegalStateException("retentionPeriod is not supported when not a window store");
+            }
         }
 
-        @Override
-        public boolean isWindowStore() {
-            return windowStore;
+        private Set<String> users() {
+            return users;
         }
 
-        @Override
-        public Map<String, String> logConfig() {
-            return logConfig;
+        /** Visible for testing */
+        public boolean loggingEnabled() {
+            return builder.loggingEnabled();
         }
-    }
 
-    private static class StoreBuilderFactory extends AbstractStateStoreFactory {
-        private final StoreBuilder builder;
-
-        StoreBuilderFactory(final StoreBuilder<?> builder) {
-            super(builder.name(),
-                  builder.loggingEnabled(),
-                  builder instanceof WindowStoreBuilder,
-                  builder.logConfig());
-            this.builder = builder;
+        private String name() {
+            return builder.name();
         }
 
-        @Override
-        public StateStore build() {
-            return builder.build();
+        private boolean isWindowStore() {
+            return builder instanceof WindowStoreBuilder;
         }
 
-        @Override
-        public long retentionPeriod() {
-            if (!isWindowStore()) {
-                throw new IllegalStateException("retentionPeriod is not supported when not a window store");
-            }
-            return ((WindowStoreBuilder) builder).retentionPeriod();
+        // Apparently Java strips the generics from this method because we're using the raw type for builder,
+        // even though this method doesn't use builder's (missing) type parameter. Our usage seems obviously
+        // correct, though, hence the suppression.
+        @SuppressWarnings("unchecked")
+        private Map<String, String> logConfig() {
+            return builder.logConfig();
         }
     }
 
@@ -526,7 +494,7 @@ public class InternalTopologyBuilder {
             throw new TopologyException("StateStore " + storeBuilder.name() + " is already added.");
         }
 
-        stateFactories.put(storeBuilder.name(), new StoreBuilderFactory(storeBuilder));
+        stateFactories.put(storeBuilder.name(), new StateStoreFactory(storeBuilder));
 
         if (processorNames != null) {
             for (final String processorName : processorNames) {
@@ -1124,12 +1092,12 @@ public class InternalTopologyBuilder {
 
     private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
                                                            final String name) {
-        if (!factory.isWindowStore()) {
-            return new UnwindowedChangelogTopicConfig(name, factory.logConfig());
-        } else {
+        if (factory.isWindowStore()) {
             final WindowedChangelogTopicConfig config = new WindowedChangelogTopicConfig(name, factory.logConfig());
             config.setRetentionMs(factory.retentionPeriod());
             return config;
+        } else {
+            return new UnwindowedChangelogTopicConfig(name, factory.logConfig());
         }
     }
 
@@ -1744,18 +1712,18 @@ public class InternalTopologyBuilder {
         public String toString() {
             final StringBuilder sb = new StringBuilder();
             sb.append("Topologies:\n ");
-            final TopologyDescription.Subtopology[] sortedSubtopologies = 
+            final TopologyDescription.Subtopology[] sortedSubtopologies =
                 subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[subtopologies.size()]);
-            final TopologyDescription.GlobalStore[] sortedGlobalStores = 
+            final TopologyDescription.GlobalStore[] sortedGlobalStores =
                 globalStores.descendingSet().toArray(new TopologyDescription.GlobalStore[globalStores.size()]);
             int expectedId = 0;
             int subtopologiesIndex = sortedSubtopologies.length - 1;
             int globalStoresIndex = sortedGlobalStores.length - 1;
             while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
                 sb.append("  ");
-                final TopologyDescription.Subtopology subtopology = 
+                final TopologyDescription.Subtopology subtopology =
                     sortedSubtopologies[subtopologiesIndex];
-                final TopologyDescription.GlobalStore globalStore = 
+                final TopologyDescription.GlobalStore globalStore =
                     sortedGlobalStores[globalStoresIndex];
                 if (subtopology.id() == expectedId) {
                     sb.append(subtopology);
@@ -1767,14 +1735,14 @@ public class InternalTopologyBuilder {
                 expectedId++;
             }
             while (subtopologiesIndex != -1) {
-                final TopologyDescription.Subtopology subtopology = 
+                final TopologyDescription.Subtopology subtopology =
                     sortedSubtopologies[subtopologiesIndex];
                 sb.append("  ");
                 sb.append(subtopology);
                 subtopologiesIndex--;
             }
             while (globalStoresIndex != -1) {
-                final TopologyDescription.GlobalStore globalStore = 
+                final TopologyDescription.GlobalStore globalStore =
                     sortedGlobalStores[globalStoresIndex];
                 sb.append("  ");
                 sb.append(globalStore);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index dcaa755..c809da9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -33,7 +33,9 @@ public class PartitionGroup {
 
     private final Map<TopicPartition, RecordQueue> partitionQueues;
 
-    private final PriorityQueue<RecordQueue> queuesByTime;
+    private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
+    private long streamTime;
+    private int totalBuffered;
 
     public static class RecordInfo {
         RecordQueue queue;
@@ -51,30 +53,12 @@ public class PartitionGroup {
         }
     }
 
-    // since task is thread-safe, we do not need to synchronize on local variables
-    private int totalBuffered;
 
     PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
-        queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() {
-
-            @Override
-            public int compare(final RecordQueue queue1, final RecordQueue queue2) {
-                final long time1 = queue1.timestamp();
-                final long time2 = queue2.timestamp();
-
-                if (time1 < time2) {
-                    return -1;
-                }
-                if (time1 > time2) {
-                    return 1;
-                }
-                return 0;
-            }
-        });
-
+        nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
         this.partitionQueues = partitionQueues;
-
         totalBuffered = 0;
+        streamTime = -1;
     }
 
     /**
@@ -85,19 +69,27 @@ public class PartitionGroup {
     StampedRecord nextRecord(final RecordInfo info) {
         StampedRecord record = null;
 
-        final RecordQueue queue = queuesByTime.poll();
+        final RecordQueue queue = nonEmptyQueuesByTime.poll();
+        info.queue = queue;
+
         if (queue != null) {
             // get the first record from this queue.
             record = queue.poll();
 
-            if (!queue.isEmpty()) {
-                queuesByTime.offer(queue);
+            if (record != null) {
+                --totalBuffered;
+
+                if (!queue.isEmpty()) {
+                    nonEmptyQueuesByTime.offer(queue);
+                }
+
+                // Since this was previously a queue with min timestamp,
+                // streamTime could only advance if this queue's time did.
+                if (queue.timestamp() > streamTime) {
+                    computeStreamTime();
+                }
             }
-        }
-        info.queue = queue;
 
-        if (record != null) {
-            --totalBuffered;
         }
 
         return record;
@@ -114,11 +106,17 @@ public class PartitionGroup {
         final RecordQueue recordQueue = partitionQueues.get(partition);
 
         final int oldSize = recordQueue.size();
+        final long oldTimestamp = recordQueue.timestamp();
         final int newSize = recordQueue.addRawRecords(rawRecords);
 
         // add this record queue to be considered for processing in the future if it was empty before
         if (oldSize == 0 && newSize > 0) {
-            queuesByTime.offer(recordQueue);
+            nonEmptyQueuesByTime.offer(recordQueue);
+        }
+
+        // Adding to this queue could only advance streamTime if it was previously the queue with min timestamp (= streamTime)
+        if (oldTimestamp <= streamTime && recordQueue.timestamp() > streamTime) {
+            computeStreamTime();
         }
 
         totalBuffered += newSize - oldSize;
@@ -135,15 +133,19 @@ public class PartitionGroup {
      * partition timestamp among all its partitions
      */
     public long timestamp() {
+        return streamTime;
+    }
+
+    private void computeStreamTime() {
         // we should always return the smallest timestamp of all partitions
         // to avoid group partition time goes backward
         long timestamp = Long.MAX_VALUE;
         for (final RecordQueue queue : partitionQueues.values()) {
-            if (timestamp > queue.timestamp()) {
+            if (queue.timestamp() < timestamp) {
                 timestamp = queue.timestamp();
             }
         }
-        return timestamp;
+        this.streamTime = timestamp;
     }
 
     /**
@@ -168,7 +170,7 @@ public class PartitionGroup {
     }
 
     public void clear() {
-        queuesByTime.clear();
+        nonEmptyQueuesByTime.clear();
         for (final RecordQueue queue : partitionQueues.values()) {
             queue.clear();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 36a309c..beab35f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -28,13 +28,12 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.List;
-import java.util.function.Supplier;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     private final StreamTask task;
     private final RecordCollector collector;
-    private Supplier<Long> streamTimeSupplier;
+    private TimestampSupplier streamTimeSupplier;
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
@@ -56,7 +55,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     @Override
     public RecordCollector recordCollector() {
-        return this.collector;
+        return collector;
     }
 
     /**
@@ -155,13 +154,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         return task.schedule(interval, type, callback);
     }
 
-    void setStreamTimeSupplier(final Supplier<Long> streamTimeSupplier) {
+    void setStreamTimeSupplier(final TimestampSupplier streamTimeSupplier) {
         this.streamTimeSupplier = streamTimeSupplier;
     }
 
     @Override
-    public Long streamTime() {
-        return this.streamTimeSupplier.get();
+    public long streamTime() {
+        return streamTimeSupplier.get();
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 58c2e2c..acc6ad6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -217,7 +217,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     }
 
     @Override
-    public Long streamTime() {
+    public long streamTime() {
         throw new RuntimeException("Stream time is not implemented for the standby context.");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6993ef8..6af4c4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -635,7 +635,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @param partition the partition
      * @param records   the records
      */
-    @SuppressWarnings("unchecked")
     public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
         final int newQueueSize = partitionGroup.addRawRecords(partition, records);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java
new file mode 100644
index 0000000..a6a7a42
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+public interface TimestampSupplier {
+    long get();
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index d2f6ad8..ee4c19a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.StateStore;
 public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
 
     /**
-     * Put a key-value pair with the current wall-clock time as the timestamp
+     * Put a key-value pair with the current record time as the timestamp
      * into the corresponding window
      * @param key The key to associate the value to
      * @param value The value to update, it can be null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index a9af36d..4c0e01f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -102,7 +102,7 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
         return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
-    
+
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetchAll(timeFrom, timeTo);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index d134bd1..dd2cf05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -16,23 +16,26 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockProcessor;
@@ -44,7 +47,10 @@ import java.util.List;
 import java.util.Properties;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
@@ -134,12 +140,7 @@ public class KStreamWindowAggregateTest {
         table2.toStream().process(supplier);
 
 
-        table1.join(table2, new ValueJoiner<String, String, String>() {
-            @Override
-            public String apply(final String p1, final String p2) {
-                return p1 + "%" + p2;
-            }
-        }).toStream().process(supplier);
+        table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
@@ -151,11 +152,11 @@ public class KStreamWindowAggregateTest {
             final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
 
             processors.get(0).checkAndClearProcessResult(
-                    "[A@0/10]:0+1",
-                    "[B@0/10]:0+2",
-                    "[C@0/10]:0+3",
-                    "[D@0/10]:0+4",
-                    "[A@0/10]:0+1+1"
+                "[A@0/10]:0+1",
+                "[B@0/10]:0+2",
+                "[C@0/10]:0+3",
+                "[D@0/10]:0+4",
+                "[A@0/10]:0+1+1"
             );
             processors.get(1).checkAndClearProcessResult();
             processors.get(2).checkAndClearProcessResult();
@@ -167,11 +168,11 @@ public class KStreamWindowAggregateTest {
             driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
 
             processors.get(0).checkAndClearProcessResult(
-                    "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
-                    "[B@0/10]:0+2+2", "[B@5/15]:0+2",
-                    "[D@0/10]:0+4+4", "[D@5/15]:0+4",
-                    "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
-                    "[C@0/10]:0+3+3", "[C@5/15]:0+3"
+                "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
+                "[B@0/10]:0+2+2", "[B@5/15]:0+2",
+                "[D@0/10]:0+4+4", "[D@5/15]:0+4",
+                "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
+                "[C@0/10]:0+3+3", "[C@5/15]:0+3"
             );
             processors.get(1).checkAndClearProcessResult();
             processors.get(2).checkAndClearProcessResult();
@@ -184,18 +185,18 @@ public class KStreamWindowAggregateTest {
 
             processors.get(0).checkAndClearProcessResult();
             processors.get(1).checkAndClearProcessResult(
-                    "[A@0/10]:0+a",
-                    "[B@0/10]:0+b",
-                    "[C@0/10]:0+c",
-                    "[D@0/10]:0+d",
-                    "[A@0/10]:0+a+a"
+                "[A@0/10]:0+a",
+                "[B@0/10]:0+b",
+                "[C@0/10]:0+c",
+                "[D@0/10]:0+d",
+                "[A@0/10]:0+a+a"
             );
             processors.get(2).checkAndClearProcessResult(
-                    "[A@0/10]:0+1+1+1%0+a",
-                    "[B@0/10]:0+2+2+2%0+b",
-                    "[C@0/10]:0+3+3%0+c",
-                    "[D@0/10]:0+4+4%0+d",
-                    "[A@0/10]:0+1+1+1%0+a+a");
+                "[A@0/10]:0+1+1+1%0+a",
+                "[B@0/10]:0+2+2+2%0+b",
+                "[C@0/10]:0+3+3%0+c",
+                "[D@0/10]:0+4+4%0+d",
+                "[A@0/10]:0+1+1+1%0+a+a");
 
             driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
             driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
@@ -205,18 +206,18 @@ public class KStreamWindowAggregateTest {
 
             processors.get(0).checkAndClearProcessResult();
             processors.get(1).checkAndClearProcessResult(
-                    "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
-                    "[B@0/10]:0+b+b", "[B@5/15]:0+b",
-                    "[D@0/10]:0+d+d", "[D@5/15]:0+d",
-                    "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
-                    "[C@0/10]:0+c+c", "[C@5/15]:0+c"
+                "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
+                "[B@0/10]:0+b+b", "[B@5/15]:0+b",
+                "[D@0/10]:0+d+d", "[D@5/15]:0+d",
+                "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
+                "[C@0/10]:0+c+c", "[C@5/15]:0+c"
             );
             processors.get(2).checkAndClearProcessResult(
-                    "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
-                    "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
-                    "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
-                    "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
-                    "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
+                "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
+                "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
+                "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
+                "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
+                "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
             );
         }
     }
@@ -231,7 +232,7 @@ public class KStreamWindowAggregateTest {
             .windowedBy(TimeWindows.of(10).advanceBy(5))
             .aggregate(
                 MockInitializer.STRING_INIT,
-                MockAggregator.<String, String>toStringInstance("+"),
+                MockAggregator.toStringInstance("+"),
                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String())
             );
 
@@ -244,4 +245,56 @@ public class KStreamWindowAggregateTest {
             assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
         }
     }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingExpiredWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
+        stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .windowedBy(TimeWindows.of(10).advanceBy(5).until(100))
+            .aggregate(
+                () -> "",
+                MockAggregator.toStringInstance("+"),
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+            )
+            .toStream()
+            .map((key, value) -> new KeyValue<>(key.toString(), value))
+            .to("output");
+
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
+            driver.pipeInput(recordFactory.create(topic, "k", "0", 0L));
+            driver.pipeInput(recordFactory.create(topic, "k", "1", 1L));
+            driver.pipeInput(recordFactory.create(topic, "k", "2", 2L));
+            driver.pipeInput(recordFactory.create(topic, "k", "3", 3L));
+            driver.pipeInput(recordFactory.create(topic, "k", "4", 4L));
+            driver.pipeInput(recordFactory.create(topic, "k", "5", 5L));
+            driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
+            LogCaptureAppender.unregister(appender);
+
+            assertThat(getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue(), equalTo(7.0));
+            assertThat(appender.getMessages(), hasItems(
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0] expiration=[0]"
+            ));
+
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/110]", "+100", 100);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5", 5);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5+6", 6);
+            assertThat(driver.readOutput("output"), nullValue());
+        }
+    }
+
+    private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
+        return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index 4ae2f76..4cf9324 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -16,23 +16,29 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Properties;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
@@ -49,12 +55,7 @@ public class KStreamWindowReduceTest {
             .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
             .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(500L))
-            .reduce(new Reducer<String>() {
-                @Override
-                public String apply(final String value1, final String value2) {
-                    return value1 + "+" + value2;
-                }
-            });
+            .reduce((value1, value2) -> value1 + "+" + value2);
 
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -66,4 +67,48 @@ public class KStreamWindowReduceTest {
             assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
         }
     }
+
+    @Test
+    public void shouldLogAndMeterOnExpiredEvent() {
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder
+            .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .windowedBy(TimeWindows.of(5L).until(100))
+            .reduce((value1, value2) -> value1 + "+" + value2)
+            .toStream()
+            .map((key, value) -> new KeyValue<>(key.toString(), value))
+            .to("output");
+
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+            driver.pipeInput(recordFactory.create("TOPIC", "k", "100", 100L));
+            driver.pipeInput(recordFactory.create("TOPIC", "k", "0", 0L));
+            driver.pipeInput(recordFactory.create("TOPIC", "k", "1", 1L));
+            driver.pipeInput(recordFactory.create("TOPIC", "k", "2", 2L));
+            driver.pipeInput(recordFactory.create("TOPIC", "k", "3", 3L));
+            driver.pipeInput(recordFactory.create("TOPIC", "k", "4", 4L));
+            driver.pipeInput(recordFactory.create("TOPIC", "k", "5", 5L));
+            LogCaptureAppender.unregister(appender);
+
+            assertThat(getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue(), equalTo(5.0));
+            assertThat(appender.getMessages(), hasItems(
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0] expiration=[0]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0] expiration=[0]"
+            ));
+
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/105]", "100", 100);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/10]", "5", 5);
+            assertThat(driver.readOutput("output"), nullValue());
+        }
+    }
+
+    private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
+        return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 1ef3f5d..f25f38f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -218,7 +218,7 @@ public class AbstractProcessorContextTest {
         public void commit() {}
 
         @Override
-        public Long streamTime() {
+        public long streamTime() {
             throw new RuntimeException("not implemented");
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 1d2f613..9823ae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -86,6 +86,9 @@ public class PartitionGroupTest {
             new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
 
         group.addRawRecords(partition2, list2);
+        // 1:[1, 3, 5]
+        // 2:[2, 4, 6]
+        // st: 1
 
         assertEquals(6, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
@@ -97,6 +100,9 @@ public class PartitionGroupTest {
 
         // get one record, now the time should be advanced
         record = group.nextRecord(info);
+        // 1:[3, 5]
+        // 2:[2, 4, 6]
+        // st: 2
         assertEquals(partition1, info.partition());
         assertEquals(1L, record.timestamp);
         assertEquals(5, group.numBuffered());
@@ -106,6 +112,9 @@ public class PartitionGroupTest {
 
         // get one record, now the time should be advanced
         record = group.nextRecord(info);
+        // 1:[3, 5]
+        // 2:[4, 6]
+        // st: 3
         assertEquals(partition2, info.partition());
         assertEquals(2L, record.timestamp);
         assertEquals(4, group.numBuffered());
@@ -113,13 +122,15 @@ public class PartitionGroupTest {
         assertEquals(2, group.numBuffered(partition2));
         assertEquals(3L, group.timestamp());
 
-        // add three 3 records with timestamp 2, 4, 6 to partition-1 again
+        // add 2 more records with timestamp 2, 4 to partition-1
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
             new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue));
 
         group.addRawRecords(partition1, list3);
-
+        // 1:[3, 5, 2, 4]
+        // 2:[4, 6]
+        // st: 3 (non-decreasing, so adding 2 doesn't change it)
         assertEquals(6, group.numBuffered());
         assertEquals(4, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
@@ -127,6 +138,9 @@ public class PartitionGroupTest {
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
+        // 1:[5, 2, 4]
+        // 2:[4, 6]
+        // st: 3 (2's presence prevents it from advancing to 4)
         assertEquals(partition1, info.partition());
         assertEquals(3L, record.timestamp);
         assertEquals(5, group.numBuffered());
@@ -134,8 +148,11 @@ public class PartitionGroupTest {
         assertEquals(2, group.numBuffered(partition2));
         assertEquals(3L, group.timestamp());
 
-        // get one more record, now time should be advanced
+        // get one record, time should not be advanced
         record = group.nextRecord(info);
+        // 1:[2, 4]
+        // 2:[4, 6]
+        // st: 3 (2's presence prevents it from advancing to 4)
         assertEquals(partition1, info.partition());
         assertEquals(5L, record.timestamp);
         assertEquals(4, group.numBuffered());
@@ -143,8 +160,11 @@ public class PartitionGroupTest {
         assertEquals(2, group.numBuffered(partition2));
         assertEquals(3L, group.timestamp());
 
-        // get one more record, time should not be advanced
+        // get one more record, now time should be advanced
         record = group.nextRecord(info);
+        // 1:[4]
+        // 2:[4, 6]
+        // st: 4
         assertEquals(partition1, info.partition());
         assertEquals(2L, record.timestamp);
         assertEquals(3, group.numBuffered());
@@ -152,8 +172,11 @@ public class PartitionGroupTest {
         assertEquals(2, group.numBuffered(partition2));
         assertEquals(4L, group.timestamp());
 
-        // get one more record, now time should be advanced
+        // get one more record, time should not be advanced
         record = group.nextRecord(info);
+        // 1:[4]
+        // 2:[6]
+        // st: 4
         assertEquals(partition2, info.partition());
         assertEquals(4L, record.timestamp);
         assertEquals(2, group.numBuffered());
@@ -163,6 +186,9 @@ public class PartitionGroupTest {
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
+        // 1:[]
+        // 2:[6]
+        // st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known time of 4)
         assertEquals(partition1, info.partition());
         assertEquals(4L, record.timestamp);
         assertEquals(1, group.numBuffered());
@@ -172,6 +198,9 @@ public class PartitionGroupTest {
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
+        // 1:[]
+        // 2:[]
+        // st: 4 (1 and 2 are empty, so they are still reporting the last-known times of 4 and 6.)
         assertEquals(partition2, info.partition());
         assertEquals(6L, record.timestamp);
         assertEquals(0, group.numBuffered());
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 3251489..ca6d5a7 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -61,7 +61,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     private Serde<?> keySerde;
     private Serde<?> valSerde;
     private long timestamp = -1L;
-    private long streamTime = -1;
+    private long streamTime = -1L;
 
     public InternalMockProcessorContext() {
         this(null,
@@ -119,12 +119,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
             valSerde,
             new StreamsMetricsImpl(new Metrics(), "mock"),
             new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
-            new RecordCollector.Supplier() {
-                @Override
-                public RecordCollector recordCollector() {
-                    return collector;
-                }
-            },
+            () -> collector,
             cache
         );
     }
@@ -180,12 +175,12 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     @Override
     public void initialized() {}
 
-    public void setStreamTime(final long time) {
-        streamTime = time;
+    public void setStreamTime(final long currentTime) {
+        streamTime = currentTime;
     }
 
     @Override
-    public Long streamTime() {
+    public long streamTime() {
         return streamTime;
     }
 
@@ -346,5 +341,4 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
 
         return new WrappedBatchingStateRestoreCallback(restoreCallback);
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 7b695c4..521a50e 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -86,8 +86,8 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
-    public Long streamTime() {
-        throw new RuntimeException("not implemented");
+    public long streamTime() {
+        throw new RuntimeException("streamTime is not implemented for NoOpProcessorContext");
     }
 
     @Override
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index d0d4ed1..497a6c3 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -54,6 +54,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -63,9 +64,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
-import java.util.Arrays;
 import java.util.regex.Pattern;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -97,13 +100,11 @@ public class TopologyTestDriverTest {
     private final ConsumerRecord<byte[], byte[]> consumerRecord2 = consumerRecordFactory.create(SOURCE_TOPIC_2, key2, value2, timestamp2);
 
     private TopologyTestDriver testDriver;
-    private final Properties config = new Properties() {
-        {
-            put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver");
-            put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
-            put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-        }
-    };
+    private final Properties config = mkProperties(mkMap(
+        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
+        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
+        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
+    ));
     private KeyValueStore<String, Long> store;
 
     private final StringDeserializer stringDeserializer = new StringDeserializer();


Mime
View raw message