kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8199: Implement ValueGetter for Suppress (#6781)
Date Thu, 30 May 2019 22:42:17 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new efa6410  KAFKA-8199: Implement ValueGetter for Suppress (#6781)
efa6410 is described below

commit efa6410611aa0862065ad804323c280a4d8a372d
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Thu May 30 17:42:04 2019 -0500

    KAFKA-8199: Implement ValueGetter for Suppress (#6781)
    
    See also #6684
    
    KTable processors must be supplied with a KTableProcessorSupplier, which in turn requires implementing a ValueGetter, for use with joins and groupings.
    
    For suppression, a correct view includes only the previously emitted values, not the currently buffered ones. This change therefore also pushes the Change value type into the suppression buffer's interface, so that the buffer can get the prior value when a key is first buffered (that prior value is also the previously emitted value).
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../streams/kstream/internals/FullChangeSerde.java |  19 +-
 .../streams/kstream/internals/KTableImpl.java      |  17 +-
 .../internals/KTableSourceValueGetterSupplier.java |   2 +-
 .../suppress/KTableSuppressProcessor.java          | 133 ---------
 .../suppress/KTableSuppressProcessorSupplier.java  | 214 +++++++++++++++
 .../kafka/streams/state/internals/BufferKey.java   |  72 +++++
 .../kafka/streams/state/internals/BufferValue.java |  98 +++++++
 .../streams/state/internals/ContextualRecord.java  |   6 +-
 .../InMemoryTimeOrderedKeyValueBuffer.java         | 235 ++++++++--------
 .../kafka/streams/state/internals/Maybe.java       |  89 ++++++
 .../state/internals/TimeOrderedKeyValueBuffer.java |  13 +-
 .../kstream/internals/SuppressScenarioTest.java    | 198 ++++++++++++++
 .../KTableSuppressProcessorMetricsTest.java        |  25 +-
 .../suppress/KTableSuppressProcessorTest.java      |  71 +++--
 .../kstream/internals/suppress/SuppressSuite.java  |  50 ++++
 .../kafka/streams/state/internals/MaybeTest.java   |  79 ++++++
 .../internals/TimeOrderedKeyValueBufferTest.java   | 300 +++++++++++++++++----
 17 files changed, 1265 insertions(+), 356 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index f06a428..30d55be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -43,6 +43,10 @@ public final class FullChangeSerde<T> implements Serde<Change<T>> {
         this.inner = requireNonNull(inner);
     }
 
+    public Serde<T> innerSerde() {
+        return inner;
+    }
+
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
         inner.configure(configs, isKey);
@@ -110,11 +114,7 @@ public final class FullChangeSerde<T> implements Serde<Change<T>> {
                 }
                 final ByteBuffer buffer = ByteBuffer.wrap(data);
 
-                final int oldSize = buffer.getInt();
-                final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
-                if (oldBytes != null) {
-                    buffer.get(oldBytes);
-                }
+                final byte[] oldBytes = extractOldValuePart(buffer);
                 final T oldValue = oldBytes == null ? null : innerDeserializer.deserialize(topic, oldBytes);
 
                 final int newSize = buffer.getInt();
@@ -132,4 +132,13 @@ public final class FullChangeSerde<T> implements Serde<Change<T>> {
             }
         };
     }
+
+    public static byte[] extractOldValuePart(final ByteBuffer buffer) {
+        final int oldSize = buffer.getInt();
+        final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
+        if (oldBytes != null) {
+            buffer.get(oldBytes);
+        }
+        return oldBytes;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3a4994f..666a109 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -38,7 +38,7 @@ import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
-import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
+import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
 import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -391,7 +391,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
         final String name;
         if (suppressed instanceof NamedSuppressed) {
-            final String givenName = ((NamedSuppressed) suppressed).name();
+            final String givenName = ((NamedSuppressed<?>) suppressed).name();
             name = givenName != null ? givenName : builder.newProcessorName(SUPPRESS_NAME);
         } else {
             throw new IllegalArgumentException("Custom subclasses of Suppressed are not supported.");
@@ -402,14 +402,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final String storeName =
             suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
 
-        final ProcessorSupplier<K, Change<V>> suppressionSupplier =
-            () -> new KTableSuppressProcessor<>(suppressedInternal, storeName);
+        final ProcessorSupplier<K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
+            suppressedInternal,
+            storeName,
+            this
+        );
 
 
         final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valSerde))
+            new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valSerde)
         );
 
         builder.addGraphNode(streamsGraphNode, node);
@@ -624,7 +627,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     }
 
     @SuppressWarnings("unchecked")
-    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
+    public KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         if (processorSupplier instanceof KTableSource) {
             final KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
             // whenever a source ktable is required for getter, it should be materialized
@@ -638,7 +641,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     }
 
     @SuppressWarnings("unchecked")
