kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-7895: fix stream-time reckoning for Suppress (2.2) (#6286)
Date Wed, 20 Feb 2019 15:16:13 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 7517d4e  KAFKA-7895: fix stream-time reckoning for Suppress (2.2) (#6286)
7517d4e is described below

commit 7517d4e114cf772abfb918fbfa1ed35e3c5a404d
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Feb 20 09:15:57 2019 -0600

    KAFKA-7895: fix stream-time reckoning for Suppress (2.2) (#6286)
    
    Even within a Task, different Processors have different perceptions
    of time, due to record caching on stores and in suppression itself,
    and in general, due to any processor logic that may hold onto
    records arbitrarily and emit them later. Because of this, we cannot rely
    on the whole task existing in the same "instant" of stream-time. The
    solution is for each processor node that cares about stream-time to
    track it independently.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>
---
 .../internals/KStreamSessionWindowAggregate.java   |   5 +-
 .../kstream/internals/KStreamWindowAggregate.java  |   5 +-
 .../suppress/KTableSuppressProcessor.java          |   8 +-
 .../internals/GlobalProcessorContextImpl.java      |   8 --
 .../internals/InternalProcessorContext.java        |   2 -
 .../processor/internals/ProcessorContextImpl.java  |  10 --
 .../streams/processor/internals/RecordQueue.java   |   2 +-
 .../processor/internals/StandbyContextImpl.java    |  12 --
 .../streams/processor/internals/StandbyTask.java   |   8 +-
 .../streams/processor/internals/StreamTask.java    |   1 -
 .../processor/internals/TimestampSupplier.java     |  21 ---
 .../streams/state/internals/AbstractSegments.java  |  10 +-
 .../internals/RocksDBSegmentedBytesStore.java      |  24 +++-
 .../kafka/streams/state/internals/Segments.java    |   4 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   6 +-
 .../KTableSuppressProcessorMetricsTest.java        |   2 -
 .../suppress/KTableSuppressProcessorTest.java      |  19 ---
 .../internals/AbstractProcessorContextTest.java    |   5 -
 .../internals/GlobalProcessorContextImplTest.java  |   5 -
 .../state/internals/KeyValueSegmentsTest.java      |  72 +++++-----
 .../internals/RocksDBSegmentedBytesStoreTest.java  |  20 ++-
 .../state/internals/RocksDBWindowStoreTest.java    |   5 +-
 .../kafka/streams/tests/SmokeTestClient.java       | 147 +++++++++------------
 .../kafka/streams/tests/SmokeTestDriver.java       |  58 +++++++-
 .../kafka/test/InternalMockProcessorContext.java   |  14 +-
 .../kafka/test/MockInternalProcessorContext.java   |  10 --
 .../apache/kafka/test/NoOpProcessorContext.java    |   5 -
 27 files changed, 224 insertions(+), 264 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 09e1cb4..60f1b6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -81,6 +82,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
         private Sensor lateRecordDropSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -107,7 +109,8 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                 return;
             }
 
-            final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
+            observedStreamTime = Math.max(observedStreamTime, context().timestamp());
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
 
             final long timestamp = context().timestamp();
             final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 0edbe4e..e5f290e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -75,6 +76,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
         private Sensor lateRecordDropSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -103,7 +105,8 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
 
             // first get the matching windows
             final long timestamp = context().timestamp();
-            final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
 
             final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 813c558..691e09e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
@@ -51,6 +52,8 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     private Serde<K> keySerde;
     private FullChangeSerde<V> valueSerde;
 
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
     public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
                                    final String storeName,
                                    final Serde<K> keySerde,
@@ -80,6 +83,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
 
     @Override
     public void process(final K key, final Change<V> value) {
+        observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
         buffer(key, value);
         enforceConstraints();
     }
@@ -95,7 +99,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     }
 
     private void enforceConstraints() {
-        final long streamTime = internalProcessorContext.streamTime();
+        final long streamTime = observedStreamTime;
         final long expiryTime = streamTime - suppressDurationMillis;
 
         buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
@@ -142,4 +146,4 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     @Override
     public void close() {
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index ccf0337..2f58836 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -107,12 +107,4 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
     }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public long streamTime() {
-        throw new UnsupportedOperationException("Stream-time is not defined for global tasks.");
-    }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 98511fd..0f67dff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -63,6 +63,4 @@ public interface InternalProcessorContext extends ProcessorContext {
      * Mark this context as being uninitialized
      */
     void uninitialize();
-
-    long streamTime();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 4bf66e0..af3e7d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -46,7 +46,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     private final StreamTask task;
     private final RecordCollector collector;
-    private TimestampSupplier streamTimeSupplier;
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
@@ -205,15 +204,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
     }
 
-    void setStreamTimeSupplier(final TimestampSupplier streamTimeSupplier) {
-        this.streamTimeSupplier = streamTimeSupplier;
-    }
-
-    @Override
-    public long streamTime() {
-        return streamTimeSupplier.get();
-    }
-
     private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends WrappedStateStore<T> {
         static final String ERROR_MESSAGE = "Global store is read only";
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index d06d7f3..572e629 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -36,7 +36,7 @@ import java.util.ArrayDeque;
  */
 public class RecordQueue {
 
-    static final long UNKNOWN = -1L;
+    static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP;
 
     private final Logger log;
     private final SourceNode source;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 6b835d9..ee69373 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -75,8 +75,6 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         }
     };
 
-    private long streamTime = RecordQueue.UNKNOWN;
-
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
                        final ProcessorStateManager stateMgr,
@@ -231,14 +229,4 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     public ProcessorNode currentNode() {
         throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
     }
-
-    void updateStreamTime(final long streamTime) {
-        this.streamTime = Math.max(this.streamTime, streamTime);
-    }
-
-    @Override
-    public long streamTime() {
-        return streamTime;
-    }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 45f06b2..bd8ba34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -37,7 +37,6 @@ import java.util.Map;
 public class StandbyTask extends AbstractTask {
 
     private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
-    private final StandbyContextImpl standbyContext;
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
@@ -60,7 +59,7 @@ public class StandbyTask extends AbstractTask {
                 final StateDirectory stateDirectory) {
         super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
 
-        processorContext = standbyContext = new StandbyContextImpl(id, config, stateMgr, metrics);
+        processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
     }
 
     @Override
@@ -120,7 +119,7 @@ public class StandbyTask extends AbstractTask {
 
     private void flushAndCheckpointState() {
         stateMgr.flush();
-        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
+        stateMgr.checkpoint(Collections.emptyMap());
     }
 
     /**
@@ -177,9 +176,6 @@ public class StandbyTask extends AbstractTask {
             if (record.offset() < limit) {
                 restoreRecords.add(record);
                 lastOffset = record.offset();
-                // ideally, we'd use the stream time at the time of the change logging, but we'll settle for
-                // record timestamp for now.
-                standbyContext.updateStreamTime(record.timestamp());
             } else {
                 remainingRecords.add(record);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 247a156..c4fecef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -236,7 +236,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
         recordInfo = new PartitionGroup.RecordInfo();
         partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl));
-        processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java
deleted file mode 100644
index a6a7a42..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-public interface TimestampSupplier {
-    long get();
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index 4d60b08..82713ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -72,8 +72,10 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
     }
 
     @Override
-    public S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
-        final long minLiveTimestamp = context.streamTime() - retentionPeriod;
+    public S getOrCreateSegmentIfLive(final long segmentId,
+                                      final InternalProcessorContext context,
+                                      final long streamTime) {
+        final long minLiveTimestamp = streamTime - retentionPeriod;
         final long minLiveSegment = segmentId(minLiveTimestamp);
 
         final S toReturn;
@@ -89,7 +91,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
     }
 
     @Override
-    public void openExisting(final InternalProcessorContext context) {
+    public void openExisting(final InternalProcessorContext context, final long streamTime) {
         try {
             final File dir = new File(context.stateDir(), name);
             if (dir.exists()) {
@@ -117,7 +119,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
             // ignore
         }
 
-        final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
+        final long minLiveSegment = segmentId(streamTime - retentionPeriod);
         cleanupEarlierThan(minLiveSegment);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 79e6b95..0ed4e9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
@@ -51,6 +52,7 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     private volatile boolean open;
     private Set<KeyValueSegment> bulkLoadSegments;
     private Sensor expiredRecordSensor;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
     RocksDBSegmentedBytesStore(final String name,
                                final String metricScope,
@@ -107,7 +109,9 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
 
     @Override
     public void remove(final Bytes key) {
-        final KeyValueSegment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        final long timestamp = keySchema.segmentTimestamp(key);
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+        final KeyValueSegment segment = segments.getSegmentForTimestamp(timestamp);
         if (segment == null) {
             return;
         }
@@ -117,8 +121,9 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     @Override
     public void put(final Bytes key, final byte[] value) {
         final long timestamp = keySchema.segmentTimestamp(key);
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
         final long segmentId = segments.segmentId(timestamp);
-        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
         if (segment == null) {
             expiredRecordSensor.record();
             LOG.debug("Skipping record for expired segment.");
@@ -162,7 +167,7 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
             "expired-window-record-drop"
         );
 
-        segments.openExisting(this.context);
+        segments.openExisting(this.context, observedStreamTime);
 
         bulkLoadSegments = new HashSet<>(segments.allSegments());
 
@@ -214,10 +219,17 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
 
     // Visible for testing
     Map<KeyValueSegment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
+        // advance stream time to the max timestamp in the batch
+        for (final KeyValue<byte[], byte[]> record : records) {
+            final long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key));
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+        }
+
         final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
         for (final KeyValue<byte[], byte[]> record : records) {
-            final long segmentId = segments.segmentId(keySchema.segmentTimestamp(Bytes.wrap(record.key)));
-            final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+            final long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key));
+            final long segmentId = segments.segmentId(timestamp);
+            final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
             if (segment != null) {
                 // This handles the case that state store is moved to a new client and does not
                 // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
@@ -246,7 +258,7 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     }
 
     private void toggleForBulkLoading(final boolean prepareForBulkload) {
-        for (final KeyValueSegment segment: segments.allSegments()) {
+        for (final KeyValueSegment segment : segments.allSegments()) {
             segment.toggleDbForBulkLoading(prepareForBulkload);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 71d0c46..8e2c40f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -28,11 +28,11 @@ interface Segments<S extends Segment> {
 
     S getSegmentForTimestamp(final long timestamp);
 
-    S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context);
+    S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context, final long streamTime);
 
     S getOrCreateSegment(final long segmentId, final InternalProcessorContext context);
 
-    void openExisting(final InternalProcessorContext context);
+    void openExisting(final InternalProcessorContext context, final long streamTime);
 
     List<S> segments(final long timeFrom, final long timeTo);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 966310c..4038bfd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -342,7 +342,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         initStore(false);
         processor.init(context);
-        context.setStreamTime(20);
+
+        // dummy record to advance stream time
+        context.setRecordContext(new ProcessorRecordContext(20, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
         processor.process("A", "1");
         context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 986dc6f..228cfc8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -155,7 +155,6 @@ public class KTableSuppressProcessorMetricsTest {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setStreamTime(timestamp);
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "longKey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -174,7 +173,6 @@ public class KTableSuppressProcessorMetricsTest {
             verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0));
         }
 
-        context.setStreamTime(timestamp + 1);
         context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
         processor.process("key", value);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 0f2b36b..354fdd5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -104,7 +104,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
@@ -124,7 +123,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
@@ -144,14 +142,12 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 0L;
         context.setRecordMetadata("topic", 0, 0, null, timestamp);
-        context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, 1L);
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         context.setRecordMetadata("topic", 0, 1, null, 1L);
-        context.setStreamTime(1L);
         processor.process("tick", new Change<>(null, null));
 
         assertThat(context.forwarded(), hasSize(1));
@@ -171,7 +167,6 @@ public class KTableSuppressProcessorTest {
         final long recordTime = 99L;
         final long windowEnd = 100L;
         context.setRecordMetadata("topic", 0, 0, null, recordTime);
-        context.setStreamTime(recordTime);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
@@ -183,7 +178,6 @@ public class KTableSuppressProcessorTest {
         final long recordTime2 = 100L;
         final long windowEnd2 = 101L;
         context.setRecordMetadata("topic", 0, 1, null, recordTime2);
-        context.setStreamTime(recordTime2);
         processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
         assertThat(context.forwarded(), hasSize(0));
 
@@ -192,7 +186,6 @@ public class KTableSuppressProcessorTest {
         final long recordTime3 = 101L;
         final long windowEnd3 = 102L;
         context.setRecordMetadata("topic", 0, 1, null, recordTime3);
-        context.setStreamTime(recordTime3);
         processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -219,14 +212,12 @@ public class KTableSuppressProcessorTest {
         final long streamTime = 99L;
         final long windowEnd = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(streamTime);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         context.setRecordMetadata("", 0, 1L, null, windowEnd);
-        context.setStreamTime(windowEnd);
         processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -244,7 +235,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
@@ -268,7 +258,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -290,7 +279,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -311,7 +299,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -336,7 +323,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -361,7 +347,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -380,7 +365,6 @@ public class KTableSuppressProcessorTest {
         final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
-        context.setStreamTime(timestamp);
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -403,7 +387,6 @@ public class KTableSuppressProcessorTest {
         final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
-        context.setStreamTime(timestamp);
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -426,7 +409,6 @@ public class KTableSuppressProcessorTest {
         final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
-        context.setStreamTime(timestamp);
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
@@ -450,7 +432,6 @@ public class KTableSuppressProcessorTest {
         final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
-        context.setStreamTime(timestamp);
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index ee79e40..8afd302 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -227,10 +227,5 @@ public class AbstractProcessorContextTest {
 
         @Override
         public void commit() {}
-
-        @Override
-        public long streamTime() {
-            throw new RuntimeException("not implemented");
-        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 21b6623..d849adc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -129,9 +129,4 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowToSchedulePunctuations() {
         globalContext.schedule(null, null, null);
     }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void shouldNotAllowToGetStreamTime() {
-        globalContext.streamTime();
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index 7c85134..59cec67 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -94,9 +94,9 @@ public class KeyValueSegmentsTest {
 
     @Test
     public void shouldCreateSegments() {
-        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context);
-        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context);
-        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context);
+        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
         assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory());
@@ -107,17 +107,16 @@ public class KeyValueSegmentsTest {
 
     @Test
     public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context));
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
         assertFalse(new File(context.stateDir(), "test/test.0").exists());
     }
 
     @Test
     public void shouldCleanupSegmentsThatHaveExpired() {
-        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context);
-        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context);
-        context.setStreamTime(SEGMENT_INTERVAL * 7);
-        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(7, context);
+        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
         assertFalse(segment1.isOpen());
         assertFalse(segment2.isOpen());
         assertTrue(segment3.isOpen());
@@ -128,22 +127,22 @@ public class KeyValueSegmentsTest {
 
     @Test
     public void shouldGetSegmentForTimestamp() {
-        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context);
-        segments.getOrCreateSegmentIfLive(1, context);
+        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
         assertEquals(segment, segments.getSegmentForTimestamp(0L));
     }
 
     @Test
     public void shouldGetCorrectSegmentString() {
-        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context);
+        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
         assertEquals("KeyValueSegment(id=0, name=test.0)", segment.toString());
     }
 
     @Test
     public void shouldCloseAllOpenSegments() {
-        final KeyValueSegment first = segments.getOrCreateSegmentIfLive(0, context);
-        final KeyValueSegment second = segments.getOrCreateSegmentIfLive(1, context);
-        final KeyValueSegment third = segments.getOrCreateSegmentIfLive(2, context);
+        final KeyValueSegment first = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final KeyValueSegment second = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final KeyValueSegment third = segments.getOrCreateSegmentIfLive(2, context, -1L);
         segments.close();
 
         assertFalse(first.isOpen());
@@ -154,16 +153,16 @@ public class KeyValueSegmentsTest {
     @Test
     public void shouldOpenExistingSegments() {
         segments = new KeyValueSegments("test", 4, 1);
-        segments.getOrCreateSegmentIfLive(0, context);
-        segments.getOrCreateSegmentIfLive(1, context);
-        segments.getOrCreateSegmentIfLive(2, context);
-        segments.getOrCreateSegmentIfLive(3, context);
-        segments.getOrCreateSegmentIfLive(4, context);
+        segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
+        segments.getOrCreateSegmentIfLive(2, context, -1L);
+        segments.getOrCreateSegmentIfLive(3, context, -1L);
+        segments.getOrCreateSegmentIfLive(4, context, -1L);
         // close existing.
         segments.close();
 
         segments = new KeyValueSegments("test", 4, 1);
-        segments.openExisting(context);
+        segments.openExisting(context, -1L);
 
         assertTrue(segments.getSegmentForTimestamp(0).isOpen());
         assertTrue(segments.getSegmentForTimestamp(1).isOpen());
@@ -178,12 +177,12 @@ public class KeyValueSegmentsTest {
         updateStreamTimeAndCreateSegment(1);
         updateStreamTimeAndCreateSegment(2);
         updateStreamTimeAndCreateSegment(3);
-        updateStreamTimeAndCreateSegment(4);
-        segments.getOrCreateSegmentIfLive(0, context);
-        segments.getOrCreateSegmentIfLive(1, context);
-        segments.getOrCreateSegmentIfLive(2, context);
-        segments.getOrCreateSegmentIfLive(3, context);
-        segments.getOrCreateSegmentIfLive(4, context);
+        final long streamTime = updateStreamTimeAndCreateSegment(4);
+        segments.getOrCreateSegmentIfLive(0, context, streamTime);
+        segments.getOrCreateSegmentIfLive(1, context, streamTime);
+        segments.getOrCreateSegmentIfLive(2, context, streamTime);
+        segments.getOrCreateSegmentIfLive(3, context, streamTime);
+        segments.getOrCreateSegmentIfLive(4, context, streamTime);
 
         final List<KeyValueSegment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
         assertEquals(3, segments.size());
@@ -235,17 +234,18 @@ public class KeyValueSegmentsTest {
         verifyCorrectSegments(0, 3);
         updateStreamTimeAndCreateSegment(3);
         verifyCorrectSegments(0, 4);
-        updateStreamTimeAndCreateSegment(4);
+        final long streamTime = updateStreamTimeAndCreateSegment(4);
         verifyCorrectSegments(0, 5);
-        segments.getOrCreateSegmentIfLive(5, context);
+        segments.getOrCreateSegmentIfLive(5, context, streamTime);
         verifyCorrectSegments(0, 6);
-        segments.getOrCreateSegmentIfLive(6, context);
+        segments.getOrCreateSegmentIfLive(6, context, streamTime);
         verifyCorrectSegments(0, 7);
     }
 
-    private void updateStreamTimeAndCreateSegment(final int segment) {
-        context.setStreamTime(SEGMENT_INTERVAL * segment);
-        segments.getOrCreateSegmentIfLive(segment, context);
+    private long updateStreamTimeAndCreateSegment(final int segment) {
+        final long streamTime = SEGMENT_INTERVAL * segment;
+        segments.getOrCreateSegmentIfLive(segment, context, streamTime);
+        return streamTime;
     }
 
     @Test
@@ -268,7 +268,7 @@ public class KeyValueSegmentsTest {
             oldSegment.createNewFile();
         }
 
-        segments.openExisting(context);
+        segments.openExisting(context, -1L);
 
         for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
             final String segmentName = storeName + "." + (long) segmentId * segmentInterval;
@@ -290,7 +290,7 @@ public class KeyValueSegmentsTest {
             oldSegment.createNewFile();
         }
 
-        segments.openExisting(context);
+        segments.openExisting(context, -1L);
 
         for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
             final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
@@ -300,7 +300,7 @@ public class KeyValueSegmentsTest {
 
     @Test
     public void shouldClearSegmentsOnClose() {
-        segments.getOrCreateSegmentIfLive(0, context);
+        segments.getOrCreateSegmentIfLive(0, context, -1L);
         segments.close();
         assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 8097d74..4c7755f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -81,6 +81,7 @@ public class RocksDBSegmentedBytesStoreTest {
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
     private final Window[] windows = new Window[4];
+    private Window nextSegmentWindow;
 
     @Parameter
     public SegmentedBytesStore.KeySchema schema;
@@ -98,12 +99,22 @@ public class RocksDBSegmentedBytesStoreTest {
             windows[1] = new SessionWindow(500L, 1000L);
             windows[2] = new SessionWindow(1_000L, 1_500L);
             windows[3] = new SessionWindow(30_000L, 60_000L);
+            // All four of the previous windows will go into segment 1.
+            // The nextSegmentWindow is computed to be a high enough time that when it gets written
+            // to the segment store, it will advance stream time past the first segment's retention time and
+            // expire it.
+            nextSegmentWindow = new SessionWindow(segmentInterval + retention, segmentInterval + retention);
         }
         if (schema instanceof WindowKeySchema) {
             windows[0] = timeWindowForSize(10L, windowSizeForTimeWindow);
             windows[1] = timeWindowForSize(500L, windowSizeForTimeWindow);
             windows[2] = timeWindowForSize(1_000L, windowSizeForTimeWindow);
             windows[3] = timeWindowForSize(60_000L, windowSizeForTimeWindow);
+            // All four of the previous windows will go into segment 1.
+            // The nextSegmentWindow is computed to be a high enough time that when it gets written
+            // to the segment store, it will advance stream time past the first segment's retention time and
+            // expire it.
+            nextSegmentWindow = timeWindowForSize(segmentInterval + retention, windowSizeForTimeWindow);
         }
 
 
@@ -415,8 +426,13 @@ public class RocksDBSegmentedBytesStoreTest {
         LogCaptureAppender.setClassLoggerToDebug(RocksDBSegmentedBytesStore.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
 
-        context.setStreamTime(Math.max(retention, segmentInterval) * 2);
-        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(5));
+        // write a record to advance stream time, with a high enough timestamp
+        // that the subsequent record in windows[0] will already be expired.
+        bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0));
+
+        final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
+        final byte[] value = serializeValue(5);
+        bytesStore.put(key, value);
 
         LogCaptureAppender.unregister(appender);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 2666f5f..32fa0f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -168,7 +168,6 @@ public class RocksDBWindowStoreTest {
 
     private void setCurrentTime(final long currentTime) {
         context.setRecordContext(createRecordContext(currentTime));
-        context.setStreamTime(currentTime);
     }
 
     private ProcessorRecordContext createRecordContext(final long time) {
@@ -1255,9 +1254,11 @@ public class RocksDBWindowStoreTest {
         new File(storeDir, segments.segmentName(6L)).mkdir();
         windowStore.close();
 
-        context.setStreamTime(segmentInterval * 6L);
         windowStore = createWindowStore(context, false);
 
+        // put something in the store to advance its stream time and expire the old segments
+        windowStore.put(1, "v", 6L * segmentInterval);
+
         final List<String> expected = asList(
             segments.segmentName(4L),
             segments.segmentName(5L),
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index a396ad1..0fae49c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,30 +16,28 @@
  */
 package org.apache.kafka.streams.tests;
 
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 
+import java.time.Duration;
 import java.util.Properties;
 
 public class SmokeTestClient extends SmokeTestUtil {
@@ -57,21 +55,13 @@ public class SmokeTestClient extends SmokeTestUtil {
 
     public void start() {
         streams = createKafkaStreams(streamsProperties);
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
-                uncaughtException = true;
-                e.printStackTrace();
-            }
+        streams.setUncaughtExceptionHandler((t, e) -> {
+            System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
+            uncaughtException = true;
+            e.printStackTrace();
         });
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                close();
-            }
-        }));
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> close()));
 
         thread = new Thread() {
             public void run() {
@@ -115,84 +105,75 @@ public class SmokeTestClient extends SmokeTestUtil {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
         final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
-        source.to("echo", Produced.with(stringSerde, intSerde));
-        final KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return value == null || value != END;
-            }
-        });
+        source.filterNot((k, v) -> k.equals("flush"))
+              .to("echo", Produced.with(stringSerde, intSerde));
+        final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
         data.process(SmokeTestUtil.printProcessorSupplier("data"));
 
         // min
         final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
 
