kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7223: Suppress API with only immediate emit (#5567)
Date Mon, 24 Sep 2018 20:27:48 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 057c530  KAFKA-7223: Suppress API with only immediate emit (#5567)
057c530 is described below

commit 057c5307e0f055b37794453d16f6f43b8c56528c
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Mon Sep 24 15:27:39 2018 -0500

    KAFKA-7223: Suppress API with only immediate emit (#5567)
    
    Part 1 of the suppression API.
    
    * add the DSL suppress method and config objects
    * add the processor, but only in "identity" mode (i.e., it will forward only if the suppression spec says to forward immediately)
    * add tests
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/kstream/KTable.java   |  10 +
 .../apache/kafka/streams/kstream/Suppressed.java   | 160 ++++++++++++
 .../internals/KStreamSessionWindowAggregate.java   |  16 +-
 .../kstream/internals/KStreamWindowAggregate.java  |  12 +-
 .../streams/kstream/internals/KTableImpl.java      |  74 +++++-
 .../internals/graph/GraphGraceSearchUtil.java      |  89 +++++++
 .../internals/suppress/BufferConfigImpl.java       |  54 ++++
 .../internals/suppress/BufferFullStrategy.java     |  23 ++
 .../internals/suppress/EagerBufferConfigImpl.java  |  76 ++++++
 .../suppress/FinalResultsSuppressionBuilder.java   |  58 +++++
 .../suppress/KTableSuppressProcessor.java          |  66 +++++
 .../internals/suppress/StrictBufferConfigImpl.java |  91 +++++++
 .../kstream/internals/suppress/SuppressedImpl.java |  76 ++++++
 .../apache/kafka/streams/KeyValueTimestamp.java    |  46 ++++
 .../integration/SuppressionIntegrationTest.java    | 280 +++++++++++++++++++++
 .../kafka/streams/kstream/SuppressedTest.java      | 112 +++++++++
 .../kstream/internals/SuppressScenarioTest.java    | 183 ++++++++++++++
 .../internals/graph/GraphGraceSearchUtilTest.java  | 241 ++++++++++++++++++
 .../suppress/KTableSuppressProcessorTest.java      | 204 +++++++++++++++
 .../kafka/test/MockInternalProcessorContext.java   |  84 +++++++
 .../streams/processor/MockProcessorContext.java    |   9 +-
 21 files changed, 1944 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index bdd6dc3..293bc6b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -390,6 +390,16 @@ public interface KTable<K, V> {
     <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 
     /**
+     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
+     *
+     * This controls what updates downstream table and stream operations will receive.
+     *
+     * @param suppressed Configuration object determining what, if any, updates to suppress
+     * @return A new KTable with the desired suppression characteristics.
+     */
+    KTable<K, V> suppress(final Suppressed<K> suppressed);
+
+    /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
      * (with possibly a new type), with default serializers, deserializers, and state store.
      * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
new file mode 100644
index 0000000..7488ef6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
+import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+
+import java.time.Duration;
+
+public interface Suppressed<K> {
+
+    /**
+     * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
+     * enforce the time bound and never emit early.
+     */
+    interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {
+
+    }
+
+    interface BufferConfig<BC extends BufferConfig<BC>> {
+        /**
+         * Create a size-constrained buffer in terms of the maximum number of keys it will store.
+         */
+        static BufferConfig<?> maxRecords(final long recordLimit) {
+            return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
+        }
+
+        /**
+         * Set a size constraint on the buffer in terms of the maximum number of keys it will store.
+         */
+        BC withMaxRecords(final long recordLimit);
+
+        /**
+         * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
+         */
+        static BufferConfig<?> maxBytes(final long byteLimit) {
+            return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
+        }
+
+        /**
+         * Set a size constraint on the buffer in terms of the maximum number of bytes it will use.
+         */
+        BC withMaxBytes(final long byteLimit);
+
+        /**
+         * Create a buffer unconstrained by size (either keys or bytes).
+         *
+         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+         *
+         * If there isn't enough heap available to meet the demand, the application will encounter an
+         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
+         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+         *
+         * This is a convenient option if you doubt that your buffer will be that large, but also don't
+         * wish to pick particular constraints, such as in testing.
+         *
+         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+         * It will never emit early.
+         */
+        static StrictBufferConfig unbounded() {
+            return new StrictBufferConfigImpl();
+        }
+
+        /**
+         * Set the buffer to be unconstrained by size (either keys or bytes).
+         *
+         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+         *
+         * If there isn't enough heap available to meet the demand, the application will encounter an
+         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
+         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+         *
+         * This is a convenient option if you doubt that your buffer will be that large, but also don't
+         * wish to pick particular constraints, such as in testing.
+         *
+         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+         * It will never emit early.
+         */
+        StrictBufferConfig withNoBound();
+
+        /**
+         * Set the buffer to gracefully shut down the application when any of its constraints are violated.
+         *
+         * This buffer is "strict" in the sense that it will enforce the time bound or shut down.
+         * It will never emit early.
+         */
+        StrictBufferConfig shutDownWhenFull();
+
+        /**
+         * Sets the buffer to use on-disk storage if it requires more memory than the constraints allow.
+         *
+         * This buffer is "strict" in the sense that it will never emit early.
+         */
+        StrictBufferConfig spillToDiskWhenFull();
+
+        /**
+         * Set the buffer to just emit the oldest records when any of its constraints are violated.
+         *
+         * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
+         * duplicate results downstream, but does not promise to eliminate them.
+         */
+        BufferConfig emitEarlyWhenFull();
+    }
+
+    /**
+     * Configure the suppression to emit only the "final results" from the window.
+     *
+     * By default all Streams operators emit results whenever new results are available.
+     * This includes windowed operations.
+     *
+     * This configuration will instead emit just one result per key for each window, guaranteeing
+     * to deliver only the final result. This option is suitable for use cases in which the business logic
+     * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
+     *
+     * To accomplish this, the operator will buffer events from the window until the window closes (that is,
+     * until the end-time passes, and additionally until the grace period expires). Since windowed operators
+     * are required to reject late events for a window whose grace period is expired, there is an additional
+     * guarantee that the final results emitted from this suppression will match any queryable state upstream.
+     *
+     * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+     *                     This is required to be a "strict" config, since it would violate the "final results"
+     *                     property to emit early and then issue an update later.
+     * @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available
+     *           on Windowed KTables (this is enforced by the type parameter).
+     * @return a "final results" mode suppression configuration
+     */
+    static <K extends Windowed> Suppressed<K> untilWindowCloses(final StrictBufferConfig bufferConfig) {
+        return new FinalResultsSuppressionBuilder<>(bufferConfig);
+    }
+
+    /**
+     * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record
+     * before emitting it further downstream. If another record for the same key arrives in the meantime, it replaces
+     * the first record in the buffer but does <em>not</em> re-start the timer.
+     *
+     * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
+     * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+     * @param <K> The key type for the KTable to apply this suppression to.
+     * @return a suppression configuration
+     */
+    static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
+        return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 5a3c897..b89399b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
 
     private final String storeName;
@@ -49,11 +49,11 @@ class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSup
 
     private boolean sendOldValues = false;
 
-    KStreamSessionWindowAggregate(final SessionWindows windows,
-                                  final String storeName,
-                                  final Initializer<Agg> initializer,
-                                  final Aggregator<? super K, ? super V, Agg> aggregator,
-                                  final Merger<? super K, Agg> sessionMerger) {
+    public KStreamSessionWindowAggregate(final SessionWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator,
+                                         final Merger<? super K, Agg> sessionMerger) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -66,6 +66,10 @@ class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSup
         return new KStreamSessionWindowAggregateProcessor();
     }
 
+    public SessionWindows windows() {
+        return windows;
+    }
+
     @Override
     public void enableSendingOldValues() {
         sendOldValues = true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 5754284..f292515 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -44,10 +44,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
 
     private boolean sendOldValues = false;
 
-    KStreamWindowAggregate(final Windows<W> windows,
-                           final String storeName,
-                           final Initializer<Agg> initializer,
-                           final Aggregator<? super K, ? super V, Agg> aggregator) {
+    public KStreamWindowAggregate(final Windows<W> windows,
+                                  final String storeName,
+                                  final Initializer<Agg> initializer,
+                                  final Aggregator<? super K, ? super V, Agg> aggregator) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -59,6 +59,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         return new KStreamWindowAggregateProcessor();
     }
 
+    public Windows<W> windows() {
+        return windows;
+    }
+
     @Override
     public void enableSendingOldValues() {
         sendOldValues = true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 352e42d..2330fad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -26,21 +26,30 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
+import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
+import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.time.Duration;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 
+import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace;
+
 /**
  * The implementation class of {@link KTable}.
  *
@@ -66,6 +75,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String SELECT_NAME = "KTABLE-SELECT-";
 
+    private static final String SUPPRESS_NAME = "KTABLE-SUPPRESS-";
+
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
     private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
@@ -350,6 +361,53 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
+    public KTable<K, V> suppress(final Suppressed<K> suppressed) {
+        final String name = builder.newProcessorName(SUPPRESS_NAME);
+
+        final ProcessorSupplier<K, Change<V>> suppressionSupplier =
+            () -> new KTableSuppressProcessor<>(buildSuppress(suppressed));
+
+        final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(
+            suppressionSupplier,
+            name
+        );
+
+        final ProcessorGraphNode<K, Change<V>> node = new ProcessorGraphNode<>(name, processorParameters, false);
+
+        builder.addGraphNode(streamsGraphNode, node);
+
+        return new KTableImpl<K, S, V>(
+            builder,
+            name,
+            suppressionSupplier,
+            keySerde,
+            valSerde,
+            Collections.singleton(this.name),
+            null,
+            false,
+            node
+        );
+    }
+
+    @SuppressWarnings("unchecked")
+    private SuppressedImpl<K> buildSuppress(final Suppressed<K> suppress) {
+        if (suppress instanceof FinalResultsSuppressionBuilder) {
+            final long grace = findAndVerifyWindowGrace(streamsGraphNode);
+
+            final FinalResultsSuppressionBuilder<?> builder = (FinalResultsSuppressionBuilder) suppress;
+
+            final SuppressedImpl<? extends Windowed> finalResultsSuppression =
+                builder.buildFinalResultsSuppression(Duration.ofMillis(grace));
+
+            return (SuppressedImpl<K>) finalResultsSuppression;
+        } else if (suppress instanceof SuppressedImpl) {
+            return (SuppressedImpl<K>) suppress;
+        } else {
+            throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed.");
+        }
+    }
+
+    @Override
     public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
         return doJoin(other, joiner, null, false, false);
@@ -492,12 +550,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         final ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName);
 
         kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
-            .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
-            .withJoinThisProcessorParameters(joinThisProcessorParameters)
-            .withJoinThisStoreNames(valueGetterSupplier().storeNames())
-            .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
-            .withOtherJoinSideNodeName(((KTableImpl) other).name)
-            .withThisJoinSideNodeName(name);
+                             .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
+                             .withJoinThisProcessorParameters(joinThisProcessorParameters)
+                             .withJoinThisStoreNames(valueGetterSupplier().storeNames())
+                             .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
+                             .withOtherJoinSideNodeName(((KTableImpl) other).name)
+                             .withThisJoinSideNodeName(name);
 
         final KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build();
         builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
@@ -526,10 +584,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         final String selectName = builder.newProcessorName(SELECT_NAME);
 
         final KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
-        final ProcessorParameters processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
+        final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
 
         // select the aggregate key and values (old and new), it would require parent to send old values
-        final ProcessorGraphNode<K1, V1> groupByMapNode = new ProcessorGraphNode<>(
+        final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(
             selectName,
             processorParameters,
             false
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
new file mode 100644
index 0000000..306ddf5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
+import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public final class GraphGraceSearchUtil {
+    private GraphGraceSearchUtil() {}
+
+    public static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode) {
+        return findAndVerifyWindowGrace(streamsGraphNode, "");
+    }
+
+    private static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode, final String chain) {
+        // error base case: we traversed off the end of the graph without finding a window definition
+        if (streamsGraphNode == null) {
+            throw new TopologyException(
+                "Window close time is only defined for windowed computations. Got [" + chain + "]."
+            );
+        }
+        // base case: return if this node defines a grace period.
+        {
+            final Long gracePeriod = extractGracePeriod(streamsGraphNode);
+            if (gracePeriod != null) {
+                return gracePeriod;
+            }
+        }
+
+        final String newChain = chain.equals("") ? streamsGraphNode.nodeName() : streamsGraphNode.nodeName() + "->" + chain;
+
+        if (streamsGraphNode.parentNodes().isEmpty()) {
+            // error base case: we traversed to the end of the graph without finding a window definition
+            throw new TopologyException(
+                "Window close time is only defined for windowed computations. Got [" + newChain + "]."
+            );
+        }
+
+        // recursive case: all parents must define a grace period, and we use the max of our parents' graces.
+        long inheritedGrace = -1;
+        for (final StreamsGraphNode parentNode : streamsGraphNode.parentNodes()) {
+            final long parentGrace = findAndVerifyWindowGrace(parentNode, newChain);
+            inheritedGrace = Math.max(inheritedGrace, parentGrace);
+        }
+
+        if (inheritedGrace == -1) {
+            throw new IllegalStateException(); // shouldn't happen, and it's not a legal grace period
+        }
+
+        return inheritedGrace;
+    }
+
+    private static Long extractGracePeriod(final StreamsGraphNode node) {
+        if (node instanceof StatefulProcessorNode) {
+            final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier();
+            if (processorSupplier instanceof KStreamWindowAggregate) {
+                final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier;
+                final Windows windows = kStreamWindowAggregate.windows();
+                return windows.gracePeriodMs();
+            } else if (processorSupplier instanceof KStreamSessionWindowAggregate) {
+                final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier;
+                final SessionWindows windows = kStreamSessionWindowAggregate.windows();
+                return windows.gracePeriodMs();
+            } else {
+                return null;
+            }
+        } else {
+            return null;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
new file mode 100644
index 0000000..e731dc6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+
+import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
+
+abstract class BufferConfigImpl<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
+    public abstract long maxKeys();
+
+    public abstract long maxBytes();
+
+    @SuppressWarnings("unused")
+    public abstract BufferFullStrategy bufferFullStrategy();
+
+    @Override
+    public Suppressed.StrictBufferConfig withNoBound() {
+        return new StrictBufferConfigImpl(
+            Long.MAX_VALUE,
+            Long.MAX_VALUE,
+            SHUT_DOWN // doesn't matter, given the bounds
+        );
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig shutDownWhenFull() {
+        return new StrictBufferConfigImpl(maxKeys(), maxBytes(), SHUT_DOWN);
+    }
+
+    @Override
+    public Suppressed.BufferConfig emitEarlyWhenFull() {
+        return new EagerBufferConfigImpl(maxKeys(), maxBytes());
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig spillToDiskWhenFull() {
+        throw new UnsupportedOperationException("not implemented");
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java
new file mode 100644
index 0000000..2da7c14
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+public enum BufferFullStrategy {
+    EMIT,
+    SPILL_TO_DISK,
+    SHUT_DOWN
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
new file mode 100644
index 0000000..0c2c883
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+
+import java.util.Objects;
+
+public class EagerBufferConfigImpl extends BufferConfigImpl {
+
+    private final long maxKeys;
+    private final long maxBytes;
+
+    public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) {
+        this.maxKeys = maxKeys;
+        this.maxBytes = maxBytes;
+    }
+
+    @Override
+    public Suppressed.BufferConfig withMaxRecords(final long recordLimit) {
+        return new EagerBufferConfigImpl(recordLimit, maxBytes);
+    }
+
+    @Override
+    public Suppressed.BufferConfig withMaxBytes(final long byteLimit) {
+        return new EagerBufferConfigImpl(maxKeys, byteLimit);
+    }
+
+    @Override
+    public long maxKeys() {
+        return maxKeys;
+    }
+
+    @Override
+    public long maxBytes() {
+        return maxBytes;
+    }
+
+    @Override
+    public BufferFullStrategy bufferFullStrategy() {
+        return BufferFullStrategy.EMIT;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
+        return maxKeys == that.maxKeys &&
+            maxBytes == that.maxBytes;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(maxKeys, maxBytes);
+    }
+
+    @Override
+    public String toString() {
+        return "EagerBufferConfigImpl{maxKeys=" + maxKeys + ", maxBytes=" + maxBytes + '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
new file mode 100644
index 0000000..548f599
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.time.Duration;
+import java.util.Objects;
+
+public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppressed<K> {
+    private final StrictBufferConfig bufferConfig;
+
+    public FinalResultsSuppressionBuilder(final Suppressed.StrictBufferConfig bufferConfig) {
+        this.bufferConfig = bufferConfig;
+    }
+
+    public SuppressedImpl<K> buildFinalResultsSuppression(final Duration gracePeriod) {
+        return new SuppressedImpl<>(
+            gracePeriod,
+            bufferConfig,
+            (ProcessorContext context, K key) -> key.window().end()
+        );
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final FinalResultsSuppressionBuilder<?> that = (FinalResultsSuppressionBuilder<?>) o;
+        return Objects.equals(bufferConfig, that.bufferConfig);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bufferConfig);
+    }
+
+    @Override
+    public String toString() {
+        return "FinalResultsSuppressionBuilder{bufferConfig=" + bufferConfig + '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
new file mode 100644
index 0000000..f65f2b4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+
+import java.time.Duration;
+
+public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
+    private final SuppressedImpl<K> suppress;
+    private InternalProcessorContext internalProcessorContext;
+
+    public KTableSuppressProcessor(final SuppressedImpl<K> suppress) {
+        this.suppress = suppress;
+    }
+
+    @Override
+    public void init(final ProcessorContext context) {
+        internalProcessorContext = (InternalProcessorContext) context;
+    }
+
+    @Override
+    public void process(final K key, final Change<V> value) { // forwards immediately when no suppression applies; anything else is not implemented yet
+        if (Duration.ZERO.equals(suppress.getTimeToWaitForMoreEvents()) && definedRecordTime(key) <= internalProcessorContext.streamTime()) { // equals(), not ==: Duration is value-based
+            internalProcessorContext.forward(key, value);
+        } else {
+            throw new NotImplementedException();
+        }
+    }
+
+    private long definedRecordTime(final K key) {
+        return suppress.getTimeDefinition().time(internalProcessorContext, key);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public String toString() {
+        return "KTableSuppressProcessor{suppress=" + suppress + '}';
+    }
+
+    static class NotImplementedException extends RuntimeException {
+        NotImplementedException() {
+            super();
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
new file mode 100644
index 0000000..0634a74
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
+
+public class StrictBufferConfigImpl extends BufferConfigImpl<Suppressed.StrictBufferConfig> implements Suppressed.StrictBufferConfig {
+
+    private final long maxKeys;
+    private final long maxBytes;
+    private final BufferFullStrategy bufferFullStrategy;
+
+    public StrictBufferConfigImpl(final long maxKeys,
+                                  final long maxBytes,
+                                  final BufferFullStrategy bufferFullStrategy) {
+        this.maxKeys = maxKeys;
+        this.maxBytes = maxBytes;
+        this.bufferFullStrategy = bufferFullStrategy;
+    }
+
+    public StrictBufferConfigImpl() {
+        this.maxKeys = Long.MAX_VALUE;
+        this.maxBytes = Long.MAX_VALUE;
+        this.bufferFullStrategy = SHUT_DOWN;
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) {
+        return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy);
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
+        return new StrictBufferConfigImpl(maxKeys, byteLimit, bufferFullStrategy);
+    }
+
+    @Override
+    public long maxKeys() {
+        return maxKeys;
+    }
+
+    @Override
+    public long maxBytes() {
+        return maxBytes;
+    }
+
+    @Override
+    public BufferFullStrategy bufferFullStrategy() {
+        return bufferFullStrategy;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
+        return maxKeys == that.maxKeys &&
+            maxBytes == that.maxBytes &&
+            bufferFullStrategy == that.bufferFullStrategy;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(maxKeys, maxBytes, bufferFullStrategy);
+    }
+
+    @Override
+    public String toString() {
+        return "StrictBufferConfigImpl{maxKeys=" + maxKeys +
+            ", maxBytes=" + maxBytes +
+            ", bufferFullStrategy=" + bufferFullStrategy + '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
new file mode 100644
index 0000000..cffc42b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.time.Duration;
+import java.util.Objects;
+
+public class SuppressedImpl<K> implements Suppressed<K> {
+    private static final Duration DEFAULT_SUPPRESSION_TIME = Duration.ofMillis(Long.MAX_VALUE);
+    private static final StrictBufferConfig DEFAULT_BUFFER_CONFIG = BufferConfig.unbounded();
+
+    private final BufferConfig bufferConfig;
+    private final Duration timeToWaitForMoreEvents;
+    private final TimeDefinition<K> timeDefinition;
+
+    public SuppressedImpl(final Duration suppressionTime,
+                          final BufferConfig bufferConfig,
+                          final TimeDefinition<K> timeDefinition) {
+        this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime;
+        this.timeDefinition = timeDefinition == null ? (context, anyKey) -> context.timestamp() : timeDefinition;
+        this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : bufferConfig;
+    }
+
+    interface TimeDefinition<K> {
+        long time(final ProcessorContext context, final K key);
+    }
+
+    TimeDefinition<K> getTimeDefinition() {
+        return timeDefinition;
+    }
+
+    Duration getTimeToWaitForMoreEvents() {
+        return timeToWaitForMoreEvents; // never null: the constructor substitutes DEFAULT_SUPPRESSION_TIME for a null argument
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final SuppressedImpl<?> that = (SuppressedImpl<?>) o;
+        return Objects.equals(bufferConfig, that.bufferConfig) &&
+            Objects.equals(getTimeToWaitForMoreEvents(), that.getTimeToWaitForMoreEvents()) &&
+            Objects.equals(getTimeDefinition(), that.getTimeDefinition());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bufferConfig, getTimeToWaitForMoreEvents(), getTimeDefinition());
+    }
+
+    @Override
+    public String toString() { // renders all three fields as SuppressedImpl{name=value, ...}
+        return "SuppressedImpl{" +
+            "bufferConfig=" + bufferConfig +
+            ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
+            ", timeDefinition=" + timeDefinition +
+            '}';
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java
new file mode 100644
index 0000000..4213112
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+public class KeyValueTimestamp<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+
+    public KeyValueTimestamp(final K key, final V value, final long timestamp) {
+        this.key = key;
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+
+    public K key() {
+        return key;
+    }
+
+    public V value() {
+        return value;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String toString() {
+        return "KeyValueTimestamp{key=" + key + ", value=" + value + ", timestamp=" + timestamp + '}';
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
new file mode 100644
index 0000000..a0e7858
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+
+@Category({IntegrationTest.class})
+public class SuppressionIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
+    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
+    private static final Serde<String> STRING_SERDE = Serdes.String();
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+    private static final int COMMIT_INTERVAL = 100;
+    private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2;
+
+    @Test
+    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException {
+        final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.ROOT) + testId; // Locale.ROOT: same id on every machine (no Turkish dotless i)
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputSuppressed, outputRaw);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = builder
+            .table(
+                input,
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE))
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled());
+
+        valueCounts
+            .suppress(untilTimeLimit(Duration.ZERO, unbounded()))
+            .toStream()
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(4L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
+                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
+                )
+            );
+            verifyOutput( // a zero time limit must let every intermediate update through, so this matches the raw output
+                outputSuppressed,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
+                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
+                )
+            );
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
+    private void cleanStateBeforeTest(final String... topic) throws InterruptedException {
+        CLUSTER.deleteAllTopicsAndWait(30_000L);
+        for (final String s : topic) {
+            CLUSTER.createTopic(s, 1, 1);
+        }
+    }
+
+    private KafkaStreams getCleanStartedStreams(final String appId, final StreamsBuilder builder) {
+        final Properties streamsConfig = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+            mkEntry(StreamsConfig.POLL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL))
+        ));
+        final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
+        driver.cleanUp();
+        driver.start();
+        return driver;
+    }
+
+    private void cleanStateAfterTest(final KafkaStreams driver) throws InterruptedException {
+        driver.cleanUp();
+        CLUSTER.deleteAllTopicsAndWait(30_000L);
+    }
+
+    private long scaledTime(final long unscaledTime) {
+        return SCALE_FACTOR * unscaledTime;
+    }
+
+    private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
+        final Properties producerConfig = mkProperties(mkMap(
+            mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
+            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()),
+            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()),
+            mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
+        ));
+        try (final Producer<String, String> producer = new KafkaProducer<>(producerConfig)) {
+            // TODO: test EOS
+            //noinspection ConstantConditions
+            if (false) {
+                producer.initTransactions();
+                producer.beginTransaction();
+            }
+            final LinkedList<Future<RecordMetadata>> futures = new LinkedList<>();
+            for (final KeyValueTimestamp<String, String> record : toProduce) {
+                final Future<RecordMetadata> f = producer.send(
+                    new ProducerRecord<>(topic, null, record.timestamp(), record.key(), record.value(), null)
+                );
+                futures.add(f);
+            }
+            for (final Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (final InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            // TODO: test EOS
+            //noinspection ConstantConditions
+            if (false) {
+                producer.commitTransaction();
+            } else {
+                producer.flush();
+            }
+        }
+    }
+
+    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> expected) {
+        final List<ConsumerRecord<String, Long>> results;
+        try {
+            final Properties properties = mkProperties(
+                mkMap(
+                    mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
+                    mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                    mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
+                    mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
+                )
+            );
+            results = IntegrationTestUtils.waitUntilMinRecordsReceived(properties, topic, expected.size());
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (results.size() != expected.size()) {
+            throw new AssertionError(printRecords(results) + " != " + expected);
+        }
+        final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator();
+        for (final ConsumerRecord<String, Long> result : results) {
+            final KeyValueTimestamp<String, Long> expected1 = expectedIterator.next();
+            try {
+                compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp());
+            } catch (final AssertionError e) {
+                throw new AssertionError(printRecords(results) + " != " + expected, e);
+            }
+        }
+    }
+
+    private <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record, final K expectedKey, final V expectedValue, final long expectedTimestamp) {
+        Objects.requireNonNull(record);
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final long recordTimestamp = record.timestamp();
+        final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
+                                                            " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+        if (recordTimestamp != expectedTimestamp) {
+            throw error;
+        }
+    }
+
+    private <K, V> String printRecords(final List<ConsumerRecord<K, V>> result) {
+        final StringBuilder resultStr = new StringBuilder();
+        resultStr.append("[\n");
+        for (final ConsumerRecord<?, ?> record : result) {
+            resultStr.append("  ").append(record.toString()).append("\n");
+        }
+        resultStr.append("]");
+        return resultStr.toString();
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
new file mode 100644
index 0000000..53f24b5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
+import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+import org.junit.Test;
+
+import static java.lang.Long.MAX_VALUE;
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class SuppressedTest {
+
+    @Test
+    public void bufferBuilderShouldBeConsistent() { // BufferConfig factory and wither calls must equal the expected config objects
+        assertThat(
+            "noBound should remove bounds",
+            maxBytes(2L).withMaxRecords(4L).withNoBound(), // set both limits, then drop them again
+            is(unbounded())
+        );
+
+        assertThat(
+            "keys alone should be set",
+            maxRecords(2L),
+            is(new EagerBufferConfigImpl(2L, MAX_VALUE)) // only the first argument is bounded; MAX_VALUE is Long.MAX_VALUE
+        );
+
+        assertThat(
+            "size alone should be set",
+            maxBytes(2L),
+            is(new EagerBufferConfigImpl(MAX_VALUE, 2L)) // mirror image: only the second argument is bounded
+        );
+    }
+
+    @Test
+    public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() { // untilTimeLimit(...) must equal a SuppressedImpl built from the same arguments
+        assertThat(
+            "time alone should be set",
+            untilTimeLimit(ofMillis(2), unbounded()),
+            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null)) // the third constructor argument is null in every case of this test
+        );
+
+        assertThat( // NOTE(review): same actual and expected values as the assertion above; only the reason text differs
+            "time and unbounded buffer should be set",
+            untilTimeLimit(ofMillis(2), unbounded()),
+            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null))
+        );
+
+        assertThat(
+            "time and keys buffer should be set",
+            untilTimeLimit(ofMillis(2), maxRecords(2)),
+            is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null))
+        );
+
+        assertThat(
+            "time and size buffer should be set",
+            untilTimeLimit(ofMillis(2), maxBytes(2)),
+            is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null))
+        );
+
+        assertThat(
+            "all constraints should be set",
+            untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
+            is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null)) // chained limits are expected to end up in one config
+        );
+    }
+
+    @Test
+    public void finalEventsShouldAcceptStrictBuffersAndSetBounds() { // untilWindowCloses(...) must equal a FinalResultsSuppressionBuilder wrapping the same config
+
+        assertThat(
+            untilWindowCloses(unbounded()),
+            is(new FinalResultsSuppressionBuilder<>(unbounded()))
+        );
+
+        assertThat(
+            untilWindowCloses(maxRecords(2L).shutDownWhenFull()), // expected to equal a StrictBufferConfigImpl carrying SHUT_DOWN
+            is(new FinalResultsSuppressionBuilder<>(new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
+            )
+        );
+
+        assertThat(
+            untilWindowCloses(maxBytes(2L).shutDownWhenFull()), // same as above with the bound in the second argument
+            is(new FinalResultsSuppressionBuilder<>(new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
+            )
+        );
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
new file mode 100644
index 0000000..fead678
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import static java.time.Duration.ZERO;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+
+public class SuppressScenarioTest {
+    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
+    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
+    private static final Serde<String> STRING_SERDE = Serdes.String();
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+    @Test
+    public void shouldImmediatelyEmitEventsWithZeroEmitAfter() { // with a zero time limit the suppressed output must equal the raw output
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, Long> valueCounts = builder
+            .table(
+                "input",
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE)) // re-key by value, so count() tallies per value
+            .count();
+
+        valueCounts
+            .suppress(untilTimeLimit(ZERO, unbounded())) // path under test
+            .toStream()
+            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts // control path: the same table without suppress()
+            .toStream()
+            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final Topology topology = builder.build();
+
+        final Properties config = Utils.mkProperties(Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") // placeholder; presumably never contacted by the test driver - confirm
+        ));
+
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L)); // k1 moves from v1 to v2
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L));
+            verify(
+                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, 0L), // k1 counted under v1
+                    new KeyValueTimestamp<>("v1", 0L, 1L), // k1 no longer counted under v1
+                    new KeyValueTimestamp<>("v2", 1L, 1L), // k1 counted under v2
+                    new KeyValueTimestamp<>("v1", 1L, 2L)  // k2 counted under v1
+                )
+            );
+            verify( // the suppressed topic must show exactly the same four updates
+                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, 0L),
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L),
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+            driver.pipeInput(recordFactory.create("input", "x", "x", 3L)); // brand-new key and value
+            verify(
+                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("x", 1L, 3L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("x", 1L, 3L)
+                )
+            );
+            driver.pipeInput(recordFactory.create("input", "x", "x", 4L)); // identical key/value again, newer timestamp
+            verify(
+                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("x", 0L, 4L), // expected: count drops to 0 first ...
+                    new KeyValueTimestamp<>("x", 1L, 4L)  // ... and is then restored to 1
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("x", 0L, 4L),
+                    new KeyValueTimestamp<>("x", 1L, 4L)
+                )
+            );
+        }
+    }
+
+    /** Fails with a dump of {@code results} unless they match {@code expectedResults} pairwise, in order. */
+    private <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
+        if (results.size() != expectedResults.size()) {
+            throw new AssertionError(printRecords(results) + " != " + expectedResults);
+        }
+        final Iterator<ProducerRecord<K, V>> actualIterator = results.iterator();
+        for (final KeyValueTimestamp<K, V> expected : expectedResults) {
+            try {
+                OutputVerifier.compareKeyValueTimestamp(actualIterator.next(), expected.key(), expected.value(), expected.timestamp());
+            } catch (final AssertionError mismatch) {
+                throw new AssertionError(printRecords(results) + " != " + expectedResults, mismatch);
+            }
+        }
+    }
+
+    /** Polls {@code driver.readOutput} for {@code topic} until it yields {@code null}; returns an ArrayList copy in read order. */
+    private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+        final List<ProducerRecord<K, V>> drained = new LinkedList<>();
+        ProducerRecord<K, V> next;
+        while ((next = driver.readOutput(topic, keyDeserializer, valueDeserializer)) != null) {
+            drained.add(next);
+        }
+        return new ArrayList<>(drained);
+    }
+
+    /** Renders {@code result} one record per line, indented by two spaces, between "[" and "]". */
+    private <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
+        final StringBuilder rendered = new StringBuilder("[\n");
+        for (final ProducerRecord<K, V> entry : result) {
+            rendered.append("  ").append(entry.toString());
+            rendered.append("\n");
+        }
+        return rendered.append("]").toString();
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
new file mode 100644
index 0000000..2b05423
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
+import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public class GraphGraceSearchUtilTest {
+    @Test
+    public void shouldThrowOnNull() { // a null node must be rejected with a TopologyException
+        try {
+            GraphGraceSearchUtil.findAndVerifyWindowGrace(null);
+            fail("Should have thrown.");
+        } catch (final TopologyException e) {
+            assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [].")); // the reported path is empty
+        }
+    }
+
+    @Test
+    public void shouldFailIfThereIsNoGraceAncestor() {
+        // It doesn't matter whether this ancestor is stateless or stateful. The important thing is that
+        // no grace period is defined on any ancestor of the node.
+        final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>(
+            "stateful",
+            new ProcessorParameters<>(
+                () -> new Processor<Object, Object>() { // no-op processor: it is not a windowed aggregate, so it defines no grace
+                    @Override
+                    public void init(final ProcessorContext context) {}
+
+                    @Override
+                    public void process(final Object key, final Object value) {}
+
+                    @Override
+                    public void close() {}
+                },
+                "dummy"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
+        gracelessAncestor.addChild(node);
+
+        try {
+            GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+            fail("should have thrown.");
+        } catch (final TopologyException e) { // the message is expected to list the searched path by node name
+            assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [stateful->stateless]."));
+        }
+    }
+
+    @Test
+    public void shouldExtractGraceFromKStreamWindowAggregateNode() { // the searched node itself wraps a KStreamWindowAggregate
+        final TimeWindows windows = TimeWindows.of(10L).grace(1234L);
+        final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
+                    windows,
+                    "asdf",
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(windows.gracePeriodMs())); // must equal the grace configured on the window spec above
+    }
+
+    @Test
+    public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() { // the searched node itself wraps a KStreamSessionWindowAggregate
+        final SessionWindows windows = SessionWindows.with(10L).grace(1234L);
+
+        final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamSessionWindowAggregate<String, Long, Integer>(
+                    windows,
+                    "asdf",
+                    null,
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(windows.gracePeriodMs())); // must equal the grace configured on the session windows above
+    }
+
+    @Test
+    public void shouldExtractGraceFromAncestorThroughStatefulParent() { // chain: session aggregate -> no-op stateful node -> searched node
+        final SessionWindows windows = SessionWindows.with(10L).grace(1234L);
+        final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
+                windows, "asdf", null, null, null
+            ), "asdf"),
+            null,
+            null,
+            false
+        );
+
+        final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
+            "stateful",
+            new ProcessorParameters<>(
+                () -> new Processor<Object, Object>() { // no-op processor that is not a windowed aggregate
+                    @Override
+                    public void init(final ProcessorContext context) {}
+
+                    @Override
+                    public void process(final Object key, final Object value) {}
+
+                    @Override
+                    public void close() {}
+                },
+                "dummy"
+            ),
+            null,
+            null,
+            false
+        );
+        graceGrandparent.addChild(statefulParent);
+
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
+        statefulParent.addChild(node);
+
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(windows.gracePeriodMs())); // the grandparent's grace must be found through the stateful parent
+    }
+
+    @Test
+    public void shouldExtractGraceFromAncestorThroughStatelessParent() { // chain: session aggregate -> ProcessorGraphNode -> searched node
+        final SessionWindows windows = SessionWindows.with(10L).grace(1234L);
+        final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamSessionWindowAggregate<String, Long, Integer>(
+                    windows,
+                    "asdf",
+                    null,
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless", null);
+        graceGrandparent.addChild(statelessParent);
+
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); // shares its name with the parent above
+        statelessParent.addChild(node);
+
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(windows.gracePeriodMs())); // the grandparent's grace must be found through the stateless parent
+    }
+
+    @Test
+    public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { // one node with two windowed parents whose grace periods differ
+        final StatefulProcessorNode<String, Long> leftParent = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamSessionWindowAggregate<String, Long, Integer>(
+                    SessionWindows.with(10L).grace(1234L), // the smaller grace
+                    "asdf",
+                    null,
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
+                    TimeWindows.of(10L).grace(4321L), // the larger grace
+                    "asdf",
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
+        leftParent.addChild(node);
+        rightParent.addChild(node);
+
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(4321L)); // the larger of the two grace periods wins
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
new file mode 100644
index 0000000..4660333
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.test.MockInternalProcessorContext;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static java.time.Duration.ZERO;
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("PointlessArithmeticExpression")
+public class KTableSuppressProcessorTest {
+    /**
+     * Use this value to indicate that the test correctness does not depend on any particular number
+     */
+    private static final long ARBITRARY_LONG = 5L;
+
+    /**
+     * Use this value to indicate that the test correctness does not depend on any particular window
+     */
+    private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 100L);
+
+    @Test
+    public void zeroTimeLimitShouldImmediatelyEmit() { // untilTimeLimit(ZERO, ...) must forward the record unchanged
+        final KTableSuppressProcessor<String, Long> processor =
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = ARBITRARY_LONG;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp); // record time and stream time are set to the same value
+        final String key = "hey";
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1)); // exactly one forward captured by the mock context
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void windowedZeroTimeLimitShouldImmediatelyEmit() { // same as the non-windowed case, but with a Windowed key
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = ARBITRARY_LONG;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp); // record time and stream time are set to the same value
+        final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1)); // exactly one forward captured by the mock context
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void intermediateSuppressionShouldThrow() { // a non-zero time limit is expected to be rejected for now
+        final KTableSuppressProcessor<String, Long> processor =
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(Duration.ofMillis(1), unbounded())));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        try {
+            processor.process("hey", new Change<>(null, 1L));
+            fail("expected an exception for now");
+        } catch (final KTableSuppressProcessor.NotImplementedException e) {
+            // expected
+        }
+        assertThat(context.forwarded(), hasSize(0)); // nothing may be forwarded when process() throws
+    }
+
+
+    @SuppressWarnings("unchecked") // covers the raw FinalResultsSuppressionBuilder cast below
+    private <K extends Windowed> SuppressedImpl<K> finalResults(final Duration grace) { // final-results suppression with an unbounded buffer and the given grace
+        return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
+    }
+
+
+    @Test
+    public void finalResultsSuppressionShouldThrow() { // final-results suppression with a 1 ms grace is expected to be rejected for now
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(finalResults(ofMillis(1)));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        context.setTimestamp(ARBITRARY_LONG); // only the record timestamp is set; setStreamTime is not called in this test
+        try {
+            processor.process(new Windowed<>("hey", ARBITRARY_WINDOW), new Change<>(ARBITRARY_LONG, ARBITRARY_LONG));
+            fail("expected an exception for now");
+        } catch (final KTableSuppressProcessor.NotImplementedException e) {
+            // expected
+        }
+        assertThat(context.forwarded(), hasSize(0)); // nothing may be forwarded when process() throws
+    }
+
+    @Test
+    public void finalResultsWith0GraceBeforeWindowEndShouldThrow() { // zero grace, record timestamp (5) earlier than the window end (100)
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 5L;
+        context.setTimestamp(timestamp); // NOTE(review): setStreamTime is not called here, unlike in the emitting tests
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        try {
+            processor.process(key, value);
+            fail("expected an exception");
+        } catch (final KTableSuppressProcessor.NotImplementedException e) {
+            // expected
+        }
+        assertThat(context.forwarded(), hasSize(0)); // nothing may be forwarded when process() throws
+    }
+
+    @Test
+    public void finalResultsWith0GraceAtWindowEndShouldImmediatelyEmit() { // zero grace, record and stream time equal to the window end
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L; // same value as the end bound passed to the TimeWindow below
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1)); // exactly one forward captured by the mock context
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    /**
+     * Builds a Hamcrest matcher that accepts any non-null {@link Collection} whose
+     * {@code size()} equals {@code i}.
+     */
+    private static <E> Matcher<Collection<E>> hasSize(final int i) {
+        return new BaseMatcher<Collection<E>>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public boolean matches(final Object item) {
+                // null never matches; any other item is cast to Collection
+                return item != null && ((Collection<E>) item).size() == i;
+            }
+
+            @Override
+            public void describeTo(final Description description) {
+                description.appendText("a collection of size " + i);
+            }
+        };
+    }
+
+    private static <K> SuppressedImpl<K> getImpl(final Suppressed<K> suppressed) { // narrows the public Suppressed type to SuppressedImpl
+        return (SuppressedImpl<K>) suppressed; // throws ClassCastException if the argument is not a SuppressedImpl
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
new file mode 100644
index 0000000..14f8561
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
+    private ProcessorNode currentNode;
+    private long streamTime;
+
+    @Override
+    public StreamsMetricsImpl metrics() {
+        return (StreamsMetricsImpl) super.metrics();
+    }
+
+    @Override
+    public ProcessorRecordContext recordContext() {
+        return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers());
+    }
+
+    @Override
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
+        setRecordMetadata(
+            recordContext.topic(),
+            recordContext.partition(),
+            recordContext.offset(),
+            recordContext.headers(),
+            recordContext.timestamp()
+        );
+    }
+
+    @Override
+    public void setCurrentNode(final ProcessorNode currentNode) {
+        this.currentNode = currentNode;
+    }
+
+    @Override
+    public ProcessorNode currentNode() {
+        return currentNode;
+    }
+
+    @Override
+    public ThreadCache getCache() {
+        return null;
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void uninitialize() {
+
+    }
+
+    @Override
+    public long streamTime() {
+        return streamTime;
+    }
+
+    public void setStreamTime(final long streamTime) {
+        this.streamTime = streamTime;
+    }
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index cba0257..dc854b0 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -405,13 +405,18 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key, final V value) {
-        capturedForwards.add(new CapturedForward(To.all(), new KeyValue(key, value)));
+        forward(key, value, To.all());
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        capturedForwards.add(new CapturedForward(to, new KeyValue(key, value)));
+        capturedForwards.add(
+            new CapturedForward(
+                to.timestamp == -1 ? to.withTimestamp(timestamp == null ? -1 : timestamp) : to,
+                new KeyValue(key, value)
+            )
+        );
     }
 
     @SuppressWarnings("deprecation")


Mime
View raw message