-    void enableSendingOldValues() {
+    public void enableSendingOldValues() {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 21731e9..7083b88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -37,7 +37,7 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
     }
 
     private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
-        TimestampedKeyValueStore<K, V> store = null;
+        private TimestampedKeyValueStore<K, V> store = null;
 
         @SuppressWarnings("unchecked")
         public void init(final ProcessorContext context) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
deleted file mode 100644
index 7184d7a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals.suppress;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
-import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
-import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
-
-import static java.util.Objects.requireNonNull;
-
-public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
-    private final long maxRecords;
-    private final long maxBytes;
-    private final long suppressDurationMillis;
-    private final TimeDefinition<K> bufferTimeDefinition;
-    private final BufferFullStrategy bufferFullStrategy;
-    private final boolean safeToDropTombstones;
-    private final String storeName;
-
-    private TimeOrderedKeyValueBuffer<K, Change<V>> buffer;
-    private InternalProcessorContext internalProcessorContext;
-    private Sensor suppressionEmitSensor;
-    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
-
-    public KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) {
-        this.storeName = storeName;
-        requireNonNull(suppress);
-        maxRecords = suppress.bufferConfig().maxRecords();
-        maxBytes = suppress.bufferConfig().maxBytes();
-        suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
-        bufferTimeDefinition = suppress.timeDefinition();
-        bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy();
-        safeToDropTombstones = suppress.safeToDropTombstones();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void init(final ProcessorContext context) {
-        internalProcessorContext = (InternalProcessorContext) context;
-        suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext);
-
-        buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, Change<V>>) context.getStateStore(storeName));
-        buffer.setSerdesIfNull((Serde<K>) context.keySerde(), FullChangeSerde.castOrWrap((Serde<V>) context.valueSerde()));
-    }
-
-    @Override
-    public void process(final K key, final Change<V> value) {
-        observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
-        buffer(key, value);
-        enforceConstraints();
-    }
-
-    private void buffer(final K key, final Change<V> value) {
-        final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
-        buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
-    }
-
-    private void enforceConstraints() {
-        final long streamTime = observedStreamTime;
-        final long expiryTime = streamTime - suppressDurationMillis;
-
-        buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
-
-        if (overCapacity()) {
-            switch (bufferFullStrategy) {
-                case EMIT:
-                    buffer.evictWhile(this::overCapacity, this::emit);
-                    return;
-                case SHUT_DOWN:
-                    throw new StreamsException(String.format(
-                        "%s buffer exceeded its max capacity. Currently [%d/%d] records and [%d/%d] bytes.",
-                        internalProcessorContext.currentNode().name(),
-                        buffer.numRecords(), maxRecords,
-                        buffer.bufferSize(), maxBytes
-                    ));
-                default:
-                    throw new UnsupportedOperationException(
-                        "The bufferFullStrategy [" + bufferFullStrategy +
-                            "] is not implemented. This is a bug in Kafka Streams."
-                    );
-            }
-        }
-    }
-
-    private boolean overCapacity() {
-        return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
-    }
-
-    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, Change<V>> toEmit) {
-        if (shouldForward(toEmit.value())) {
-            final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
-            internalProcessorContext.setRecordContext(toEmit.recordContext());
-            try {
-                internalProcessorContext.forward(toEmit.key(), toEmit.value());
-                suppressionEmitSensor.record();
-            } finally {
-                internalProcessorContext.setRecordContext(prevRecordContext);
-            }
-        }
-    }
-
-    private boolean shouldForward(final Change<V> value) {
-        return value.newValue != null || !safeToDropTombstones;
-    }
-
-    @Override
-    public void close() {
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
new file mode 100644
index 0000000..58e3831
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
+import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
+import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.Maybe;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSupplier<K, V, V> {
+    private final SuppressedInternal<K> suppress;
+    private final String storeName;
+    private final KTableImpl<K, ?, V> parentKTable;
+
+    public KTableSuppressProcessorSupplier(final SuppressedInternal<K> suppress,
+                                           final String storeName,
+                                           final KTableImpl<K, ?, V> parentKTable) {
+        this.suppress = suppress;
+        this.storeName = storeName;
+        this.parentKTable = parentKTable;
+        // The suppress buffer requires seeing the old values, to support the prior value view.
+        parentKTable.enableSendingOldValues();
+    }
+
+    @Override
+    public Processor<K, Change<V>> get() {
+        return new KTableSuppressProcessor<>(suppress, storeName);
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parentKTable.valueGetterSupplier();
+        return new KTableValueGetterSupplier<K, V>() {
+
+            @Override
+            public KTableValueGetter<K, V> get() {
+                final KTableValueGetter<K, V> parentGetter = parentValueGetterSupplier.get();
+                return new KTableValueGetter<K, V>() {
+                    private TimeOrderedKeyValueBuffer<K, V> buffer;
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        parentGetter.init(context);
+                        // the main processor is responsible for the buffer's lifecycle
+                        buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, V>) context.getStateStore(storeName));
+                    }
+
+                    @Override
+                    public ValueAndTimestamp<V> get(final K key) {
+                        final Maybe<ValueAndTimestamp<V>> maybeValue = buffer.priorValueForBuffered(key);
+                        if (maybeValue.isDefined()) {
+                            return maybeValue.getNullableValue();
+                        } else {
+                            // not buffered, so the suppressed view is equal to the parent view
+                            return parentGetter.get(key);
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        parentGetter.close();
+                        // the main processor is responsible for the buffer's lifecycle
+                    }
+                };
+            }
+
+            @Override
+            public String[] storeNames() {
+                final String[] parentStores = parentValueGetterSupplier.storeNames();
+                final String[] stores = new String[1 + parentStores.length];
+                System.arraycopy(parentStores, 0, stores, 1, parentStores.length);
+                stores[0] = storeName;
+                return stores;
+            }
+        };
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        parentKTable.enableSendingOldValues();
+    }
+
+    private static final class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
+        private final long maxRecords;
+        private final long maxBytes;
+        private final long suppressDurationMillis;
+        private final TimeDefinition<K> bufferTimeDefinition;
+        private final BufferFullStrategy bufferFullStrategy;
+        private final boolean safeToDropTombstones;
+        private final String storeName;
+
+        private TimeOrderedKeyValueBuffer<K, V> buffer;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor suppressionEmitSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        private KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) {
+            this.storeName = storeName;
+            requireNonNull(suppress);
+            maxRecords = suppress.bufferConfig().maxRecords();
+            maxBytes = suppress.bufferConfig().maxBytes();
+            suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
+            bufferTimeDefinition = suppress.timeDefinition();
+            bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy();
+            safeToDropTombstones = suppress.safeToDropTombstones();
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            internalProcessorContext = (InternalProcessorContext) context;
+            suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext);
+
+            buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, V>) context.getStateStore(storeName));
+            buffer.setSerdesIfNull((Serde<K>) context.keySerde(), (Serde<V>) context.valueSerde());
+        }
+
+        @Override
+        public void process(final K key, final Change<V> value) {
+            observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
+            buffer(key, value);
+            enforceConstraints();
+        }
+
+        private void buffer(final K key, final Change<V> value) {
+            final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
+
+            buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
+        }
+
+        private void enforceConstraints() {
+            final long streamTime = observedStreamTime;
+            final long expiryTime = streamTime - suppressDurationMillis;
+
+            buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
+
+            if (overCapacity()) {
+                switch (bufferFullStrategy) {
+                    case EMIT:
+                        buffer.evictWhile(this::overCapacity, this::emit);
+                        return;
+                    case SHUT_DOWN:
+                        throw new StreamsException(String.format(
+                            "%s buffer exceeded its max capacity. Currently [%d/%d] records and [%d/%d] bytes.",
+                            internalProcessorContext.currentNode().name(),
+                            buffer.numRecords(), maxRecords,
+                            buffer.bufferSize(), maxBytes
+                        ));
+                    default:
+                        throw new UnsupportedOperationException(
+                            "The bufferFullStrategy [" + bufferFullStrategy +
+                                "] is not implemented. This is a bug in Kafka Streams."
+                        );
+                }
+            }
+        }
+
+        private boolean overCapacity() {
+            return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
+        }
+
+        private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, V> toEmit) {
+            if (shouldForward(toEmit.value())) {
+                final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
+                internalProcessorContext.setRecordContext(toEmit.recordContext());
+                try {
+                    internalProcessorContext.forward(toEmit.key(), toEmit.value());
+                    suppressionEmitSensor.record();
+                } finally {
+                    internalProcessorContext.setRecordContext(prevRecordContext);
+                }
+            }
+        }
+
+        private boolean shouldForward(final Change<V> value) {
+            return value.newValue != null || !safeToDropTombstones;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferKey.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferKey.java
new file mode 100644
index 0000000..9a13aa0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferKey.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Objects;
+
+public final class BufferKey implements Comparable<BufferKey> {
+    private final long time;
+    private final Bytes key;
+
+    BufferKey(final long time, final Bytes key) {
+        this.time = time;
+        this.key = key;
+    }
+
+    long time() {
+        return time;
+    }
+
+    Bytes key() {
+        return key;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final BufferKey bufferKey = (BufferKey) o;
+        return time == bufferKey.time &&
+            Objects.equals(key, bufferKey.key);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(time, key);
+    }
+
+    @Override
+    public int compareTo(final BufferKey o) {
+        // order by time first; keys within the same time are ordered by Bytes.compareTo (not hashCode).
+        final int timeComparison = Long.compare(time, o.time);
+        return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
+    }
+
+    @Override
+    public String toString() {
+        return "BufferKey{" +
+            "key=" + key +
+            ", time=" + time +
+            '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
new file mode 100644
index 0000000..816894e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+public final class BufferValue {
+    private final ContextualRecord record;
+    private final byte[] priorValue;
+
+    BufferValue(final ContextualRecord record,
+                final byte[] priorValue) {
+        this.record = record;
+        this.priorValue = priorValue;
+    }
+
+    ContextualRecord record() {
+        return record;
+    }
+
+    byte[] priorValue() {
+        return priorValue;
+    }
+
+    static BufferValue deserialize(final ByteBuffer buffer) {
+        final ContextualRecord record = ContextualRecord.deserialize(buffer);
+
+        final int priorValueLength = buffer.getInt();
+        if (priorValueLength == -1) {
+            return new BufferValue(record, null);
+        } else {
+            final byte[] priorValue = new byte[priorValueLength];
+            buffer.get(priorValue);
+            return new BufferValue(record, priorValue);
+        }
+    }
+
+    ByteBuffer serialize(final int endPadding) {
+
+        final int sizeOfPriorValueLength = Integer.BYTES;
+        final int sizeOfPriorValue = priorValue == null ? 0 : priorValue.length;
+
+        final ByteBuffer buffer = record.serialize(sizeOfPriorValueLength + sizeOfPriorValue + endPadding);
+
+        if (priorValue == null) {
+            buffer.putInt(-1);
+        } else {
+            buffer.putInt(priorValue.length);
+            buffer.put(priorValue);
+        }
+
+        return buffer;
+    }
+
+    long sizeBytes() {
+        return (priorValue == null ? 0 : priorValue.length) + record.sizeBytes();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final BufferValue that = (BufferValue) o;
+        return Objects.equals(record, that.record) &&
+            Arrays.equals(priorValue, that.priorValue);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(record);
+        result = 31 * result + Arrays.hashCode(priorValue);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "BufferValue{" +
+            "record=" + record +
+            ", priorValue=" + Arrays.toString(priorValue) +
+            '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
index 3893b35..3cd2c37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -43,13 +43,13 @@ public class ContextualRecord {
         return (value == null ? 0 : value.length) + recordContext.sizeBytes();
     }
 
-    byte[] serialize() {
+    ByteBuffer serialize(final int endPadding) {
         final byte[] serializedContext = recordContext.serialize();
 
         final int sizeOfContext = serializedContext.length;
         final int sizeOfValueLength = Integer.BYTES;
         final int sizeOfValue = value == null ? 0 : value.length;
-        final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue);
+        final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue + endPadding);
 
         buffer.put(serializedContext);
         if (value == null) {
@@ -59,7 +59,7 @@ public class ContextualRecord {
             buffer.put(value);
         }
 
-        return buffer.array();
+        return buffer;
     }
 
     static ContextualRecord deserialize(final ByteBuffer buffer) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index e11df7c..6c5022f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -25,6 +25,8 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -32,7 +34,9 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordQueue;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.metrics.Sensors;
 
 import java.nio.ByteBuffer;
@@ -42,7 +46,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Consumer;
@@ -55,16 +59,18 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =
         new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
+    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
+        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
 
     private final Map<Bytes, BufferKey> index = new HashMap<>();
-    private final TreeMap<BufferKey, ContextualRecord> sortedMap = new TreeMap<>();
+    private final TreeMap<BufferKey, BufferValue> sortedMap = new TreeMap<>();
 
     private final Set<Bytes> dirtyKeys = new HashSet<>();
     private final String storeName;
     private final boolean loggingEnabled;
 
     private Serde<K> keySerde;
-    private Serde<V> valueSerde;
+    private FullChangeSerde<V> valueSerde;
 
     private long memBufferSize = 0L;
     private long minTimestamp = Long.MAX_VALUE;
@@ -77,7 +83,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
 
     private int partition;
 
-    public static class Builder<K, V> implements StoreBuilder<StateStore> {
+    public static class Builder<K, V> implements StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> {
 
         private final String storeName;
         private final Serde<K> keySerde;
@@ -94,11 +100,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
          * As of 2.1, there's no way for users to directly interact with the buffer,
          * so this method is implemented solely to be called by Streams (which
          * it will do based on the {@code cache.max.bytes.buffering} config.
-         *
+         * <p>
          * It's currently a no-op.
          */
         @Override
-        public StoreBuilder<StateStore> withCachingEnabled() {
+        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withCachingEnabled() {
             return this;
         }
 
@@ -106,21 +112,21 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
          * As of 2.1, there's no way for users to directly interact with the buffer,
          * so this method is implemented solely to be called by Streams (which
          * it will do based on the {@code cache.max.bytes.buffering} config.
-         *
+         * <p>
          * It's currently a no-op.
          */
         @Override
-        public StoreBuilder<StateStore> withCachingDisabled() {
+        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withCachingDisabled() {
             return this;
         }
 
         @Override
-        public StoreBuilder<StateStore> withLoggingEnabled(final Map<String, String> config) {
+        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withLoggingEnabled(final Map<String, String> config) {
             throw new UnsupportedOperationException();
         }
 
         @Override
-        public StoreBuilder<StateStore> withLoggingDisabled() {
+        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withLoggingDisabled() {
             loggingEnabled = false;
             return this;
         }
@@ -146,49 +152,6 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         }
     }
 
-    private static final class BufferKey implements Comparable<BufferKey> {
-        private final long time;
-        private final Bytes key;
-
-        private BufferKey(final long time, final Bytes key) {
-            this.time = time;
-            this.key = key;
-        }
-
-        @Override
-        public boolean equals(final Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            final BufferKey bufferKey = (BufferKey) o;
-            return time == bufferKey.time &&
-                Objects.equals(key, bufferKey.key);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(time, key);
-        }
-
-        @Override
-        public int compareTo(final BufferKey o) {
-            // ordering of keys within a time uses hashCode.
-            final int timeComparison = Long.compare(time, o.time);
-            return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
-        }
-
-        @Override
-        public String toString() {
-            return "BufferKey{" +
-                "key=" + key +
-                ", time=" + time +
-                '}';
-        }
-    }
-
     private InMemoryTimeOrderedKeyValueBuffer(final String storeName,
                                               final boolean loggingEnabled,
                                               final Serde<K> keySerde,
@@ -196,7 +159,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         this.storeName = storeName;
         this.loggingEnabled = loggingEnabled;
         this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
+        this.valueSerde = FullChangeSerde.castOrWrap(valueSerde);
     }
 
     @Override
@@ -213,7 +176,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     @Override
     public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
         this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
-        this.valueSerde = this.valueSerde == null ? valueSerde : this.valueSerde;
+        this.valueSerde = this.valueSerde == null ? FullChangeSerde.castOrWrap(valueSerde) : this.valueSerde;
     }
 
     @Override
@@ -261,7 +224,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                     // The record was evicted from the buffer. Send a tombstone.
                     logTombstone(key);
                 } else {
-                    final ContextualRecord value = sortedMap.get(bufferKey);
+                    final BufferValue value = sortedMap.get(bufferKey);
 
                     logValue(key, bufferKey, value);
                 }
@@ -270,22 +233,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         }
     }
 
-    private void logValue(final Bytes key, final BufferKey bufferKey, final ContextualRecord value) {
-        final byte[] serializedContextualRecord = value.serialize();
+    private void logValue(final Bytes key, final BufferKey bufferKey, final BufferValue value) {
 
         final int sizeOfBufferTime = Long.BYTES;
-        final int sizeOfContextualRecord = serializedContextualRecord.length;
-
-        final byte[] timeAndContextualRecord = ByteBuffer.wrap(new byte[sizeOfBufferTime + sizeOfContextualRecord])
-                                                         .putLong(bufferKey.time)
-                                                         .put(serializedContextualRecord)
-                                                         .array();
+        final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
+        buffer.putLong(bufferKey.time());
 
         collector.send(
             changelogTopic,
             key,
-            timeAndContextualRecord,
-            V_1_CHANGELOG_HEADERS,
+            buffer.array(),
+            V_2_CHANGELOG_HEADERS,
             partition,
             null,
             KEY_SERIALIZER,
@@ -312,12 +270,12 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                 // This was a tombstone. Delete the record.
                 final BufferKey bufferKey = index.remove(key);
                 if (bufferKey != null) {
-                    final ContextualRecord removed = sortedMap.remove(bufferKey);
+                    final BufferValue removed = sortedMap.remove(bufferKey);
                     if (removed != null) {
-                        memBufferSize -= computeRecordSize(bufferKey.key, removed);
+                        memBufferSize -= computeRecordSize(bufferKey.key(), removed);
                     }
-                    if (bufferKey.time == minTimestamp) {
-                        minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time;
+                    if (bufferKey.time() == minTimestamp) {
+                        minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time();
                     }
                 }
 
@@ -331,33 +289,46 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                     );
                 }
             } else {
-                final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
-                final long time = timeAndValue.getLong();
-                final byte[] value = new byte[record.value().length - 8];
-                timeAndValue.get(value);
                 if (record.headers().lastHeader("v") == null) {
+                    // in this case, the changelog value is just the serialized record value
+                    final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+                    final long time = timeAndValue.getLong();
+                    final byte[] changelogValue = new byte[record.value().length - 8];
+                    timeAndValue.get(changelogValue);
+
                     cleanPut(
                         time,
                         key,
-                        new ContextualRecord(
-                            value,
-                            new ProcessorRecordContext(
-                                record.timestamp(),
-                                record.offset(),
-                                record.partition(),
-                                record.topic(),
-                                record.headers()
-                            )
+                        new BufferValue(
+                            new ContextualRecord(
+                                changelogValue,
+                                new ProcessorRecordContext(
+                                    record.timestamp(),
+                                    record.offset(),
+                                    record.partition(),
+                                    record.topic(),
+                                    record.headers()
+                                )
+                            ),
+                            inferPriorValue(key, changelogValue)
                         )
                     );
                 } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(value));
-
-                    cleanPut(
-                        time,
-                        key,
-                        contextualRecord
-                    );
+                    // in this case, the changelog value is a serialized ContextualRecord
+                    final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+                    final long time = timeAndValue.getLong();
+                    final byte[] changelogValue = new byte[record.value().length - 8];
+                    timeAndValue.get(changelogValue);
+
+                    final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
+                    cleanPut(time, key, new BufferValue(contextualRecord, inferPriorValue(key, contextualRecord.value())));
+                } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
+                    // in this case, the changelog value is a serialized BufferValue
+
+                    final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
+                    final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
+                    final long time = valueAndTime.getLong();
+                    cleanPut(time, key, bufferValue);
                 } else {
                     throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
                 }
@@ -375,42 +346,50 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         updateBufferMetrics();
     }
 
+    private byte[] inferPriorValue(final Bytes key, final byte[] serializedChange) {
+        return index.containsKey(key)
+            ? internalPriorValueForBuffered(key)
+            : FullChangeSerde.extractOldValuePart(ByteBuffer.wrap(serializedChange));
+    }
+
 
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
                            final Consumer<Eviction<K, V>> callback) {
-        final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
+        final Iterator<Map.Entry<BufferKey, BufferValue>> delegate = sortedMap.entrySet().iterator();
         int evictions = 0;
 
         if (predicate.get()) {
-            Map.Entry<BufferKey, ContextualRecord> next = null;
+            Map.Entry<BufferKey, BufferValue> next = null;
             if (delegate.hasNext()) {
                 next = delegate.next();
             }
 
             // predicate being true means we read one record, call the callback, and then remove it
             while (next != null && predicate.get()) {
-                if (next.getKey().time != minTimestamp) {
+                if (next.getKey().time() != minTimestamp) {
                     throw new IllegalStateException(
                         "minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" +
-                            next.getKey().time + "]"
+                            next.getKey().time() + "]"
                     );
                 }
-                final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key.get());
-                final V value = valueSerde.deserializer().deserialize(changelogTopic, next.getValue().value());
-                callback.accept(new Eviction<>(key, value, next.getValue().recordContext()));
+                final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
+                final BufferValue bufferValue = next.getValue();
+                final ContextualRecord record = bufferValue.record();
+                final Change<V> value = valueSerde.deserializer().deserialize(changelogTopic, record.value());
+                callback.accept(new Eviction<>(key, value, record.recordContext()));
 
                 delegate.remove();
-                index.remove(next.getKey().key);
+                index.remove(next.getKey().key());
 
-                dirtyKeys.add(next.getKey().key);
+                dirtyKeys.add(next.getKey().key());
 
-                memBufferSize -= computeRecordSize(next.getKey().key, next.getValue());
+                memBufferSize -= computeRecordSize(next.getKey().key(), bufferValue);
 
                 // peek at the next record so we can update the minTimestamp
                 if (delegate.hasNext()) {
                     next = delegate.next();
-                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time;
+                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time();
                 } else {
                     next = null;
                     minTimestamp = Long.MAX_VALUE;
@@ -425,9 +404,39 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     }
 
     @Override
+    public Maybe<ValueAndTimestamp<V>> priorValueForBuffered(final K key) {
+        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
+        if (index.containsKey(serializedKey)) {
+            final byte[] serializedValue = internalPriorValueForBuffered(serializedKey);
+
+            final V deserializedValue = valueSerde.innerSerde().deserializer().deserialize(
+                changelogTopic,
+                serializedValue
+            );
+
+            // it's unfortunately not possible to know the prior value's timestamp unless we materialize the suppressed
+            // result, since our only knowledge of the prior value is what the upstream processor sends us as the
+            // "old value" when we first buffer something.
+            return Maybe.defined(ValueAndTimestamp.make(deserializedValue, RecordQueue.UNKNOWN));
+        } else {
+            return Maybe.undefined();
+        }
+    }
+
+    private byte[] internalPriorValueForBuffered(final Bytes key) {
+        final BufferKey bufferKey = index.get(key);
+        if (bufferKey == null) {
+            throw new NoSuchElementException("Key [" + key + "] is not in the buffer.");
+        } else {
+            final BufferValue bufferValue = sortedMap.get(bufferKey);
+            return bufferValue.priorValue();
+        }
+    }
+
+    @Override
     public void put(final long time,
                     final K key,
-                    final V value,
+                    final Change<V> value,
                     final ProcessorRecordContext recordContext) {
         requireNonNull(value, "value cannot be null");
         requireNonNull(recordContext, "recordContext cannot be null");
@@ -435,12 +444,26 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
         final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value);
 
-        cleanPut(time, serializedKey, new ContextualRecord(serializedValue, recordContext));
+        final BufferValue buffered = getBuffered(serializedKey);
+        final byte[] serializedPriorValue;
+        if (buffered == null) {
+            final V priorValue = value.oldValue;
+            serializedPriorValue = valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
+        } else {
+            serializedPriorValue = buffered.priorValue();
+        }
+
+        cleanPut(time, serializedKey, new BufferValue(new ContextualRecord(serializedValue, recordContext), serializedPriorValue));
         dirtyKeys.add(serializedKey);
         updateBufferMetrics();
     }
 
-    private void cleanPut(final long time, final Bytes key, final ContextualRecord value) {
+    private BufferValue getBuffered(final Bytes key) {
+        final BufferKey bufferKey = index.get(key);
+        return bufferKey == null ? null : sortedMap.get(bufferKey);
+    }
+
+    private void cleanPut(final long time, final Bytes key, final BufferValue value) {
         // non-resetting semantics:
         // if there was a previous version of the same record,
         // then insert the new record in the same place in the priority queue
@@ -453,7 +476,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
             minTimestamp = Math.min(minTimestamp, time);
             memBufferSize += computeRecordSize(key, value);
         } else {
-            final ContextualRecord removedValue = sortedMap.put(previousKey, value);
+            final BufferValue removedValue = sortedMap.put(previousKey, value);
             memBufferSize =
                 memBufferSize
                     + computeRecordSize(key, value)
@@ -476,7 +499,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         return minTimestamp;
     }
 
-    private static long computeRecordSize(final Bytes key, final ContextualRecord value) {
+    private static long computeRecordSize(final Bytes key, final BufferValue value) {
         long size = 0L;
         size += 8; // buffer time
         size += key.get().length;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Maybe.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Maybe.java
new file mode 100644
index 0000000..c292c17
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Maybe.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * A container that may be empty, may contain null, or may contain a value.
+ * Distinct from {@link java.util.Optional}, since Optional cannot contain null.
+ *
+ * @param <T> the type of the contained value
+ */
+public final class Maybe<T> {
+    private final T nullableValue;
+    private final boolean defined;
+
+    public static <T> Maybe<T> defined(final T nullableValue) {
+        return new Maybe<>(nullableValue);
+    }
+
+    public static <T> Maybe<T> undefined() {
+        return new Maybe<>();
+    }
+
+    private Maybe(final T nullableValue) {
+        this.nullableValue = nullableValue;
+        defined = true;
+    }
+
+    private Maybe() {
+        nullableValue = null;
+        defined = false;
+    }
+
+    public T getNullableValue() {
+        if (defined) {
+            return nullableValue;
+        } else {
+            throw new NoSuchElementException();
+        }
+    }
+
+    public boolean isDefined() {
+        return defined;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final Maybe<?> maybe = (Maybe<?>) o;
+
+        // All undefined maybes are equal
+        // All defined null maybes are equal
+        return defined == maybe.defined &&
+            (!defined || Objects.equals(nullableValue, maybe.nullableValue));
+    }
+
+    @Override
+    public int hashCode() {
+        // Since all undefined maybes are equal, we can hard-code their hashCode to -1.
+        // Since all defined null maybes are equal, we can hard-code their hashCode to 0.
+        return defined ? nullableValue == null ? 0 : nullableValue.hashCode() : -1;
+    }
+
+    @Override
+    public String toString() {
+        if (defined) {
+            return "DefinedMaybe{" + String.valueOf(nullableValue) + "}";
+        } else {
+            return "UndefinedMaybe{}";
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index ffa1f49..3aabbaa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -17,20 +17,23 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
+
     final class Eviction<K, V> {
         private final K key;
-        private final V value;
+        private final Change<V> value;
         private final ProcessorRecordContext recordContext;
 
-        Eviction(final K key, final V value, final ProcessorRecordContext recordContext) {
+        Eviction(final K key, final Change<V> value, final ProcessorRecordContext recordContext) {
             this.key = key;
             this.value = value;
             this.recordContext = recordContext;
@@ -40,7 +43,7 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
             return key;
         }
 
-        public V value() {
+        public Change<V> value() {
             return value;
         }
 
@@ -73,7 +76,9 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
 
     void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
 
-    void put(long time, K key, V value, ProcessorRecordContext recordContext);
+    Maybe<ValueAndTimestamp<V>> priorValueForBuffered(K key);
+
+    void put(long time, K key, Change<V> value, ProcessorRecordContext recordContext);
 
     int numRecords();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 43ad42a..3c7fd1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -517,6 +517,204 @@ public class SuppressScenarioTest {
         }
     }
 
+    @Test
+    public void shouldWorkBeforeGroupBy() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder
+            .table("topic", Consumed.with(Serdes.String(), Serdes.String()))
+            .suppress(untilTimeLimit(ofMillis(10), unbounded()))
+            .groupBy(KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
+            .count()
+            .toStream()
+            .to("output", Produced.with(Serdes.String(), Serdes.Long()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+
+            driver.pipeInput(recordFactory.create("topic", "A", "a", 0L));
+            driver.pipeInput(recordFactory.create("topic", "tick", "tick", 10L));
+
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", 1L, 0L))
+            );
+        }
+    }
+
+    @Test
+    public void shouldWorkBeforeJoinRight() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, String> left = builder
+            .table("left", Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KTable<String, String> right = builder
+            .table("right", Consumed.with(Serdes.String(), Serdes.String()))
+            .suppress(untilTimeLimit(ofMillis(10), unbounded()));
+
+        left
+            .outerJoin(right, (l, r) -> String.format("(%s,%s)", l, r))
+            .toStream()
+            .to("output", Produced.with(Serdes.String(), Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+
+            driver.pipeInput(recordFactory.create("right", "B", "1", 0L));
+            driver.pipeInput(recordFactory.create("right", "A", "1", 0L));
+            // buffered, no output
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                emptyList()
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "tick", "tick", 10L));
+            // flush buffer
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("A", "(null,1)", 0L),
+                    new KeyValueTimestamp<>("B", "(null,1)", 0L)
+                )
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "A", "2", 11L));
+            // buffered, no output
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                emptyList()
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "A", "a", 12L));
+            // should join with previously emitted right side
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", "(a,1)", 12L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "B", "b", 12L));
+            // should view through to the parent KTable, since B is no longer buffered
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("B", "(b,1)", 12L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "A", "b", 13L));
+            // should join with previously emitted right side
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", "(b,1)", 13L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "tick", "tick", 21L));
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("tick", "(null,tick)", 21), // just a testing artifact
+                    new KeyValueTimestamp<>("A", "(b,2)", 13L)
+                )
+            );
+        }
+
+    }
+
+
+    @Test
+    public void shouldWorkBeforeJoinLeft() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, String> left = builder
+            .table("left", Consumed.with(Serdes.String(), Serdes.String()))
+            .suppress(untilTimeLimit(ofMillis(10), unbounded()));
+
+        final KTable<String, String> right = builder
+            .table("right", Consumed.with(Serdes.String(), Serdes.String()));
+
+        left
+            .outerJoin(right, (l, r) -> String.format("(%s,%s)", l, r))
+            .toStream()
+            .to("output", Produced.with(Serdes.String(), Serdes.String()));
+
+        final Topology topology = builder.build();
+        System.out.println(topology.describe());
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+
+            driver.pipeInput(recordFactory.create("left", "B", "1", 0L));
+            driver.pipeInput(recordFactory.create("left", "A", "1", 0L));
+            // buffered, no output
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                emptyList()
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "tick", "tick", 10L));
+            // flush buffer
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("A", "(1,null)", 0L),
+                    new KeyValueTimestamp<>("B", "(1,null)", 0L)
+                )
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "A", "2", 11L));
+            // buffered, no output
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                emptyList()
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "A", "a", 12L));
+            // should join with previously emitted left side
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", "(1,a)", 12L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "B", "b", 12L));
+            // should view through to the parent KTable, since B is no longer buffered
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("B", "(1,b)", 12L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "A", "b", 13L));
+            // should join with previously emitted left side
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", "(1,b)", 13L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "tick", "tick", 21L));
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("tick", "(tick,null)", 21), // just a testing artifact
+                    new KeyValueTimestamp<>("A", "(2,b)", 13L)
+                )
+            );
+        }
+
+    }
+
+
     private static <K, V> void verify(final List<ProducerRecord<K, V>> results,
                                       final List<KeyValueTimestamp<K, V>> expectedResults) {
         if (results.size() != expectedResults.size()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 62ae3bf..96ee735 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
+import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
+import org.easymock.EasyMock;
 import org.hamcrest.Matcher;
 import org.junit.Test;
 
@@ -141,11 +144,13 @@ public class KTableSuppressProcessorMetricsTest {
             .withLoggingDisabled()
             .build();
 
-        final KTableSuppressProcessor<String, Long> processor =
-            new KTableSuppressProcessor<>(
+        final KTableImpl<String, ?, Long> mock = EasyMock.mock(KTableImpl.class);
+        final Processor<String, Change<Long>> processor =
+            new KTableSuppressProcessorSupplier<>(
                 (SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
-                storeName
-            );
+                storeName,
+                mock
+            ).get();
 
         final MockInternalProcessorContext context = new MockInternalProcessorContext();
         context.setCurrentNode(new ProcessorNode("testNode"));
@@ -164,9 +169,9 @@ public class KTableSuppressProcessorMetricsTest {
 
             verifyMetric(metrics, EVICTION_RATE_METRIC, is(0.0));
             verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(0.0));
-            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(25.5));
-            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(51.0));
-            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(51.0));
+            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(29.5));
+            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(59.0));
+            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(59.0));
             verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(0.5));
             verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
             verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0));