-        groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(1)))
+        final KTable<Windowed<String>, Integer> minAggregation = groupedData
+            .windowedBy(TimeWindows.of(Duration.ofDays(1)).grace(Duration.ofMinutes(1)))
             .aggregate(
-                new Initializer<Integer>() {
-                    public Integer apply() {
-                        return Integer.MAX_VALUE;
-                    }
-                },
-                new Aggregator<String, Integer, Integer>() {
-                    @Override
-                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
-                        return (value < aggregate) ? value : aggregate;
-                    }
-                },
-                Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min").withValueSerde(intSerde))
-            .toStream(new Unwindow<String, Integer>())
+                () -> Integer.MAX_VALUE,
+                (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
+                Materialized
+                    .<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min")
+                    .withValueSerde(intSerde)
+                    .withRetention(Duration.ofHours(25))
+            );
+
+        minAggregation
+            .toStream()
+            .filterNot((k, v) -> k.key().equals("flush"))
+            .map((key, value) -> new KeyValue<>(key.toString(), value))
+            .to("min-raw", Produced.with(stringSerde, intSerde));
+
+        minAggregation
+            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
+            .toStream()
+            .filterNot((k, v) -> k.key().equals("flush"))
+            .map((key, value) -> new KeyValue<>(key.toString(), value))
+            .to("min-suppressed", Produced.with(stringSerde, intSerde));
+
+        minAggregation
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
             .to("min", Produced.with(stringSerde, intSerde));
 
         final KTable<String, Integer> minTable = builder.table(
             "min",
             Consumed.with(stringSerde, intSerde),
-            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("minStoreName"));
+            Materialized.as("minStoreName"));
         minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
 
         // max
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
             .aggregate(
-                new Initializer<Integer>() {
-                    public Integer apply() {
-                        return Integer.MIN_VALUE;
-                    }
-                },
-                new Aggregator<String, Integer, Integer>() {
-                    @Override
-                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
-                        return (value > aggregate) ? value : aggregate;
-                    }
-                },
+                () -> Integer.MIN_VALUE,
+                (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
                 Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
-            .toStream(new Unwindow<String, Integer>())
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
             .to("max", Produced.with(stringSerde, intSerde));
 
         final KTable<String, Integer> maxTable = builder.table(
             "max",
             Consumed.with(stringSerde, intSerde),
-            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("maxStoreName"));
+            Materialized.as("maxStoreName"));
         maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
 
         // sum
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
             .aggregate(
-                new Initializer<Long>() {
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<String, Integer, Long>() {
-                    @Override
-                    public Long apply(final String aggKey, final Integer value, final Long aggregate) {
-                        return (long) value + aggregate;
-                    }
-                },
+                () -> 0L,
+                (aggKey, value, aggregate) -> (long) value + aggregate,
                 Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
-            .toStream(new Unwindow<String, Long>())
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
             .to("sum", Produced.with(stringSerde, longSerde));
 
         final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
@@ -202,38 +183,33 @@ public class SmokeTestClient extends SmokeTestUtil {
         // cnt
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
-            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("uwin-cnt"))
-            .toStream(new Unwindow<String, Long>())
+            .count(Materialized.as("uwin-cnt"))
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
             .to("cnt", Produced.with(stringSerde, longSerde));
 
         final KTable<String, Long> cntTable = builder.table(
             "cnt",
             Consumed.with(stringSerde, longSerde),
-            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cntStoreName"));
+            Materialized.as("cntStoreName"));
         cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
 
         // dif
         maxTable
             .join(
                 minTable,
-                new ValueJoiner<Integer, Integer, Integer>() {
-                    public Integer apply(final Integer value1, final Integer value2) {
-                        return value1 - value2;
-                    }
-                })
+                (value1, value2) -> value1 - value2)
             .toStream()
+            .filterNot((k, v) -> k.equals("flush"))
             .to("dif", Produced.with(stringSerde, intSerde));
 
         // avg
         sumTable
             .join(
                 cntTable,
-                new ValueJoiner<Long, Long, Double>() {
-                    public Double apply(final Long value1, final Long value2) {
-                        return (double) value1 / (double) value2;
-                    }
-                })
+                (value1, value2) -> (double) value1 / (double) value2)
             .toStream()
