kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7223: Make suppression buffer durable (#5724)
Date Wed, 03 Oct 2018 16:52:22 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b793eaa  KAFKA-7223: Make suppression buffer durable (#5724)
b793eaa is described below

commit b793eaade4fcb0705ab80c2a806331fab8c29f9f
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Oct 3 11:52:13 2018 -0500

    KAFKA-7223: Make suppression buffer durable (#5724)
    
    This is Part 4 of suppression (durability)
    Part 1 was #5567 (the API)
    Part 2 was #5687 (the tests)
    Part 3 was #5693 (in-memory buffering)
    
    Implement a changelog for the suppression buffer so that the buffer state can be recovered on restart or after a failure.
    As of this PR, suppression is suitable for general usage.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../streams/kstream/internals/KTableImpl.java      |  16 +-
 .../InMemoryTimeOrderedKeyValueBuffer.java         | 116 -------
 .../suppress/KTableSuppressProcessor.java          |  16 +-
 .../kstream/internals/suppress/TimeKey.java        |  60 ----
 .../InMemoryTimeOrderedKeyValueBuffer.java         | 347 +++++++++++++++++++++
 .../internals}/TimeOrderedKeyValueBuffer.java      |   6 +-
 .../SuppressionDurabilityIntegrationTest.java      | 260 +++++++++++++++
 .../integration/SuppressionIntegrationTest.java    | 224 ++++---------
 .../integration/utils/IntegrationTestUtils.java    | 158 +++++++++-
 .../suppress/KTableSuppressProcessorTest.java      | 209 ++++++-------
 10 files changed, 927 insertions(+), 485 deletions(-)
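For context, the sketch below shows roughly how the now-durable suppression buffer is reached from the public API exercised by the new integration tests: a minimal, hypothetical topology that counts events and suppresses intermediate results. The class name, topic names, and serdes are illustrative and are not taken from this commit.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;

public class SuppressionExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Count occurrences of each value from a hypothetical "events" topic.
        final KTable<String, Long> counts = builder
            .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
            .count();

        // suppress() now wires an InMemoryTimeOrderedKeyValueBuffer into a
        // StatefulProcessorNode (see the KTableImpl change below), so buffered
        // records are written to a changelog topic and restored on restart.
        counts
            .suppress(untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), maxRecords(3L).emitEarlyWhenFull()))
            .toStream()
            .to("suppressed-counts", Produced.with(Serdes.String(), Serdes.Long()));
    }
}

The diff that follows makes this durability transparent to callers: the suppress() operator itself registers the buffer as a state store, so no user-facing API changes are required.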

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c5cd11d..b9a5e1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
@@ -43,6 +44,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcess
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -356,20 +358,24 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     @Override
     public KTable<K, V> suppress(final Suppressed<K> suppressed) {
         final String name = builder.newProcessorName(SUPPRESS_NAME);
+        final String storeName = builder.newStoreName(SUPPRESS_NAME);
 
         final ProcessorSupplier<K, Change<V>> suppressionSupplier =
             () -> new KTableSuppressProcessor<>(
                 buildSuppress(suppressed),
+                storeName,
                 keySerde,
                 valSerde == null ? null : new FullChangeSerde<>(valSerde)
             );
 
-        final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(
-            suppressionSupplier,
-            name
-        );
 
-        final ProcessorGraphNode<K, Change<V>> node = new ProcessorGraphNode<>(name, processorParameters, false);
+        final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(suppressionSupplier, name),
+            null,
+            new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
+            false
+        );
 
         builder.addGraphNode(streamsGraphNode, node);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
deleted file mode 100644
index 677a662..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals.suppress;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.internals.ContextualRecord;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
-    private final Map<Bytes, TimeKey> index = new HashMap<>();
-    private final TreeMap<TimeKey, ContextualRecord> sortedMap = new TreeMap<>();
-    private long memBufferSize = 0L;
-    private long minTimestamp = Long.MAX_VALUE;
-
-    @Override
-    public void evictWhile(final Supplier<Boolean> predicate,
-                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
-        final Iterator<Map.Entry<TimeKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
-
-        if (predicate.get()) {
-            Map.Entry<TimeKey, ContextualRecord> next = null;
-            if (delegate.hasNext()) {
-                next = delegate.next();
-            }
-
-            // predicate being true means we read one record, call the callback, and then remove it
-            while (next != null && predicate.get()) {
-                callback.accept(new KeyValue<>(next.getKey().key(), next.getValue()));
-
-                delegate.remove();
-                index.remove(next.getKey().key());
-
-                memBufferSize = memBufferSize - computeRecordSize(next.getKey().key(), next.getValue());
-
-                // peek at the next record so we can update the minTimestamp
-                if (delegate.hasNext()) {
-                    next = delegate.next();
-                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time();
-                } else {
-                    next = null;
-                    minTimestamp = Long.MAX_VALUE;
-                }
-            }
-        }
-    }
-
-    @Override
-    public void put(final long time,
-                    final Bytes key,
-                    final ContextualRecord value) {
-        // non-resetting semantics:
-        // if there was a previous version of the same record,
-        // then insert the new record in the same place in the priority queue
-
-        final TimeKey previousKey = index.get(key);
-        if (previousKey == null) {
-            final TimeKey nextKey = new TimeKey(time, key);
-            index.put(key, nextKey);
-            sortedMap.put(nextKey, value);
-            minTimestamp = Math.min(minTimestamp, time);
-            memBufferSize = memBufferSize + computeRecordSize(key, value);
-        } else {
-            final ContextualRecord removedValue = sortedMap.put(previousKey, value);
-            memBufferSize =
-                memBufferSize
-                    + computeRecordSize(key, value)
-                    - (removedValue == null ? 0 : computeRecordSize(key, removedValue));
-        }
-    }
-
-    @Override
-    public int numRecords() {
-        return index.size();
-    }
-
-    @Override
-    public long bufferSize() {
-        return memBufferSize;
-    }
-
-    @Override
-    public long minTimestamp() {
-        return minTimestamp;
-    }
-
-    private long computeRecordSize(final Bytes key, final ContextualRecord value) {
-        long size = 0L;
-        size += 8; // buffer time
-        size += key.get().length;
-        if (value != null) {
-            size += value.sizeBytes();
-        }
-        return size;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 57e5066..62c034d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -28,6 +28,9 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.internals.ContextualRecord;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+
+import java.util.Objects;
 
 import static java.util.Objects.requireNonNull;
 
@@ -35,25 +38,27 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     private final long maxRecords;
     private final long maxBytes;
     private final long suppressDurationMillis;
-    private final TimeOrderedKeyValueBuffer buffer;
     private final TimeDefinition<K> bufferTimeDefinition;
     private final BufferFullStrategy bufferFullStrategy;
     private final boolean shouldSuppressTombstones;
+    private final String storeName;
+    private TimeOrderedKeyValueBuffer buffer;
     private InternalProcessorContext internalProcessorContext;
 
     private Serde<K> keySerde;
-    private Serde<Change<V>> valueSerde;
+    private FullChangeSerde<V> valueSerde;
 
     public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
+                                   final String storeName,
                                    final Serde<K> keySerde,
                                    final FullChangeSerde<V> valueSerde) {
+        this.storeName = storeName;
         requireNonNull(suppress);
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         maxRecords = suppress.getBufferConfig().maxRecords();
         maxBytes = suppress.getBufferConfig().maxBytes();
         suppressDurationMillis = suppress.getTimeToWaitForMoreEvents().toMillis();
-        buffer = new InMemoryTimeOrderedKeyValueBuffer();
         bufferTimeDefinition = suppress.getTimeDefinition();
         bufferFullStrategy = suppress.getBufferConfig().bufferFullStrategy();
         shouldSuppressTombstones = suppress.shouldSuppressTombstones();
@@ -63,8 +68,9 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     @Override
     public void init(final ProcessorContext context) {
         internalProcessorContext = (InternalProcessorContext) context;
-        this.keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
-        this.valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
+        keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
+        valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
+        buffer = Objects.requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java
deleted file mode 100644
index d3ad350..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals.suppress;
-
-import org.apache.kafka.common.utils.Bytes;
-
-import java.util.Objects;
-
-class TimeKey implements Comparable<TimeKey> {
-    private final long time;
-    private final Bytes key;
-
-    TimeKey(final long time, final Bytes key) {
-        this.time = time;
-        this.key = key;
-    }
-
-    Bytes key() {
-        return key;
-    }
-
-    long time() {
-        return time;
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        final TimeKey timeKey = (TimeKey) o;
-        return time == timeKey.time &&
-            Objects.equals(key, timeKey.key);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(time, key);
-    }
-
-    @Override
-    public int compareTo(final TimeKey o) {
-        // ordering of keys within a time uses hashCode.
-        final int timeComparison = Long.compare(time, o.time);
-        return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
new file mode 100644
index 0000000..3ac6fc8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryTimeOrderedKeyValueBuffer.class);
+    private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
+    private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
+
+    private final Map<Bytes, BufferKey> index = new HashMap<>();
+    private final TreeMap<BufferKey, ContextualRecord> sortedMap = new TreeMap<>();
+
+    private final Set<Bytes> dirtyKeys = new HashSet<>();
+    private final String storeName;
+    private final boolean loggingEnabled;
+
+    private long memBufferSize = 0L;
+    private long minTimestamp = Long.MAX_VALUE;
+    private RecordCollector collector;
+    private String changelogTopic;
+
+    private volatile boolean open;
+
+    public static class Builder implements StoreBuilder<StateStore> {
+
+        private final String storeName;
+        private boolean loggingEnabled = true;
+
+        public Builder(final String storeName) {
+            this.storeName = storeName;
+        }
+
+        @Override
+        public StoreBuilder<StateStore> withCachingEnabled() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public StoreBuilder<StateStore> withCachingDisabled() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public StoreBuilder<StateStore> withLoggingEnabled(final Map<String, String> config) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public StoreBuilder<StateStore> withLoggingDisabled() {
+            loggingEnabled = false;
+            return this;
+        }
+
+        @Override
+        public StateStore build() {
+            return new InMemoryTimeOrderedKeyValueBuffer(storeName, loggingEnabled);
+        }
+
+        @Override
+        public Map<String, String> logConfig() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public boolean loggingEnabled() {
+            return loggingEnabled;
+        }
+
+        @Override
+        public String name() {
+            return storeName;
+        }
+    }
+
+    private static class BufferKey implements Comparable<BufferKey> {
+        private final long time;
+        private final Bytes key;
+
+        private BufferKey(final long time, final Bytes key) {
+            this.time = time;
+            this.key = key;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final BufferKey bufferKey = (BufferKey) o;
+            return time == bufferKey.time &&
+                Objects.equals(key, bufferKey.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(time, key);
+        }
+
+        @Override
+        public int compareTo(final BufferKey o) {
+            // ordering of keys within a time uses hashCode.
+            final int timeComparison = Long.compare(time, o.time);
+            return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
+        }
+    }
+
+    private InMemoryTimeOrderedKeyValueBuffer(final String storeName, final boolean loggingEnabled) {
+        this.storeName = storeName;
+        this.loggingEnabled = loggingEnabled;
+    }
+
+    @Override
+    public String name() {
+        return storeName;
+    }
+
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
+        if (loggingEnabled) {
+            collector = ((RecordCollector.Supplier) context).recordCollector();
+            changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+        }
+        open = true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public void close() {
+        index.clear();
+        sortedMap.clear();
+        dirtyKeys.clear();
+        memBufferSize = 0;
+        minTimestamp = Long.MAX_VALUE;
+        open = false;
+    }
+
+    @Override
+    public void flush() {
+        if (loggingEnabled) {
+            // counting on this getting called before the record collector's flush
+            for (final Bytes key : dirtyKeys) {
+
+                final BufferKey bufferKey = index.get(key);
+
+                if (bufferKey == null) {
+                    // The record was evicted from the buffer. Send a tombstone.
+                    collector.send(changelogTopic, key, null, null, null, null, KEY_SERIALIZER, VALUE_SERIALIZER);
+                } else {
+                    final ContextualRecord value = sortedMap.get(bufferKey);
+
+                    final byte[] innerValue = value.value();
+                    final byte[] timeAndValue = ByteBuffer.wrap(new byte[8 + innerValue.length])
+                                                          .putLong(bufferKey.time)
+                                                          .put(innerValue)
+                                                          .array();
+
+                    final ProcessorRecordContext recordContext = value.recordContext();
+                    collector.send(
+                        changelogTopic,
+                        key,
+                        timeAndValue,
+                        recordContext.headers(),
+                        recordContext.partition(),
+                        recordContext.timestamp(),
+                        KEY_SERIALIZER,
+                        VALUE_SERIALIZER
+                    );
+                }
+            }
+            dirtyKeys.clear();
+        }
+    }
+
+    private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
+        for (final ConsumerRecord<byte[], byte[]> record : batch) {
+            final Bytes key = Bytes.wrap(record.key());
+            if (record.value() == null) {
+                // This was a tombstone. Delete the record.
+                final BufferKey bufferKey = index.remove(key);
+                if (bufferKey != null) {
+                    sortedMap.remove(bufferKey);
+                }
+            } else {
+                final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+                final long time = timeAndValue.getLong();
+                final byte[] value = new byte[record.value().length - 8];
+                timeAndValue.get(value);
+
+                cleanPut(
+                    time,
+                    key,
+                    new ContextualRecord(
+                        value,
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            record.headers()
+                        )
+                    )
+                );
+            }
+        }
+    }
+
+
+    @Override
+    public void evictWhile(final Supplier<Boolean> predicate,
+                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
+        final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
+
+        if (predicate.get()) {
+            Map.Entry<BufferKey, ContextualRecord> next = null;
+            if (delegate.hasNext()) {
+                next = delegate.next();
+            }
+
+            // predicate being true means we read one record, call the callback, and then remove it
+            while (next != null && predicate.get()) {
+                callback.accept(new KeyValue<>(next.getKey().key, next.getValue()));
+
+                delegate.remove();
+                index.remove(next.getKey().key);
+
+                dirtyKeys.add(next.getKey().key);
+
+                memBufferSize = memBufferSize - computeRecordSize(next.getKey().key, next.getValue());
+
+                // peek at the next record so we can update the minTimestamp
+                if (delegate.hasNext()) {
+                    next = delegate.next();
+                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time;
+                } else {
+                    next = null;
+                    minTimestamp = Long.MAX_VALUE;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void put(final long time,
+                    final Bytes key,
+                    final ContextualRecord value) {
+        cleanPut(time, key, value);
+        dirtyKeys.add(key);
+    }
+
+    private void cleanPut(final long time, final Bytes key, final ContextualRecord value) {
+        // non-resetting semantics:
+        // if there was a previous version of the same record,
+        // then insert the new record in the same place in the priority queue
+
+        final BufferKey previousKey = index.get(key);
+        if (previousKey == null) {
+            final BufferKey nextKey = new BufferKey(time, key);
+            index.put(key, nextKey);
+            sortedMap.put(nextKey, value);
+            minTimestamp = Math.min(minTimestamp, time);
+            memBufferSize = memBufferSize + computeRecordSize(key, value);
+        } else {
+            final ContextualRecord removedValue = sortedMap.put(previousKey, value);
+            memBufferSize =
+                memBufferSize
+                    + computeRecordSize(key, value)
+                    - (removedValue == null ? 0 : computeRecordSize(key, removedValue));
+        }
+    }
+
+    @Override
+    public int numRecords() {
+        return index.size();
+    }
+
+    @Override
+    public long bufferSize() {
+        return memBufferSize;
+    }
+
+    @Override
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    private long computeRecordSize(final Bytes key, final ContextualRecord value) {
+        long size = 0L;
+        size += 8; // buffer time
+        size += key.get().length;
+        if (value != null) {
+            size += value.sizeBytes();
+        }
+        return size;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
similarity index 87%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
rename to streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index 98a4f63..86a8c1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -14,16 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals.suppress;
+package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.internals.ContextualRecord;
+import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-interface TimeOrderedKeyValueBuffer {
+public interface TimeOrderedKeyValueBuffer extends StateStore {
     void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback);
 
     void put(final long time, final Bytes key, final ContextualRecord value);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
new file mode 100644
index 0000000..93ecc53
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.Long.MAX_VALUE;
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class SuppressionDurabilityIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
+    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
+    private static final Serde<String> STRING_SERDE = Serdes.String();
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+    private static final int COMMIT_INTERVAL = 100;
+    private final boolean eosEnabled;
+
+    public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
+        this.eosEnabled = eosEnabled;
+    }
+
+    @Parameters(name = "{index}: eosEnabled={0}")
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(new Object[] {false}, new Object[] {true});
+    }
+
+    private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
+        return builder
+            .table(
+                input,
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled());
+    }
+
+    @Test
+    public void shouldRecoverBufferAfterShutdown() {
+        final String testId = "-shouldRecoverBufferAfterShutdown";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
+
+        final KStream<String, Long> suppressedCounts = valueCounts
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(3L).emitEarlyWhenFull()))
+            .toStream();
+
+        final AtomicInteger eventCount = new AtomicInteger(0);
+        suppressedCounts.foreach((key, value) -> eventCount.incrementAndGet());
+
+        suppressedCounts
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final Properties streamsConfig = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+            mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE)
+        ));
+
+        KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
+        try {
+            // start by putting some stuff in the buffer
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v2", scaledTime(2L)),
+                    new KeyValueTimestamp<>("k3", "v3", scaledTime(3L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("v3", 1L, scaledTime(3L))
+                )
+            );
+            assertThat(eventCount.get(), is(0));
+
+            // flush two of the first three events out.
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k4", "v4", scaledTime(4L)),
+                    new KeyValueTimestamp<>("k5", "v5", scaledTime(5L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)),
+                    new KeyValueTimestamp<>("v5", 1L, scaledTime(5L))
+                )
+            );
+            assertThat(eventCount.get(), is(2));
+            verifyOutput(
+                outputSuppressed,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(2L))
+                )
+            );
+
+            // bounce to ensure that the history, including retractions,
+            // get restored properly. (i.e., we shouldn't see those first events again)
+
+            // restart the driver
+            driver.close();
+            assertThat(driver.state(), is(KafkaStreams.State.NOT_RUNNING));
+            driver = getStartedStreams(streamsConfig, builder, false);
+
+
+            // flush those recovered buffered events out.
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k6", "v6", scaledTime(6L)),
+                    new KeyValueTimestamp<>("k7", "v7", scaledTime(7L)),
+                    new KeyValueTimestamp<>("k8", "v8", scaledTime(8L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v6", 1L, scaledTime(6L)),
+                    new KeyValueTimestamp<>("v7", 1L, scaledTime(7L)),
+                    new KeyValueTimestamp<>("v8", 1L, scaledTime(8L))
+                )
+            );
+            assertThat(eventCount.get(), is(5));
+            verifyOutput(
+                outputSuppressed,
+                asList(
+                    new KeyValueTimestamp<>("v3", 1L, scaledTime(3L)),
+                    new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)),
+                    new KeyValueTimestamp<>("v5", 1L, scaledTime(5L))
+                )
+            );
+
+        } finally {
+            driver.close();
+            cleanStateAfterTest(CLUSTER, driver);
+        }
+    }
+
+    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
+        final Properties properties = mkProperties(
+            mkMap(
+                mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
+                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
+                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
+            )
+        );
+        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
+
+    }
+
+    /**
+     * scaling to ensure that there are commits in between the various test events,
+     * just to exercise that everything works properly in the presence of commits.
+     */
+    private long scaledTime(final long unscaledTime) {
+        return COMMIT_INTERVAL * 2 * unscaledTime;
+    }
+
+    private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
+        final Properties producerConfig = mkProperties(mkMap(
+            mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
+            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
+            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
+            mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
+        ));
+        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index a9920e3..208f1eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -17,16 +17,12 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -38,10 +34,10 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -53,14 +49,9 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.time.Duration;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
-import java.util.Objects;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 import static java.lang.Long.MAX_VALUE;
 import static java.time.Duration.ofMillis;
@@ -69,6 +60,9 @@ import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
@@ -87,18 +81,17 @@ public class SuppressionIntegrationTest {
     private static final Serde<String> STRING_SERDE = Serdes.String();
     private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
     private static final int COMMIT_INTERVAL = 100;
-    private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2;
     private static final long TIMEOUT_MS = 30_000L;
 
     @Test
-    public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedException {
+    public void shouldSuppressIntermediateEventsWithEmitAfter() {
         final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputSuppressed, outputRaw);
+        cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -112,7 +105,8 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
 
         try {
             produceSynchronously(
@@ -144,7 +138,7 @@ public class SuppressionIntegrationTest {
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -157,19 +151,19 @@ public class SuppressionIntegrationTest {
                     .withCachingDisabled()
                     .withLoggingDisabled()
             )
-            .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE))
+            .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
             .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled());
     }
 
     @Test
-    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException {
+    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() {
         final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputSuppressed, outputRaw);
+        cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -183,7 +177,8 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
 
         try {
             produceSynchronously(
@@ -217,19 +212,19 @@ public class SuppressionIntegrationTest {
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
     @Test
-    public void shouldSuppressIntermediateEventsWithRecordLimit() throws InterruptedException {
+    public void shouldSuppressIntermediateEventsWithRecordLimit() {
         final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -243,7 +238,8 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
                 input,
@@ -275,7 +271,7 @@ public class SuppressionIntegrationTest {
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -287,7 +283,7 @@ public class SuppressionIntegrationTest {
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -301,7 +297,8 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
                 input,
@@ -315,19 +312,19 @@ public class SuppressionIntegrationTest {
             verifyErrorShutdown(driver);
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
     @Test
-    public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedException {
+    public void shouldSuppressIntermediateEventsWithBytesLimit() {
         final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -342,7 +339,8 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
                 input,
@@ -374,7 +372,7 @@ public class SuppressionIntegrationTest {
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -386,7 +384,7 @@ public class SuppressionIntegrationTest {
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -401,7 +399,8 @@ public class SuppressionIntegrationTest {
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
                 input,
@@ -415,26 +414,26 @@ public class SuppressionIntegrationTest {
             verifyErrorShutdown(driver);
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
     @Test
-    public void shouldSupportFinalResultsForTimeWindows() throws InterruptedException {
+    public void shouldSupportFinalResultsForTimeWindows() {
         final String testId = "-shouldSupportFinalResultsForTimeWindows";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream(input,
                     Consumed.with(STRING_SERDE, STRING_SERDE)
             )
-            .groupBy((String k1, String v1) -> k1, Serialized.with(STRING_SERDE, STRING_SERDE))
+            .groupBy((String k1, String v1) -> k1, Grouped.with(STRING_SERDE, STRING_SERDE))
             .windowedBy(TimeWindows.of(scaledTime(2L)).grace(scaledTime(1L)))
             .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withLoggingDisabled());
 
@@ -449,7 +448,8 @@ public class SuppressionIntegrationTest {
             .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(input, asList(
                 new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
@@ -481,81 +481,40 @@ public class SuppressionIntegrationTest {
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
-    private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
-        return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString();
-    }
-
-    private void cleanStateBeforeTest(final String... topics) throws InterruptedException {
-        CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS);
-        for (final String topic : topics) {
-            CLUSTER.createTopic(topic, 1, 1);
-        }
-    }
-
-    private KafkaStreams getCleanStartedStreams(final String appId, final StreamsBuilder builder) {
-        final Properties streamsConfig = mkProperties(mkMap(
+    private Properties getStreamsConfig(final String appId) {
+        return mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-            mkEntry(StreamsConfig.POLL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)),
-            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL))
+            mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE)
         ));
-        final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
-        driver.cleanUp();
-        driver.start();
-        return driver;
     }
 
-    private void cleanStateAfterTest(final KafkaStreams driver) throws InterruptedException {
-        driver.cleanUp();
-        CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS);
+    private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
+        return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString();
     }
 
+    /**
+     * Scales the time so that there are commits in between the various test events,
+     * just to exercise that everything works properly in the presence of commits.
+     */
     private long scaledTime(final long unscaledTime) {
-        return SCALE_FACTOR * unscaledTime;
+        return COMMIT_INTERVAL * 2 * unscaledTime;
     }
 
     private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
         final Properties producerConfig = mkProperties(mkMap(
             mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
-            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()),
-            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()),
+            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
+            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
             mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
         ));
-        try (final Producer<String, String> producer = new KafkaProducer<>(producerConfig)) {
-            // TODO: test EOS
-            //noinspection ConstantConditions
-            if (false) {
-                producer.initTransactions();
-                producer.beginTransaction();
-            }
-            final LinkedList<Future<RecordMetadata>> futures = new LinkedList<>();
-            for (final KeyValueTimestamp<String, String> record : toProduce) {
-                final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, record.timestamp(), record.key(), record.value(), null)
-                );
-                futures.add(f);
-            }
-
-            // TODO: test EOS
-            //noinspection ConstantConditions
-            if (false) {
-                producer.commitTransaction();
-            } else {
-                producer.flush();
-            }
-
-            for (final Future<RecordMetadata> future : futures) {
-                try {
-                    future.get();
-                } catch (final InterruptedException | ExecutionException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
+        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce);
     }
 
     private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
@@ -563,69 +522,16 @@ public class SuppressionIntegrationTest {
         assertThat(driver.state(), is(KafkaStreams.State.ERROR));
     }
 
-    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> expected) {
-        final List<ConsumerRecord<String, Long>> results;
-        try {
-            final Properties properties = mkProperties(
-                mkMap(
-                    mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
-                    mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                    mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
-                    mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
-                )
-            );
-            results = IntegrationTestUtils.waitUntilMinRecordsReceived(properties, topic, expected.size());
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-
-        if (results.size() != expected.size()) {
-            throw new AssertionError(printRecords(results) + " != " + expected);
-        }
-        final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator();
-        for (final ConsumerRecord<String, Long> result : results) {
-            final KeyValueTimestamp<String, Long> expected1 = expectedIterator.next();
-            try {
-                compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp());
-            } catch (final AssertionError e) {
-                throw new AssertionError(printRecords(results) + " != " + expected, e);
-            }
-        }
-    }
-
-    private <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record, final K expectedKey, final V expectedValue, final long expectedTimestamp) {
-        Objects.requireNonNull(record);
-        final K recordKey = record.key();
-        final V recordValue = record.value();
-        final long recordTimestamp = record.timestamp();
-        final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
-                                                            " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
-        if (recordKey != null) {
-            if (!recordKey.equals(expectedKey)) {
-                throw error;
-            }
-        } else if (expectedKey != null) {
-            throw error;
-        }
-        if (recordValue != null) {
-            if (!recordValue.equals(expectedValue)) {
-                throw error;
-            }
-        } else if (expectedValue != null) {
-            throw error;
-        }
-        if (recordTimestamp != expectedTimestamp) {
-            throw error;
-        }
-    }
+    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
+        final Properties properties = mkProperties(
+            mkMap(
+                mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
+                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
+                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
+            )
+        );
+        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
 
-    private <K, V> String printRecords(final List<ConsumerRecord<K, V>> result) {
-        final StringBuilder resultStr = new StringBuilder();
-        resultStr.append("[\n");
-        for (final ConsumerRecord<?, ?> record : result) {
-            resultStr.append("  ").append(record.toString()).append("\n");
-        }
-        resultStr.append("]");
-        return resultStr.toString();
     }
 }
\ No newline at end of file
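
The net effect of the SuppressionIntegrationTest changes above is that each test now builds its config via getStreamsConfig, obtains a started driver from IntegrationTestUtils.getStartedStreams, and delegates topic and state cleanup to the shared cleanStateBeforeTest/cleanStateAfterTest helpers. A rough sketch of the resulting per-test flow follows; it assumes the class members visible in the diff (CLUSTER, scaledTime, produceSynchronously, verifyOutput) are in scope and is illustrative, not a verbatim excerpt:

    // Hedged sketch of the common test lifecycle after this refactor; topic names are examples.
    final String input = "input-example";
    final String output = "output-example";
    IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, input, output);

    final StreamsBuilder builder = new StreamsBuilder();
    // ... build the suppression topology under test, writing to `output` ...

    final Properties streamsConfig = getStreamsConfig("example-app-id");
    final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
    try {
        produceSynchronously(input, asList(
            new KeyValueTimestamp<>("k1", "v1", scaledTime(0L))));
        verifyOutput(output, asList(
            new KeyValueTimestamp<>("k1", 1L, scaledTime(0L))));
    } finally {
        driver.close();
        IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
    }
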
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 985b57f..8bca79f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
+import kafka.api.Request;
+import kafka.server.KafkaServer;
+import kafka.server.MetadataCache;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -33,11 +36,14 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import scala.Option;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,24 +53,23 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-import kafka.api.Request;
-import kafka.server.KafkaServer;
-import kafka.server.MetadataCache;
-import scala.Option;
-
 /**
  * Utility functions to make integration testing more convenient.
  */
 public class IntegrationTestUtils {
 
     public static final long DEFAULT_TIMEOUT = 30 * 1000L;
+    private static final long DEFAULT_COMMIT_INTERVAL = 100L;
     public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
 
     /*
@@ -112,6 +117,26 @@ public class IntegrationTestUtils {
         }
     }
 
+    public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final String... topics) {
+        try {
+            cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
+            for (final String topic : topics) {
+                cluster.createTopic(topic, 1, 1);
+            }
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
+        driver.cleanUp();
+        try {
+            cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * @param topic          Kafka topic to write the data records to
      * @param records        Data records to write to Kafka
@@ -174,15 +199,6 @@ public class IntegrationTestUtils {
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
-                                                                         final Headers headers,
-                                                                         final Long timestamp)
-        throws ExecutionException, InterruptedException {
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, headers, timestamp, false);
-    }
-
-    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
-                                                                         final Collection<KeyValue<K, V>> records,
-                                                                         final Properties producerConfig,
                                                                          final Long timestamp,
                                                                          final boolean enableTransactions)
         throws ExecutionException, InterruptedException {
@@ -212,7 +228,42 @@ public class IntegrationTestUtils {
             producer.flush();
         }
     }
-    
+
+    public static <V, K> void produceSynchronously(final Properties producerConfig,
+                                                    final boolean eos,
+                                                    final String topic,
+                                                    final List<KeyValueTimestamp<K, V>> toProduce) {
+        try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+            // When eos is requested, wrap the sends in a transaction
+            // (pairs with the commitTransaction call below).
+            if (eos) {
+                producer.initTransactions();
+                producer.beginTransaction();
+            }
+            final LinkedList<Future<RecordMetadata>> futures = new LinkedList<>();
+            for (final KeyValueTimestamp<K, V> record : toProduce) {
+                final Future<RecordMetadata> f = producer.send(
+                    new ProducerRecord<>(topic, null, record.timestamp(), record.key(), record.value(), null)
+                );
+                futures.add(f);
+            }
+
+            if (eos) {
+                producer.commitTransaction();
+            } else {
+                producer.flush();
+            }
+
+            for (final Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (final InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
     public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                                 final Collection<KeyValue<K, V>> records,
                                                                                 final Properties producerConfig,
@@ -227,7 +278,7 @@ public class IntegrationTestUtils {
                 f.get();
                 producer.abortTransaction();
             }
-        }    
+        }
     }
 
     public static <V> void produceValuesSynchronously(final String topic,
@@ -297,7 +348,7 @@ public class IntegrationTestUtils {
                                                                                   final int expectedNumRecords) throws InterruptedException {
         return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
     }
-    
+
     /**
      * Wait until enough data (key-value records) has been consumed.
      *
@@ -483,6 +534,70 @@ public class IntegrationTestUtils {
 
     }
 
+    public static void verifyKeyValueTimestamps(final Properties consumerConfig,
+                                                final String topic,
+                                                final List<KeyValueTimestamp<String, Long>> expected) {
+
+        final List<ConsumerRecord<String, Long>> results;
+        try {
+            results = IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (results.size() != expected.size()) {
+            throw new AssertionError(printRecords(results) + " != " + expected);
+        }
+        final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator();
+        for (final ConsumerRecord<String, Long> result : results) {
+            final KeyValueTimestamp<String, Long> expected1 = expectedIterator.next();
+            try {
+                compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp());
+            } catch (final AssertionError e) {
+                throw new AssertionError(printRecords(results) + " != " + expected, e);
+            }
+        }
+    }
+
+    private static <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record,
+                                                        final K expectedKey,
+                                                        final V expectedValue,
+                                                        final long expectedTimestamp) {
+        Objects.requireNonNull(record);
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final long recordTimestamp = record.timestamp();
+        final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
+                                                            " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+        if (recordTimestamp != expectedTimestamp) {
+            throw error;
+        }
+    }
+
+    private static <K, V> String printRecords(final List<ConsumerRecord<K, V>> result) {
+        final StringBuilder resultStr = new StringBuilder();
+        resultStr.append("[\n");
+        for (final ConsumerRecord<?, ?> record : result) {
+            resultStr.append("  ").append(record.toString()).append("\n");
+        }
+        resultStr.append("]");
+        return resultStr.toString();
+    }
+
     /**
      * Returns up to `maxMessages` message-values from the topic.
      *
@@ -520,6 +635,15 @@ public class IntegrationTestUtils {
         return consumedValues;
     }
 
+    public static KafkaStreams getStartedStreams(final Properties streamsConfig, final StreamsBuilder builder, final boolean clean) {
+        final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
+        if (clean) {
+            driver.cleanUp();
+        }
+        driver.start();
+        return driver;
+    }
+
     /**
      * Returns up to `maxMessages` message-values from the topic.
      *
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index bb7f49c..002ace2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -29,7 +29,9 @@ import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -60,13 +62,38 @@ public class KTableSuppressProcessorTest {
 
     private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);
 
+    private static class Harness<K, V> {
+        private final KTableSuppressProcessor<K, V> processor;
+        private final MockInternalProcessorContext context;
+
+
+        Harness(final Suppressed<K> suppressed,
+                final Serde<K> keySerde,
+                final Serde<V> valueSerde) {
+
+            final String storeName = "test-store";
+
+            final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName)
+                .withLoggingDisabled()
+                .build();
+            final KTableSuppressProcessor<K, V> processor =
+                new KTableSuppressProcessor<>(getImpl(suppressed), storeName, keySerde, new FullChangeSerde<>(valueSerde));
+
+            final MockInternalProcessorContext context = new MockInternalProcessorContext();
+            buffer.init(context, buffer);
+            processor.init(context);
+
+            this.processor = processor;
+            this.context = context;
+        }
+    }
+
     @Test
     public void zeroTimeLimitShouldImmediatelyEmit() {
-        final KTableSuppressProcessor<String, Long> processor =
-            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())), String(), new FullChangeSerde<>(Long()));
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(ZERO, unbounded()), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -83,15 +110,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void windowedZeroTimeLimitShouldImmediatelyEmit() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor =
-            new KTableSuppressProcessor<>(
-                getImpl(untilTimeLimit(ZERO, unbounded())),
-                timeWindowedSerdeFrom(String.class, 100L),
-                new FullChangeSerde<>(Long())
-            );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(untilTimeLimit(ZERO, unbounded()), timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -108,15 +130,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void intermediateSuppressionShouldBufferAndEmitLater() {
-        final KTableSuppressProcessor<String, Long> processor =
-            new KTableSuppressProcessor<>(
-                getImpl(untilTimeLimit(ofMillis(1), unbounded())),
-                String(),
-                new FullChangeSerde<>(Long())
-            );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(ofMillis(1), unbounded()), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 0L;
         context.setRecordMetadata("topic", 0, 0, null, timestamp);
@@ -138,14 +155,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
-            finalResults(ofMillis(1L)),
-            timeWindowedSerdeFrom(String.class, 1L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(1L)), timeWindowedSerdeFrom(String.class, 1L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long windowStart = 99L;
         final long recordTime = 99L;
@@ -184,18 +197,14 @@ public class KTableSuppressProcessorTest {
     /**
      * Testing a special case of final results: that even with a grace period of 0,
      * it will still buffer events and emit only after the end of the window.
-     * As opposed to emitting immediately the way regular suppresion would with a time limit of 0.
+     * As opposed to emitting immediately the way regular suppression would with a time limit of 0.
      */
     @Test
     public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
-            finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class, 100L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         // note the record is in the past, but the window end is in the future, so we still have to buffer,
         // even though the grace period is 0.
@@ -221,14 +230,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
-            finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class, 100L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -245,14 +250,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void finalResultsShouldSuppressTombstonesForTimeWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
-            finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class, 100L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -266,14 +267,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void finalResultsShouldSuppressTombstonesForSessionWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
-            finalResults(ofMillis(0)),
-            sessionWindowedSerdeFrom(String.class),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(0L)), sessionWindowedSerdeFrom(String.class), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -287,14 +284,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void suppressShouldNotSuppressTombstonesForTimeWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
-            timeWindowedSerdeFrom(String.class, 100L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -311,14 +304,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void suppressShouldNotSuppressTombstonesForSessionWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
-            sessionWindowedSerdeFrom(String.class),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), sessionWindowedSerdeFrom(String.class), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -335,14 +324,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void suppressShouldNotSuppressTombstonesForKTable() {
-        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -359,14 +344,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void suppressShouldEmitWhenOverRecordCapacity() {
-        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1))),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1)), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setStreamTime(timestamp);
@@ -386,14 +367,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void suppressShouldEmitWhenOverByteCapacity() {
-        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L))),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L)), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setStreamTime(timestamp);
@@ -413,14 +390,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void suppressShouldShutDownWhenOverRecordCapacity() {
-        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull())),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull()), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setStreamTime(timestamp);
@@ -441,14 +414,10 @@ public class KTableSuppressProcessorTest {
 
     @Test
     public void suppressShouldShutDownWhenOverByteCapacity() {
-        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull())),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull()), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setStreamTime(timestamp);
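
The Harness introduced in KTableSuppressProcessorTest wires the processor to the new durable buffer by store name: the buffer is built with InMemoryTimeOrderedKeyValueBuffer.Builder (logging disabled for unit tests), registered with the mock context via StateStore#init, and then looked up by KTableSuppressProcessor during its own init. A hedged sketch of a test written against that harness; the process(key, change) call and the context.forwarded() assertion are assumptions based on the Processor and MockProcessorContext contracts rather than lines shown in this diff:

    // Sketch only: names other than Harness, untilTimeLimit, and setRecordMetadata
    // are assumed, not taken from the hunks above.
    final Harness<String, Long> harness =
        new Harness<>(untilTimeLimit(ZERO, unbounded()), String(), Long());
    final MockInternalProcessorContext context = harness.context;
    final KTableSuppressProcessor<String, Long> processor = harness.processor;

    context.setRecordMetadata("topic", 0, 0L, null, 0L);
    processor.process("key", new Change<>(1L, null)); // assumed Processor#process signature

    // With a zero time limit, the update should be forwarded immediately rather than buffered.
    assertThat(context.forwarded().size(), is(1));
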