@@ -180,9 +185,9 @@ public class KTableSuppressProcessorMetricsTest {
 
             verifyMetric(metrics, EVICTION_RATE_METRIC, greaterThan(0.0));
             verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(1.0));
-            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(49.0));
-            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(47.0));
-            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(98.0));
+            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(57.0));
+            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(55.0));
+            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(114.0));
             verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(1.0));
             verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
             verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(2.0));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 6c10d91..d8cb858 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -26,13 +26,16 @@ import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
+import org.easymock.EasyMock;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -62,7 +65,7 @@ public class KTableSuppressProcessorTest {
     private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);
 
     private static class Harness<K, V> {
-        private final KTableSuppressProcessor<K, V> processor;
+        private final Processor<K, Change<V>> processor;
         private final MockInternalProcessorContext context;
 
 
@@ -76,8 +79,9 @@ public class KTableSuppressProcessorTest {
                 .withLoggingDisabled()
                 .build();
 
-            final KTableSuppressProcessor<K, V> processor =
-                new KTableSuppressProcessor<>((SuppressedInternal<K>) suppressed, storeName);
+            final KTableImpl<K, ?, V> parent = EasyMock.mock(KTableImpl.class);
+            final Processor<K, Change<V>> processor =
+                new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, storeName, parent).get();
 
             final MockInternalProcessorContext context = new MockInternalProcessorContext();
             context.setCurrentNode(new ProcessorNode("testNode"));
@@ -95,13 +99,12 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ZERO, unbounded()), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -114,13 +117,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ZERO, unbounded()), timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -133,17 +135,16 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(1), unbounded()), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 0L;
         context.setRecordMetadata("topic", 0, 0, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, 1L);