+            .filterNot((k, v) -> k.equals("flush"))
             .to("avg", Produced.with(stringSerde, doubleSerde));
 
         // test repartition
@@ -247,12 +223,9 @@ public class SmokeTestClient extends SmokeTestUtil {
             .to("tagg", Produced.with(stringSerde, longSerde));
 
         final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(props));
-        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-                streamsClient.close(Duration.ofSeconds(30));
-            }
+        streamsClient.setUncaughtExceptionHandler((t, e) -> {
+            System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            streamsClient.close(Duration.ofSeconds(30));
         });
 
         return streamsClient;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 078cbe4..2762b49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -212,6 +213,19 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
         }
 
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor("data");
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            ));
+        }
+
         producer.close();
         return Collections.unmodifiableMap(allData);
     }
@@ -260,7 +274,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 
         final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
+        final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "min-suppressed", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
@@ -269,6 +283,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final HashMap<String, Integer> max = new HashMap<>();
         final HashMap<String, Integer> min = new HashMap<>();
+        final HashMap<String, List<Integer>> minSuppressed = new HashMap<>();
         final HashMap<String, Integer> dif = new HashMap<>();
         final HashMap<String, Long> sum = new HashMap<>();
         final HashMap<String, Long> cnt = new HashMap<>();
@@ -288,6 +303,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
             final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