-        processor.process(key, value);
+        harness.processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         context.setRecordMetadata("topic", 0, 1, null, 1L);
-        processor.process("tick", new Change<>(null, null));
+        harness.processor.process("tick", new Change<>(null, null));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -156,7 +157,6 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(1L)), timeWindowedSerdeFrom(String.class, 1L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long windowStart = 99L;
         final long recordTime = 99L;
@@ -164,7 +164,7 @@ public class KTableSuppressProcessorTest {
         context.setRecordMetadata("topic", 0, 0, null, recordTime);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         // although the stream time is now 100, we have to wait 1 ms after the window *end* before we
@@ -173,7 +173,7 @@ public class KTableSuppressProcessorTest {
         final long recordTime2 = 100L;
         final long windowEnd2 = 101L;
         context.setRecordMetadata("topic", 0, 1, null, recordTime2);
-        processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
+        harness.processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
         assertThat(context.forwarded(), hasSize(0));
 
         // ok, now it's time to emit "hey"
@@ -181,7 +181,7 @@ public class KTableSuppressProcessorTest {
         final long recordTime3 = 101L;
         final long windowEnd3 = 102L;
         context.setRecordMetadata("topic", 0, 1, null, recordTime3);
-        processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
+        harness.processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -199,7 +199,6 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         // note the record is in the past, but the window end is in the future, so we still have to buffer,
         // even though the grace period is 0.
@@ -208,11 +207,11 @@ public class KTableSuppressProcessorTest {
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         context.setRecordMetadata("", 0, 1L, null, windowEnd);
-        processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
+        harness.processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -225,13 +224,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -248,13 +246,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(0));
     }
@@ -269,13 +266,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), sessionWindowedSerdeFrom(String.class), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(0));
     }
@@ -289,13 +285,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -313,13 +308,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), sessionWindowedSerdeFrom(String.class), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -337,13 +331,12 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -356,16 +349,15 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1)), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
-        processor.process("dummyKey", value);
+        harness.processor.process("dummyKey", value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -378,16 +370,15 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L)), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
-        processor.process("dummyKey", value);
+        harness.processor.process("dummyKey", value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -400,18 +391,17 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull()), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         context.setRecordMetadata("", 0, 1L, null, timestamp);
         try {
-            processor.process("dummyKey", value);
+            harness.processor.process("dummyKey", value);
             fail("expected an exception");
         } catch (final StreamsException e) {
             assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
@@ -423,18 +413,17 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull()), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         context.setRecordMetadata("", 0, 1L, null, timestamp);
         try {
-            processor.process("dummyKey", value);
+            harness.processor.process("dummyKey", value);
             fail("expected an exception");
         } catch (final StreamsException e) {
             assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
new file mode 100644
index 0000000..3aef6d0
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest;
+import org.apache.kafka.streams.integration.SuppressionIntegrationTest;
+import org.apache.kafka.streams.kstream.SuppressedTest;
+import org.apache.kafka.streams.kstream.internals.SuppressScenarioTest;
+import org.apache.kafka.streams.kstream.internals.SuppressTopologyTest;
+import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBufferTest;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * This suite runs all the tests related to the Suppression feature.
+ *
+ * It can be used from an IDE to run just these tests when developing code related to Suppress.
+ * 
+ * If desired, it can also be added to a Gradle build task, although this isn't strictly necessary, since all
+ * these tests are already included in the `:streams:test` task.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    KTableSuppressProcessorMetricsTest.class,
+    KTableSuppressProcessorTest.class,
+    SuppressScenarioTest.class,
+    SuppressTopologyTest.class,
+    SuppressedTest.class,
+    SuppressionIntegrationTest.class,
+    SuppressionDurabilityIntegrationTest.class,
+    InMemoryTimeOrderedKeyValueBufferTest.class,
+    TimeOrderedKeyValueBufferTest.class
+})
+public class SuppressSuite {
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MaybeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MaybeTest.java
new file mode 100644
index 0000000..7a29e13
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MaybeTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.Test;
+
+import java.util.NoSuchElementException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.fail;
+
+public class MaybeTest {
+    @Test
+    public void shouldReturnDefinedValue() {
+        assertThat(Maybe.defined(null).getNullableValue(), nullValue());
+        assertThat(Maybe.defined("ASDF").getNullableValue(), is("ASDF"));
+    }
+
+    @Test
+    public void shouldAnswerIsDefined() {
+        assertThat(Maybe.defined(null).isDefined(), is(true));
+        assertThat(Maybe.defined("ASDF").isDefined(), is(true));
+        assertThat(Maybe.undefined().isDefined(), is(false));
+    }
+
+    @Test
+    public void shouldThrowOnGetUndefinedValue() {
+        final Maybe<Object> undefined = Maybe.undefined();
+        try {
+            undefined.getNullableValue();
+            fail();
+        } catch (final NoSuchElementException e) {
+            // no assertion necessary
+        }
+    }
+
+    @Test
+    public void shouldUpholdEqualityCorrectness() {
+        assertThat(Maybe.undefined().equals(Maybe.undefined()), is(true));
+        assertThat(Maybe.defined(null).equals(Maybe.defined(null)), is(true));
+        assertThat(Maybe.defined("q").equals(Maybe.defined("q")), is(true));
+
+        assertThat(Maybe.undefined().equals(Maybe.defined(null)), is(false));
+        assertThat(Maybe.undefined().equals(Maybe.defined("x")), is(false));
+
+        assertThat(Maybe.defined(null).equals(Maybe.undefined()), is(false));
+        assertThat(Maybe.defined(null).equals(Maybe.defined("x")), is(false));
+
+        assertThat(Maybe.defined("a").equals(Maybe.undefined()), is(false));
+        assertThat(Maybe.defined("a").equals(Maybe.defined(null)), is(false));
+        assertThat(Maybe.defined("a").equals(Maybe.defined("b")), is(false));
+    }
+
+    @Test
+    public void shouldUpholdHashCodeCorrectness() {
+        // This pins the current hashCode implementation, which is simpler to write than an exhaustive test.
+        // As long as that implementation doesn't change, the equals/hashCode contract is upheld.
+
+        assertThat(Maybe.undefined().hashCode(), is(-1));
+        assertThat(Maybe.defined(null).hashCode(), is(0));
+        assertThat(Maybe.defined("a").hashCode(), is("a".hashCode()));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 6ae36d4..941832b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -23,12 +23,16 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer.Eviction;
 import org.apache.kafka.test.MockInternalProcessorContext;
 import org.apache.kafka.test.MockInternalProcessorContext.MockRecordCollector;
@@ -57,8 +61,8 @@ import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
-    private static final RecordHeaders V_1_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
+    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
+        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
 
     private static final String APP_ID = "test-app";
     private final Function<String, B> bufferSupplier;
@@ -129,10 +133,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
         try {
-            buffer.put(0, "asdf",
-                       null,
-                       getContext(0)
-            );
+            buffer.put(0, "asdf", null, getContext(0));
             fail("expected an exception");
         } catch (final NullPointerException expected) {
             // expected
@@ -163,7 +164,9 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final List<Eviction<String, String>> evicted = new LinkedList<>();
         buffer.evictWhile(() -> buffer.numRecords() > 1, evicted::add);
         assertThat(buffer.numRecords(), is(1));
-        assertThat(evicted, is(singletonList(new Eviction<>("asdf", "eyt", getContext(0L)))));
+        assertThat(evicted, is(singletonList(
+            new Eviction<>("asdf", new Change<>("eyt", null), getContext(0L))
+        )));
         cleanup(context, buffer);
     }
 
@@ -187,11 +190,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
-        assertThat(buffer.bufferSize(), is(43L));
+        assertThat(buffer.bufferSize(), is(51L));
         putRecord(buffer, context, 1L, 0L, "asdf", "3l");
-        assertThat(buffer.bufferSize(), is(39L));
+        assertThat(buffer.bufferSize(), is(47L));
         putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin");
-        assertThat(buffer.bufferSize(), is(82L));
+        assertThat(buffer.bufferSize(), is(98L));
         cleanup(context, buffer);
     }
 
@@ -215,12 +218,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
         assertThat(buffer.numRecords(), is(1));
-        assertThat(buffer.bufferSize(), is(42L));
+        assertThat(buffer.bufferSize(), is(50L));
         assertThat(buffer.minTimestamp(), is(1L));
 
         putRecord(buffer, context, 0L, 0L, "asdf", "3ng");
         assertThat(buffer.numRecords(), is(2));
-        assertThat(buffer.bufferSize(), is(82L));
+        assertThat(buffer.bufferSize(), is(98L));
         assertThat(buffer.minTimestamp(), is(0L));
 
         final AtomicInteger callbackCount = new AtomicInteger(0);
@@ -229,14 +232,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                 case 1: {
                     assertThat(kv.key(), is("asdf"));
                     assertThat(buffer.numRecords(), is(2));
-                    assertThat(buffer.bufferSize(), is(82L));
+                    assertThat(buffer.bufferSize(), is(98L));
                     assertThat(buffer.minTimestamp(), is(0L));
                     break;
                 }
                 case 2: {
                     assertThat(kv.key(), is("zxcv"));
                     assertThat(buffer.numRecords(), is(1));
-                    assertThat(buffer.bufferSize(), is(42L));
+                    assertThat(buffer.bufferSize(), is(50L));
                     assertThat(buffer.minTimestamp(), is(1L));
                     break;
                 }
@@ -254,6 +257,29 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     }
 
     @Test
+    public void shouldReturnUndefinedOnPriorValueForNotBufferedKey() {
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
+        final MockInternalProcessorContext context = makeContext();
+        buffer.init(context, buffer);
+
+        assertThat(buffer.priorValueForBuffered("ASDF"), is(Maybe.undefined()));
+    }
+
+    @Test
+    public void shouldReturnPriorValueForBufferedKey() {
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
+        final MockInternalProcessorContext context = makeContext();
+        buffer.init(context, buffer);
+
+        final ProcessorRecordContext recordContext = getContext(0L);
+        context.setRecordContext(recordContext);
+        buffer.put(1L, "A", new Change<>("new-value", "old-value"), recordContext);
+        buffer.put(1L, "B", new Change<>("new-value", null), recordContext);
+        assertThat(buffer.priorValueForBuffered("A"), is(Maybe.defined(ValueAndTimestamp.make("old-value", -1))));
+        assertThat(buffer.priorValueForBuffered("B"), is(Maybe.defined(null)));
+    }
+
+    @Test
     public void shouldFlush() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
@@ -272,19 +298,19 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         // which we can't compare for equality using ProducerRecord.
         // As a workaround, I'm deserializing them and shoving them in a KeyValue, just for ease of testing.
 
-        final List<ProducerRecord<String, KeyValue<Long, ContextualRecord>>> collected =
+        final List<ProducerRecord<String, KeyValue<Long, BufferValue>>> collected =
             ((MockRecordCollector) context.recordCollector())
                 .collected()
                 .stream()
                 .map(pr -> {
-                    final KeyValue<Long, ContextualRecord> niceValue;
+                    final KeyValue<Long, BufferValue> niceValue;
                     if (pr.value() == null) {
                         niceValue = null;
                     } else {
-                        final byte[] timestampAndValue = pr.value();
-                        final ByteBuffer wrap = ByteBuffer.wrap(timestampAndValue);
-                        final long timestamp = wrap.getLong();
-                        final ContextualRecord contextualRecord = ContextualRecord.deserialize(wrap);
+                        final byte[] serializedValue = pr.value();
+                        final ByteBuffer valueBuffer = ByteBuffer.wrap(serializedValue);
+                        final BufferValue contextualRecord = BufferValue.deserialize(valueBuffer);
+                        final long timestamp = valueBuffer.getLong();
                         niceValue = new KeyValue<>(timestamp, contextualRecord);
                     }
 
@@ -309,15 +335,15 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  0,
                                  null,
                                  "zxcv",
-                                 new KeyValue<>(1L, getRecord("3gon4i", 1)),
-                                 V_1_CHANGELOG_HEADERS
+                                 new KeyValue<>(1L, getBufferValue("3gon4i", 1)),
+                                 V_2_CHANGELOG_HEADERS
             ),
             new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
                                  0,
                                  null,
                                  "asdf",
-                                 new KeyValue<>(2L, getRecord("2093j", 0)),
-                                 V_1_CHANGELOG_HEADERS
+                                 new KeyValue<>(2L, getBufferValue("2093j", 0)),
+                                 V_2_CHANGELOG_HEADERS
             )
         )));
 