+                    && verifyMinSuppressed(minSuppressed, allData, false)
                     && verifyMax(max, allData, false)
                     && verifyDif(dif, allData, false)
                     && verifySum(sum, allData, false)
@@ -314,6 +330,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         case "min":
                             min.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
+                        case "min-suppressed":
+                            minSuppressed.computeIfAbsent(key, k -> new LinkedList<>())
+                                         .add(intSerde.deserializer().deserialize("", record.value()));
+                            break;
                         case "max":
                             max.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
@@ -370,6 +390,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
 
         success &= verifyMin(min, allData, true);
+        success &= verifyMinSuppressed(minSuppressed, allData, true);
         success &= verifyMax(max, allData, true);
         success &= verifyDif(dif, allData, true);
         success &= verifySum(sum, allData, true);
@@ -410,6 +431,41 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return true;
     }
 
+    private static boolean verifyMinSuppressed(final Map<String, List<Integer>> map,
+                                               final Map<String, Set<Integer>> allData,
+                                               final boolean print) {
+        if (map.isEmpty()) {
+            maybePrint(print, "min-suppressed is empty");
+            return false;
+        } else {
+            maybePrint(print, "verifying min-suppressed");
+
+            if (map.size() != allData.size()) {
+                maybePrint(print, "fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                return false;
+            }
+            for (final Map.Entry<String, List<Integer>> entry : map.entrySet()) {
+                final String key = entry.getKey();
+                final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+                final int expected = getMin(unwindowedKey);
+                if (entry.getValue().size() != 1) {
+                    maybePrint(print, "fail: key=" + entry.getKey() + " non-unique value: " + entry.getValue());
+                    return false;
+                } else if (expected != entry.getValue().get(0)) {
+                    maybePrint(print, "fail: key=" + entry.getKey() + " min=" + entry.getValue().get(0) + " expected=" + expected);
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private static void maybePrint(final boolean print, final String s) {
+        if (print) {
+            System.out.println(s);
+        }
+    }
+
     private static boolean verifyMax(final Map<String, Integer> map, final Map<String, Set<Integer>> allData, final boolean print) {
         if (map.isEmpty()) {
             if (print) {
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 3b332b5..c9255ce 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.test;
 
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -33,10 +32,10 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
-import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.ToInternal;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -44,6 +43,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,7 +64,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     private Serde<?> keySerde;
     private Serde<?> valSerde;
     private long timestamp = -1L;
-    private long streamTime = -1L;
 
     public InternalMockProcessorContext() {
         this(null,
@@ -175,15 +174,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     @Override
     public void initialize() {}
 
-    public void setStreamTime(final long currentTime) {
-        streamTime = currentTime;
-    }
-
-    @Override
-    public long streamTime() {
-        return streamTime;
-    }
-
     @Override
     public File stateDir() {
         if (stateDir == null) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 14f8561..62a8491 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
     private ProcessorNode currentNode;
-    private long streamTime;
 
     @Override
     public StreamsMetricsImpl metrics() {
@@ -72,13 +71,4 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     public void uninitialize() {
 
     }
-
-    @Override
-    public long streamTime() {
-        return streamTime;
-    }
-
-    public void setStreamTime(final long streamTime) {
-        this.streamTime = streamTime;
-    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 2282084..c7c8343 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -98,11 +98,6 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
-    public long streamTime() {
-        throw new RuntimeException("streamTime is not implemented for NoOpProcessorContext");
-    }
-
-    @Override
     public void register(final StateStore store,
                          final StateRestoreCallback stateRestoreCallback) {
         // no-op


Mime
View raw message