@@ -335,6 +361,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
+        final Serializer<Change<String>> serializer = FullChangeSerde.castOrWrap(Serdes.String()).serializer();
+
+        final byte[] todeleteValue = serializer.serialize(null, new Change<>("doomed", null));
+        final byte[] asdfValue = serializer.serialize(null, new Change<>("qwer", null));
+        final byte[] zxcvValue1 = serializer.serialize(null, new Change<>("eo4im", "previous"));
+        final byte[] zxcvValue2 = serializer.serialize(null, new Change<>("next", "eo4im"));
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -345,7 +377,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + 6).putLong(0L).put("doomed".getBytes(UTF_8)).array()),
+                                 ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  1,
@@ -355,7 +387,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "asdf".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + 4).putLong(2L).put("qwer".getBytes(UTF_8)).array()),
+                                 ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
@@ -365,12 +397,22 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + 5).putLong(1L).put("3o4im".getBytes(UTF_8)).array())
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array()),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 3,
+                                 3,
+                                 TimestampType.CREATE_TIME,
+                                 -1,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array())
         ));
 
         assertThat(buffer.numRecords(), is(3));
         assertThat(buffer.minTimestamp(), is(0L));
-        assertThat(buffer.bufferSize(), is(160L));
+        assertThat(buffer.bufferSize(), is(196L));
 
         stateRestoreCallback.restoreBatch(singletonList(
             new ConsumerRecord<>("changelog-topic",
@@ -387,7 +429,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(2));
         assertThat(buffer.minTimestamp(), is(1L));
-        assertThat(buffer.bufferSize(), is(103L));
+        assertThat(buffer.bufferSize(), is(131L));
+
+        assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
+        assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
+        assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
 
         // flush the buffer into a list in buffer order so we can make assertions about the contents.
 
@@ -405,11 +451,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         assertThat(evicted, is(asList(
             new Eviction<>(
                 "zxcv",
-                "3o4im",
-                new ProcessorRecordContext(2L, 2, 0, "changelog-topic", new RecordHeaders())),
+                new Change<>("next", "eo4im"),
+                new ProcessorRecordContext(3L, 3, 0, "changelog-topic", new RecordHeaders())),
             new Eviction<>(
                 "asdf",
-                "qwer",
+                new Change<>("qwer", null),
                 new ProcessorRecordContext(1L, 1, 0, "changelog-topic", new RecordHeaders()))
         )));
 
@@ -417,7 +463,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     }
 
     @Test
-    public void shouldRestoreNewFormat() {
+    public void shouldRestoreV1Format() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
@@ -429,9 +475,18 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
 
-        final byte[] todeleteValue = getRecord("doomed", 0).serialize();
-        final byte[] asdfValue = getRecord("qwer", 1).serialize();
-        final byte[] zxcvValue = getRecord("3o4im", 2).serialize();
+        final byte[] todeleteValue = getContextualRecord("doomed", 0).serialize(0).array();
+        final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
+        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+        final byte[] zxcvValue1 = new ContextualRecord(
+            fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "previous")),
+            getContext(2L)
+        ).serialize(0).array();
+        final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
+        final byte[] zxcvValue2 = new ContextualRecord(
+            fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
+            getContext(3L)
+        ).serialize(0).array();
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -464,13 +519,24 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + zxcvValue.length).putLong(1L).put(zxcvValue).array(),
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array(),
+                                 v1FlagHeaders),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 3,
+                                 100,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array(),
                                  v1FlagHeaders)
         ));
 
         assertThat(buffer.numRecords(), is(3));
         assertThat(buffer.minTimestamp(), is(0L));
-        assertThat(buffer.bufferSize(), is(130L));
+        assertThat(buffer.bufferSize(), is(166L));
 
         stateRestoreCallback.restoreBatch(singletonList(
             new ConsumerRecord<>("changelog-topic",
@@ -487,7 +553,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(2));
         assertThat(buffer.minTimestamp(), is(1L));
-        assertThat(buffer.bufferSize(), is(83L));
+        assertThat(buffer.bufferSize(), is(111L));
+
+        assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
+        assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
+        assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
 
         // flush the buffer into a list in buffer order so we can make assertions about the contents.
 
@@ -505,11 +575,143 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         assertThat(evicted, is(asList(
             new Eviction<>(
                 "zxcv",
-                "3o4im",
-                getContext(2L)),
+                new Change<>("next", "3o4im"),
+                getContext(3L)),
             new Eviction<>(
                 "asdf",
-                "qwer",
+                new Change<>("qwer", null),
+                getContext(1L)
+            ))));
+
+        cleanup(context, buffer);
+    }
+
+    @Test
+    public void shouldRestoreV2Format() {
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
+        final MockInternalProcessorContext context = makeContext();
+        buffer.init(context, buffer);
+
+        final RecordBatchingStateRestoreCallback stateRestoreCallback =
+            (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
+
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+
+        final RecordHeaders v2FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
+
+        final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
+        final byte[] asdfValue = getBufferValue("qwer", 1).serialize(0).array();
+        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+        final byte[] zxcvValue1 =
+            new BufferValue(
+                new ContextualRecord(
+                    fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "IGNORED")),
+                    getContext(2L)
+                ),
+                Serdes.String().serializer().serialize(null, "previous")
+            ).serialize(0).array();
+        final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
+        final byte[] zxcvValue2 =
+            new BufferValue(
+                new ContextualRecord(
+                    fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
+                    getContext(3L)
+                ),
+                Serdes.String().serializer().serialize(null, "previous")
+            ).serialize(0).array();
+        stateRestoreCallback.restoreBatch(asList(
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 0,
+                                 999,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "todelete".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + todeleteValue.length).put(todeleteValue).putLong(0L).array(),
+                                 v2FlagHeaders),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 1,
+                                 9999,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "asdf".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + asdfValue.length).put(asdfValue).putLong(2L).array(),
+                                 v2FlagHeaders),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 2,
+                                 99,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).put(zxcvValue1).putLong(1L).array(),
+                                 v2FlagHeaders),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 2,
+                                 100,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).put(zxcvValue2).putLong(1L).array(),
+                                 v2FlagHeaders)
+        ));
+
+        assertThat(buffer.numRecords(), is(3));
+        assertThat(buffer.minTimestamp(), is(0L));
+        assertThat(buffer.bufferSize(), is(166L));
+
+        stateRestoreCallback.restoreBatch(singletonList(
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 3,
+                                 3,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "todelete".getBytes(UTF_8),
+                                 null)
+        ));
+
+        assertThat(buffer.numRecords(), is(2));
+        assertThat(buffer.minTimestamp(), is(1L));
+        assertThat(buffer.bufferSize(), is(111L));
+
+        assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
+        assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
+        assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
+
+        // flush the buffer into a list in buffer order so we can make assertions about the contents.
+
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
+        buffer.evictWhile(() -> true, evicted::add);
+
+        // Several things to note:
+        // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
+        // * The record timestamps are properly restored, and not conflated with the record's buffer time.
+        // * The keys and values are properly restored
+        // * The record topic is set to the original input topic, *not* the changelog topic
+        // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
+
+
+        assertThat(evicted, is(asList(
+            new Eviction<>(
+                "zxcv",
+                new Change<>("next", "3o4im"),
+                getContext(3L)),
+            new Eviction<>(
+                "asdf",
+                new Change<>("qwer", null),
                 getContext(1L)
             ))));
 
@@ -529,7 +731,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})});
 
-        final byte[] todeleteValue = getRecord("doomed", 0).serialize();
+        final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
         try {
             stateRestoreCallback.restoreBatch(singletonList(
                 new ConsumerRecord<>("changelog-topic",
@@ -560,12 +762,18 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                   final String value) {
         final ProcessorRecordContext recordContext = getContext(recordTimestamp);
         context.setRecordContext(recordContext);
-        buffer.put(streamTime, key, value, recordContext);
+        buffer.put(streamTime, key, new Change<>(value, null), recordContext);
+    }
+
+    private static BufferValue getBufferValue(final String value, final long timestamp) {
+        final ContextualRecord contextualRecord = getContextualRecord(value, timestamp);
+        return new BufferValue(contextualRecord, null);
     }
 
-    private static ContextualRecord getRecord(final String value, final long timestamp) {
+    private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
+        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
         return new ContextualRecord(
-            value.getBytes(UTF_8),
+            fullChangeSerde.serializer().serialize(null, new Change<>(value, null)),
             getContext(timestamp)
         );
     }


Mime
View raw message