kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602)
Date Fri, 26 Apr 2019 16:44:29 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 607cf8f  KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602)
607cf8f is described below

commit 607cf8f578684910a01722d694902834b9a9a43e
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Fri Apr 26 11:30:20 2019 -0500

    KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602)
    
    Reviewers:  Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/kstream/internals/FullChangeSerde.java |   8 +-
 .../streams/kstream/internals/KStreamImpl.java     |  36 +++-
 .../streams/kstream/internals/KTableImpl.java      |   9 +-
 .../suppress/KTableSuppressProcessor.java          |  42 ++---
 .../processor/internals/ProcessorContextImpl.java  |   4 +-
 .../internals/ProcessorRecordContext.java          |  23 +--
 .../InMemoryTimeOrderedKeyValueBuffer.java         |  55 ++++--
 .../state/internals/TimeOrderedKeyValueBuffer.java |  57 +++++-
 .../integration/SuppressionIntegrationTest.java    | 157 +++++++++++++++-
 .../kstream/internals/FullChangeSerdeTest.java     |  20 +-
 .../KTableSuppressProcessorMetricsTest.java        |  19 +-
 .../suppress/KTableSuppressProcessorTest.java      |  17 +-
 .../internals/AbstractProcessorContextTest.java    |   4 +-
 .../InMemoryTimeOrderedKeyValueBufferTest.java     |   4 +-
 .../internals/TimeOrderedKeyValueBufferTest.java   | 208 +++++++++------------
 15 files changed, 418 insertions(+), 245 deletions(-)
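
In essence, the patch below moves key/value serialization out of KTableSuppressProcessor and into the now-generic TimeOrderedKeyValueBuffer<K, V>, which resolves missing serdes from the processor context (setSerdesIfNull) and serializes with its changelog topic as the topic argument instead of null. Passing the real topic matters for topic-sensitive serdes, such as schema-registry-aware ones that derive the schema subject from the topic name. The following is a minimal, self-contained sketch of that pattern, not the patch itself; the names ToyBuffer and changelogFor are hypothetical stand-ins:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;

    public class SuppressSerdeSketch {

        // Hypothetical stand-in for how Streams derives a store's changelog topic name.
        static String changelogFor(final String applicationId, final String storeName) {
            return applicationId + "-" + storeName + "-changelog";
        }

        // Toy buffer mirroring the new TimeOrderedKeyValueBuffer<K, V> shape: it owns
        // the serdes and serializes with the changelog topic rather than null.
        static final class ToyBuffer<K, V> {
            private Serde<K> keySerde;
            private Serde<V> valueSerde;
            private final String changelogTopic;

            ToyBuffer(final String changelogTopic, final Serde<K> keySerde, final Serde<V> valueSerde) {
                this.changelogTopic = changelogTopic;
                this.keySerde = keySerde;
                this.valueSerde = valueSerde;
            }

            // Mirrors setSerdesIfNull(): context defaults fill in only the missing serdes.
            void setSerdesIfNull(final Serde<K> contextKeySerde, final Serde<V> contextValueSerde) {
                keySerde = keySerde == null ? contextKeySerde : keySerde;
                valueSerde = valueSerde == null ? contextValueSerde : valueSerde;
            }

            byte[] serializeValue(final V value) {
                // Before this patch: serialize(null, value). After: the changelog topic.
                return valueSerde.serializer().serialize(changelogTopic, value);
            }
        }

        public static void main(final String[] args) {
            final ToyBuffer<String, String> buffer =
                new ToyBuffer<>(changelogFor("my-app", "suppress-store"), null, null);
            buffer.setSerdesIfNull(Serdes.String(), Serdes.String());
            System.out.println(buffer.serializeValue("hello").length); // prints 5
        }
    }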

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 9bb8373..f06a428 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -25,21 +25,21 @@ import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
-public class FullChangeSerde<T> implements Serde<Change<T>> {
+public final class FullChangeSerde<T> implements Serde<Change<T>> {
     private final Serde<T> inner;
 
     @SuppressWarnings("unchecked")
-    public static <T> FullChangeSerde<T> castOrWrap(final Serde<?> serde) {
+    public static <T> FullChangeSerde<T> castOrWrap(final Serde<T> serde) {
         if (serde == null) {
             return null;
         } else if (serde instanceof FullChangeSerde) {
             return (FullChangeSerde<T>) serde;
         } else {
-            return new FullChangeSerde<T>((Serde<T>) serde);
+            return new FullChangeSerde<>(serde);
         }
     }
 
-    public FullChangeSerde(final Serde<T> inner) {
+    private FullChangeSerde(final Serde<T> inner) {
         this.inner = requireNonNull(inner);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ba08b89..529270c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -391,18 +391,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     @Override
     public KStream<K, V> through(final String topic) {
-        return through(topic, Produced.with(null, null, null));
+        return through(topic, Produced.with(keySerde, valSerde, null));
     }
 
     @Override
     public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(produced, "Produced can't be null");
         final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
         to(topic, producedInternal);
         return builder.stream(
             Collections.singleton(topic),
             new ConsumedInternal<>(
-                producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde,
-                producedInternal.valueSerde() != null ? producedInternal.valueSerde() : valSerde,
+                producedInternal.keySerde(),
+                producedInternal.valueSerde(),
                 new FailOnInvalidTimestamp(),
                 null
             )
@@ -411,26 +419,40 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
     @Override
     public void to(final String topic) {
-        to(topic, Produced.with(null, null, null));
+        to(topic, Produced.with(keySerde, valSerde, null));
     }
 
     @Override
     public void to(final String topic, final Produced<K, V> produced) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(new StaticTopicNameExtractor<>(topic), new ProducedInternal<>(produced));
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
+        to(new StaticTopicNameExtractor<>(topic), producedInternal);
     }
 
     @Override
     public void to(final TopicNameExtractor<K, V> topicExtractor) {
-        to(topicExtractor, Produced.with(null, null, null));
+        to(topicExtractor, Produced.with(keySerde, valSerde, null));
     }
 
     @Override
     public void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced) {
         Objects.requireNonNull(topicExtractor, "topic extractor can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(topicExtractor, new ProducedInternal<>(produced));
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        if (producedInternal.keySerde() == null) {
+            producedInternal.withKeySerde(keySerde);
+        }
+        if (producedInternal.valueSerde() == null) {
+            producedInternal.withValueSerde(valSerde);
+        }
+        to(topicExtractor, producedInternal);
     }
 
     private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6e65b89..7d059ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -359,18 +359,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
 
         final ProcessorSupplier<K, Change<V>> suppressionSupplier =
-            () -> new KTableSuppressProcessor<>(
-                suppressedInternal,
-                storeName,
-                keySerde,
-                valSerde == null ? null : new FullChangeSerde<>(valSerde)
-            );
+            () -> new KTableSuppressProcessor<>(suppressedInternal, storeName);
 
 
         final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
+            new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valSerde)),
             false
         );
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 686002a..7184d7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -19,8 +19,6 @@ package org.apache.kafka.streams.kstream.internals.suppress;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
@@ -30,7 +28,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.state.internals.ContextualRecord;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
 
 import static java.util.Objects.requireNonNull;
@@ -44,22 +41,14 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     private final boolean safeToDropTombstones;
     private final String storeName;
 
-    private TimeOrderedKeyValueBuffer buffer;
+    private TimeOrderedKeyValueBuffer<K, Change<V>> buffer;
     private InternalProcessorContext internalProcessorContext;
     private Sensor suppressionEmitSensor;
-    private Serde<K> keySerde;
-    private FullChangeSerde<V> valueSerde;
-
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
-    public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
-                                   final String storeName,
-                                   final Serde<K> keySerde,
-                                   final FullChangeSerde<V> valueSerde) {
+    public KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) {
         this.storeName = storeName;
         requireNonNull(suppress);
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
         maxRecords = suppress.bufferConfig().maxRecords();
         maxBytes = suppress.bufferConfig().maxBytes();
         suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
@@ -74,9 +63,8 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
         internalProcessorContext = (InternalProcessorContext) context;
         suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext);
 
-        keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
-        valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
-        buffer = requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
+        buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, Change<V>>) context.getStateStore(storeName));
+        buffer.setSerdesIfNull((Serde<K>) context.keySerde(), FullChangeSerde.castOrWrap((Serde<V>) context.valueSerde()));
     }
 
     @Override
@@ -88,12 +76,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
 
     private void buffer(final K key, final Change<V> value) {
         final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
-        final ProcessorRecordContext recordContext = internalProcessorContext.recordContext();
-
-        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key));
-        final byte[] serializedValue = valueSerde.serializer().serialize(null, value);
-
-        buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext));
+        buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
     }
 
     private void enforceConstraints() {
@@ -114,6 +97,11 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
                         buffer.numRecords(), maxRecords,
                         buffer.bufferSize(), maxBytes
                     ));
+                default:
+                    throw new UnsupportedOperationException(
+                        "The bufferFullStrategy [" + bufferFullStrategy +
+                            "] is not implemented. This is a bug in Kafka Streams."
+                    );
             }
         }
     }
@@ -122,14 +110,12 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
         return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
     }
 
-    private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) {
-        final Change<V> value = valueSerde.deserializer().deserialize(null, toEmit.value.value());
-        if (shouldForward(value)) {
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, Change<V>> toEmit) {
+        if (shouldForward(toEmit.value())) {
             final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
-            internalProcessorContext.setRecordContext(toEmit.value.recordContext());
+            internalProcessorContext.setRecordContext(toEmit.recordContext());
             try {
-                final K key = keySerde.deserializer().deserialize(null, toEmit.key.get());
-                internalProcessorContext.forward(key, value);
+                internalProcessorContext.forward(toEmit.key(), toEmit.value());
                 suppressionEmitSensor.record();
             } finally {
                 internalProcessorContext.setRecordContext(prevRecordContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index d7fe3e4..654f11b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -147,7 +147,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                                final V value,
                                final To to) {
         final ProcessorNode previousNode = currentNode();
-        final long currentTimestamp = recordContext.timestamp;
+        final long currentTimestamp = recordContext.timestamp();
 
         try {
             toInternal.update(to);
@@ -170,7 +170,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                 forward(child, key, value);
             }
         } finally {
-            recordContext.timestamp = currentTimestamp;
+            recordContext.setTimestamp(currentTimestamp);
             setCurrentNode(previousNode);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index aacb801..cc512ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -29,11 +29,11 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 public class ProcessorRecordContext implements RecordContext {
 
-    long timestamp;
-    final long offset;
-    final String topic;
-    final int partition;
-    final Headers headers;
+    private long timestamp;
+    private final long offset;
+    private final String topic;
+    private final int partition;
+    private final Headers headers;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
@@ -48,13 +48,6 @@ public class ProcessorRecordContext implements RecordContext {
         this.headers = headers;
     }
 
-    public ProcessorRecordContext(final long timestamp,
-                                  final long offset,
-                                  final int partition,
-                                  final String topic) {
-        this(timestamp, offset, partition, topic, null);
-    }
-
     public void setTimestamp(final long timestamp) {
         this.timestamp = timestamp;
     }
@@ -225,9 +218,13 @@ public class ProcessorRecordContext implements RecordContext {
             Objects.equals(headers, that.headers);
     }
 
+    /**
+     * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable.
+     */
+    @Deprecated
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp, offset, topic, partition, headers);
+        throw new UnsupportedOperationException("ProcessorRecordContext is unsafe for use in Hash collections");
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 8e58b16..e11df7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -23,8 +23,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -50,7 +50,7 @@ import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
 
-public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =
@@ -63,6 +63,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
     private final String storeName;
     private final boolean loggingEnabled;
 
+    private Serde<K> keySerde;
+    private Serde<V> valueSerde;
+
     private long memBufferSize = 0L;
     private long minTimestamp = Long.MAX_VALUE;
     private RecordCollector collector;
@@ -74,13 +77,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
 
     private int partition;
 
-    public static class Builder implements StoreBuilder<StateStore> {
+    public static class Builder<K, V> implements StoreBuilder<StateStore> {
 
         private final String storeName;
+        private final Serde<K> keySerde;
+        private final Serde<V> valSerde;
         private boolean loggingEnabled = true;
 
-        public Builder(final String storeName) {
+        public Builder(final String storeName, final Serde<K> keySerde, final Serde<V> valSerde) {
             this.storeName = storeName;
+            this.keySerde = keySerde;
+            this.valSerde = valSerde;
         }
 
         /**
@@ -119,8 +126,8 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
         }
 
         @Override
-        public StateStore build() {
-            return new InMemoryTimeOrderedKeyValueBuffer(storeName, loggingEnabled);
+        public InMemoryTimeOrderedKeyValueBuffer<K, V> build() {
+            return new InMemoryTimeOrderedKeyValueBuffer<>(storeName, loggingEnabled, keySerde, valSerde);
         }
 
         @Override
@@ -182,9 +189,14 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
         }
     }
 
-    private InMemoryTimeOrderedKeyValueBuffer(final String storeName, final boolean loggingEnabled) {
+    private InMemoryTimeOrderedKeyValueBuffer(final String storeName,
+                                              final boolean loggingEnabled,
+                                              final Serde<K> keySerde,
+                                              final Serde<V> valueSerde) {
         this.storeName = storeName;
         this.loggingEnabled = loggingEnabled;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
     }
 
     @Override
@@ -199,8 +211,15 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
     }
 
     @Override
+    public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
+        this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
+        this.valueSerde = this.valueSerde == null ? valueSerde : this.valueSerde;
+    }
+
+    @Override
     public void init(final ProcessorContext context, final StateStore root) {
         final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+
         bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext);
         bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext);
 
@@ -359,7 +378,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
 
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
-                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
+                           final Consumer<Eviction<K, V>> callback) {
         final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
         int evictions = 0;
 
@@ -377,7 +396,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
                             next.getKey().time + "]"
                     );
                 }
-                callback.accept(new KeyValue<>(next.getKey().key, next.getValue()));
+                final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key.get());
+                final V value = valueSerde.deserializer().deserialize(changelogTopic, next.getValue().value());
+                callback.accept(new Eviction<>(key, value, next.getValue().recordContext()));
 
                 delegate.remove();
                 index.remove(next.getKey().key);
@@ -405,13 +426,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa
 
     @Override
     public void put(final long time,
-                    final Bytes key,
-                    final ContextualRecord contextualRecord) {
-        requireNonNull(contextualRecord.value(), "value cannot be null");
-        requireNonNull(contextualRecord.recordContext(), "recordContext cannot be null");
+                    final K key,
+                    final V value,
+                    final ProcessorRecordContext recordContext) {
+        requireNonNull(value, "value cannot be null");
+        requireNonNull(recordContext, "recordContext cannot be null");
+
+        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
+        final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value);
 
-        cleanPut(time, key, contextualRecord);
-        dirtyKeys.add(key);
+        cleanPut(time, serializedKey, new ContextualRecord(serializedValue, recordContext));
+        dirtyKeys.add(serializedKey);
         updateBufferMetrics();
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index 86a8c1e..ffa1f49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -16,17 +16,64 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 
+import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-public interface TimeOrderedKeyValueBuffer extends StateStore {
-    void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback);
+public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
+    final class Eviction<K, V> {
+        private final K key;
+        private final V value;
+        private final ProcessorRecordContext recordContext;
 
-    void put(final long time, final Bytes key, final ContextualRecord value);
+        Eviction(final K key, final V value, final ProcessorRecordContext recordContext) {
+            this.key = key;
+            this.value = value;
+            this.recordContext = recordContext;
+        }
+
+        public K key() {
+            return key;
+        }
+
+        public V value() {
+            return value;
+        }
+
+        public ProcessorRecordContext recordContext() {
+            return recordContext;
+        }
+
+        @Override
+        public String toString() {
+            return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final Eviction<?, ?> eviction = (Eviction<?, ?>) o;
+            return Objects.equals(key, eviction.key) &&
+                Objects.equals(value, eviction.value) &&
+                Objects.equals(recordContext, eviction.recordContext);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value, recordContext);
+        }
+    }
+
+    void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde);
+
+    void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback);
+
+    void put(long time, K key, V value, ProcessorRecordContext recordContext);
 
     int numRecords();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 46a4f13..e3f0a41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -16,10 +16,16 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
@@ -31,21 +37,23 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import static java.lang.Long.MAX_VALUE;
 import static java.time.Duration.ofMillis;
@@ -64,7 +72,7 @@ import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-@Category({IntegrationTest.class})
+@Category(IntegrationTest.class)
 public class SuppressionIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
@@ -76,7 +84,7 @@ public class SuppressionIntegrationTest {
     private static final Serde<String> STRING_SERDE = Serdes.String();
     private static final int COMMIT_INTERVAL = 100;
 
-    private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
+    private static KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
         return builder
             .table(
                 input,
@@ -90,6 +98,139 @@ public class SuppressionIntegrationTest {
     }
 
     @Test
+    public void shouldUseDefaultSerdes() {
+        final String testId = "-shouldInheritSerdes";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> inputStream = builder.stream(input);
+
+        final KTable<String, String> valueCounts = inputStream
+            .groupByKey()
+            .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")");
+
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull()))
+            .toStream()
+            .to(outputSuppressed);
+
+        valueCounts
+            .toStream()
+            .to(outputRaw);
+
+        final Properties streamsConfig = getStreamsConfig(appId);
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            final boolean rawRecords = waitForAnyRecord(outputRaw);
+            final boolean suppressedRecords = waitForAnyRecord(outputSuppressed);
+            assertThat(rawRecords, Matchers.is(true));
+            assertThat(suppressedRecords, is(true));
+        } finally {
+            driver.close();
+            cleanStateAfterTest(CLUSTER, driver);
+        }
+    }
+
+    @Test
+    public void shouldInheritSerdes() {
+        final String testId = "-shouldInheritSerdes";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> inputStream = builder.stream(input);
+
+        // count sets the serde to Long
+        final KTable<String, Long> valueCounts = inputStream
+            .groupByKey()
+            .count();
+
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull()))
+            .toStream()
+            .to(outputSuppressed);
+
+        valueCounts
+            .toStream()
+            .to(outputRaw);
+
+        final Properties streamsConfig = getStreamsConfig(appId);
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            final boolean rawRecords = waitForAnyRecord(outputRaw);
+            final boolean suppressedRecords = waitForAnyRecord(outputSuppressed);
+            assertThat(rawRecords, Matchers.is(true));
+            assertThat(suppressedRecords, is(true));
+        } finally {
+            driver.close();
+            cleanStateAfterTest(CLUSTER, driver);
+        }
+    }
+
+    private static boolean waitForAnyRecord(final String topic) {
+        final Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+        try (final Consumer<Object, Object> consumer = new KafkaConsumer<>(properties)) {
+            final List<TopicPartition> partitions =
+                consumer.partitionsFor(topic)
+                        .stream()
+                        .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+                        .collect(Collectors.toList());
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+            final long start = System.currentTimeMillis();
+            while ((System.currentTimeMillis() - start) < DEFAULT_TIMEOUT) {
+                final ConsumerRecords<Object, Object> records = consumer.poll(ofMillis(500));
+
+                if (!records.isEmpty()) {
+                    return true;
+                }
+            }
+
+            return false;
+        }
+    }
+
+    @Test
     public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException {
         final String testId = "-shouldShutdownWhenRecordConstraintIsViolated";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
@@ -145,7 +286,7 @@ public class SuppressionIntegrationTest {
 
         valueCounts
             // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
-            .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull()))
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull()))
             .toStream()
             .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
 
@@ -172,7 +313,7 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    private Properties getStreamsConfig(final String appId) {
+    private static Properties getStreamsConfig(final String appId) {
         return mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
@@ -187,11 +328,11 @@ public class SuppressionIntegrationTest {
      * scaling to ensure that there are commits in between the various test events,
      * just to exercise that everything works properly in the presence of commits.
      */
-    private long scaledTime(final long unscaledTime) {
+    private static long scaledTime(final long unscaledTime) {
         return COMMIT_INTERVAL * 2 * unscaledTime;
     }
 
-    private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
+    private static void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
         final Properties producerConfig = mkProperties(mkMap(
             mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
             mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
@@ -201,7 +342,7 @@ public class SuppressionIntegrationTest {
         IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.empty(), toProduce);
     }
 
-    private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
+    private static void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
         waitForCondition(() -> !driver.state().isRunning(), DEFAULT_TIMEOUT, "Streams didn't shut down.");
         assertThat(driver.state(), is(KafkaStreams.State.ERROR));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
index a6a8888..ddba05e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -29,7 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
 public class FullChangeSerdeTest {
-    private final FullChangeSerde<String> serde = new FullChangeSerde<>(Serdes.String());
+    private final FullChangeSerde<String> serde = FullChangeSerde.castOrWrap(Serdes.String());
 
     @Test
     public void shouldRoundTripNull() {
@@ -77,31 +77,28 @@ public class FullChangeSerdeTest {
         );
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldConfigureSerde() {
         final Serde<Void> mock = EasyMock.mock(Serde.class);
         mock.configure(emptyMap(), false);
         EasyMock.expectLastCall();
         EasyMock.replay(mock);
-        final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock);
+        final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
         serde.configure(emptyMap(), false);
         EasyMock.verify(mock);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseSerde() {
         final Serde<Void> mock = EasyMock.mock(Serde.class);
         mock.close();
         EasyMock.expectLastCall();
         EasyMock.replay(mock);
-        final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock);
+        final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
         serde.close();
         EasyMock.verify(mock);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldConfigureSerializer() {
         final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -111,13 +108,12 @@ public class FullChangeSerdeTest {
         mockSerializer.configure(emptyMap(), false);
         EasyMock.expectLastCall();
         EasyMock.replay(mockSerializer);
-        final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer();
+        final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
         serializer.configure(emptyMap(), false);
         EasyMock.verify(mockSerde);
         EasyMock.verify(mockSerializer);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseSerializer() {
         final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -127,13 +123,12 @@ public class FullChangeSerdeTest {
         mockSerializer.close();
         EasyMock.expectLastCall();
         EasyMock.replay(mockSerializer);
-        final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer();
+        final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
         serializer.close();
         EasyMock.verify(mockSerde);
         EasyMock.verify(mockSerializer);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldConfigureDeserializer() {
         final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -143,13 +138,12 @@ public class FullChangeSerdeTest {
         mockDeserializer.configure(emptyMap(), false);
         EasyMock.expectLastCall();
         EasyMock.replay(mockDeserializer);
-        final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer();
+        final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
         serializer.configure(emptyMap(), false);
         EasyMock.verify(mockSerde);
         EasyMock.verify(mockDeserializer);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseDeserializer() {
         final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
@@ -159,7 +153,7 @@ public class FullChangeSerdeTest {
         mockDeserializer.close();
         EasyMock.expectLastCall();
         EasyMock.replay(mockDeserializer);
-        final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer();
+        final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
         serializer.close();
         EasyMock.verify(mockSerde);
         EasyMock.verify(mockDeserializer);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 228cfc8..62ae3bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
@@ -31,8 +32,6 @@ import org.junit.Test;
 import java.time.Duration;
 import java.util.Map;
 
-import static org.apache.kafka.common.serialization.Serdes.Long;
-import static org.apache.kafka.common.serialization.Serdes.String;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
@@ -40,7 +39,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.core.Is.is;
 
-@SuppressWarnings("PointlessArithmeticExpression")
 public class KTableSuppressProcessorMetricsTest {
     private static final long ARBITRARY_LONG = 5L;
 
@@ -136,16 +134,17 @@ public class KTableSuppressProcessorMetricsTest {
     public void shouldRecordMetrics() {
         final String storeName = "test-store";
 
-        final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName)
+        final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
+            storeName, Serdes.String(),
+            FullChangeSerde.castOrWrap(Serdes.Long())
+        )
             .withLoggingDisabled()
             .build();
 
         final KTableSuppressProcessor<String, Long> processor =
             new KTableSuppressProcessor<>(
                 (SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
-                storeName,
-                String(),
-                new FullChangeSerde<>(Long())
+                storeName
             );
 
         final MockInternalProcessorContext context = new MockInternalProcessorContext();
@@ -191,9 +190,9 @@ public class KTableSuppressProcessorMetricsTest {
     }
 
     @SuppressWarnings("unchecked")
-    private <T> void verifyMetric(final Map<MetricName, ? extends Metric> metrics,
-                                  final MetricName metricName,
-                                  final Matcher<T> matcher) {
+    private static <T> void verifyMetric(final Map<MetricName, ? extends Metric> metrics,
+                                         final MetricName metricName,
+                                         final Matcher<T> matcher) {
         assertThat(metrics.get(metricName).metricName().description(), is(metricName.description()));
         assertThat((T) metrics.get(metricName).metricValue(), matcher);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 4b182cb..6c10d91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -72,16 +72,12 @@ public class KTableSuppressProcessorTest {
 
             final String storeName = "test-store";
 
-            final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName)
+            final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valueSerde))
                 .withLoggingDisabled()
                 .build();
+
             final KTableSuppressProcessor<K, V> processor =
-                new KTableSuppressProcessor<>(
-                    (SuppressedInternal<K>) suppressed,
-                    storeName,
-                    keySerde,
-                    new FullChangeSerde<>(valueSerde)
-                );
+                new KTableSuppressProcessor<>((SuppressedInternal<K>) suppressed, storeName);
 
             final MockInternalProcessorContext context = new MockInternalProcessorContext();
             context.setCurrentNode(new ProcessorNode("testNode"));
@@ -208,7 +204,6 @@ public class KTableSuppressProcessorTest {
         // note the record is in the past, but the window end is in the future, so we still have to buffer,
         // even though the grace period is 0.
         final long timestamp = 5L;
-        final long streamTime = 99L;
         final long windowEnd = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
@@ -446,8 +441,8 @@ public class KTableSuppressProcessorTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) {
         return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
     }
 
@@ -471,7 +466,7 @@ public class KTableSuppressProcessorTest {
         };
     }
 
-    private <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
+    private static <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
         final Serde<K> kSerde = Serdes.serdeFrom(rawType);
         return new Serdes.WrapperSerde<>(
             new TimeWindowedSerializer<>(kSerde.serializer()),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 8afd302..3ae7cf2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -96,7 +96,7 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldReturnNullIfTopicEqualsNonExistTopic() {
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
         assertThat(context.topic(), nullValue());
     }
 
@@ -154,7 +154,7 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldReturnNullIfHeadersAreNotSet() {
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null));
         assertThat(context.headers(), nullValue());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
index ddc4046..18f689f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java
@@ -22,11 +22,11 @@ public class InMemoryTimeOrderedKeyValueBufferTest {
 
     @Test
     public void bufferShouldAllowCacheEnablement() {
-        new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingEnabled();
+        new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingEnabled();
     }
 
     @Test
     public void bufferShouldAllowCacheDisablement() {
-        new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingDisabled();
+        new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled();
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 2953953..6ae36d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -22,13 +22,14 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer.Eviction;
 import org.apache.kafka.test.MockInternalProcessorContext;
 import org.apache.kafka.test.MockInternalProcessorContext.MockRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -55,7 +56,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
-public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> {
+public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =
         new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
 
@@ -69,9 +70,9 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         return singletonList(
             new Object[] {
                 "in-memory buffer",
-                (Function<String, InMemoryTimeOrderedKeyValueBuffer>) name ->
-                    (InMemoryTimeOrderedKeyValueBuffer) new InMemoryTimeOrderedKeyValueBuffer
-                        .Builder(name)
+                (Function<String, InMemoryTimeOrderedKeyValueBuffer<String, String>>) name ->
+                    new InMemoryTimeOrderedKeyValueBuffer
+                        .Builder<>(name, Serdes.String(), Serdes.String())
                         .build()
             }
         );
@@ -96,7 +97,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
     }
 
 
-    private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer buffer) {
+    private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer<String, String> buffer) {
         try {
             buffer.close();
             Utils.delete(context.stateDir());
@@ -107,7 +108,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldInit() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
         cleanup(context, buffer);
@@ -115,23 +116,23 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldAcceptData() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "2p93nf", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf");
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldRejectNullValues() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
         try {
-            buffer.put(0, getBytes("asdf"), new ContextualRecord(
-                null,
-                new ProcessorRecordContext(0, 0, 0, "topic")
-            ));
+            buffer.put(0, "asdf",
+                       null,
+                       getContext(0)
+            );
             fail("expected an exception");
         } catch (final NullPointerException expected) {
             // expected
@@ -139,27 +140,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         cleanup(context, buffer);
     }
 
-    private static ContextualRecord getRecord(final String value) {
-        return getRecord(value, 0L);
-    }
-
-    private static ContextualRecord getRecord(final String value, final long timestamp) {
-        return new ContextualRecord(
-            value.getBytes(UTF_8),
-            new ProcessorRecordContext(timestamp, 0, 0, "topic")
-        );
-    }
-
-    private static Bytes getBytes(final String key) {
-        return Bytes.wrap(key.getBytes(UTF_8));
-    }
-
     @Test
     public void shouldRemoveData() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "qwer", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "qwer");
         assertThat(buffer.numRecords(), is(1));
         buffer.evictWhile(() -> true, kv -> { });
         assertThat(buffer.numRecords(), is(0));
@@ -168,90 +154,71 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldRespectEvictionPredicate() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        final Bytes firstKey = getBytes("asdf");
-        final ContextualRecord firstRecord = getRecord("eyt");
-        putRecord(0, buffer, context, firstRecord, firstKey);
-        putRecord(buffer, context, "rtg", 1, "zxcv");
+        putRecord(buffer, context, 0L, 0L, "asdf", "eyt");
+        putRecord(buffer, context, 1L, 0L, "zxcv", "rtg");
         assertThat(buffer.numRecords(), is(2));
-        final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
         buffer.evictWhile(() -> buffer.numRecords() > 1, evicted::add);
         assertThat(buffer.numRecords(), is(1));
-        assertThat(evicted, is(singletonList(new KeyValue<>(firstKey, firstRecord))));
+        assertThat(evicted, is(singletonList(new Eviction<>("asdf", "eyt", getContext(0L)))));
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldTrackCount() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "oin", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "oin");
         assertThat(buffer.numRecords(), is(1));
-        putRecord(buffer, context, "wekjn", 1, "asdf");
+        putRecord(buffer, context, 1L, 0L, "asdf", "wekjn");
         assertThat(buffer.numRecords(), is(1));
-        putRecord(buffer, context, "24inf", 0, "zxcv");
+        putRecord(buffer, context, 0L, 0L, "zxcv", "24inf");
         assertThat(buffer.numRecords(), is(2));
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldTrackSize() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "23roni", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
         assertThat(buffer.bufferSize(), is(43L));
-        putRecord(buffer, context, "3l", 1, "asdf");
+        putRecord(buffer, context, 1L, 0L, "asdf", "3l");
         assertThat(buffer.bufferSize(), is(39L));
-        putRecord(buffer, context, "qfowin", 0, "zxcv");
+        putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin");
         assertThat(buffer.bufferSize(), is(82L));
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldTrackMinTimestamp() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, "2093j", 1, "asdf");
+        putRecord(buffer, context, 1L, 0L, "asdf", "2093j");
         assertThat(buffer.minTimestamp(), is(1L));
-        putRecord(buffer, context, "3gon4i", 0, "zxcv");
+        putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i");
         assertThat(buffer.minTimestamp(), is(0L));
         cleanup(context, buffer);
     }
 
-    private static void putRecord(final TimeOrderedKeyValueBuffer buffer,
-                                  final MockInternalProcessorContext context,
-                                  final String value,
-                                  final int time,
-                                  final String key) {
-        putRecord(time, buffer, context, getRecord(value), getBytes(key));
-    }
-
-    private static void putRecord(final int time,
-                                  final TimeOrderedKeyValueBuffer buffer,
-                                  final MockInternalProcessorContext context,
-                                  final ContextualRecord firstRecord,
-                                  final Bytes firstKey) {
-        context.setRecordContext(firstRecord.recordContext());
-        buffer.put(time, firstKey, firstRecord);
-    }
-
     @Test
     public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
 
-        putRecord(buffer, context, "o23i4", 1, "zxcv");
+        putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
         assertThat(buffer.numRecords(), is(1));
         assertThat(buffer.bufferSize(), is(42L));
         assertThat(buffer.minTimestamp(), is(1L));
 
-        putRecord(buffer, context, "3ng", 0, "asdf");
+        putRecord(buffer, context, 0L, 0L, "asdf", "3ng");
         assertThat(buffer.numRecords(), is(2));
         assertThat(buffer.bufferSize(), is(82L));
         assertThat(buffer.minTimestamp(), is(0L));
@@ -260,14 +227,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         buffer.evictWhile(() -> true, kv -> {
             switch (callbackCount.incrementAndGet()) {
                 case 1: {
-                    assertThat(new String(kv.key.get(), UTF_8), is("asdf"));
+                    assertThat(kv.key(), is("asdf"));
                     assertThat(buffer.numRecords(), is(2));
                     assertThat(buffer.bufferSize(), is(82L));
                     assertThat(buffer.minTimestamp(), is(0L));
                     break;
                 }
                 case 2: {
-                    assertThat(new String(kv.key.get(), UTF_8), is("zxcv"));
+                    assertThat(kv.key(), is("zxcv"));
                     assertThat(buffer.numRecords(), is(1));
                     assertThat(buffer.bufferSize(), is(42L));
                     assertThat(buffer.minTimestamp(), is(1L));
@@ -288,12 +255,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldFlush() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(2, buffer, context, getRecord("2093j", 0L), getBytes("asdf"));
-        putRecord(1, buffer, context, getRecord("3gon4i", 1L), getBytes("zxcv"));
-        putRecord(0, buffer, context, getRecord("deadbeef", 2L), getBytes("deleteme"));
+        putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
+        putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
+        putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
 
         // replace "deleteme" with a tombstone
         buffer.evictWhile(() -> buffer.minTimestamp() < 1, kv -> { });
@@ -357,17 +324,16 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         cleanup(context, buffer);
     }
 
-
     @Test
     public void shouldRestoreOldFormat() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
@@ -425,7 +391,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
         // flush the buffer into a list in buffer order so we can make assertions about the contents.
 
-        final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
         buffer.evictWhile(() -> true, evicted::add);
 
         // Several things to note:
@@ -437,22 +403,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         //   original format.
 
         assertThat(evicted, is(asList(
-            new KeyValue<>(
-                getBytes("zxcv"),
-                new ContextualRecord("3o4im".getBytes(UTF_8),
-                                     new ProcessorRecordContext(2,
-                                                                2,
-                                                                0,
-                                                                "changelog-topic",
-                                                                new RecordHeaders()))),
-            new KeyValue<>(
-                getBytes("asdf"),
-                new ContextualRecord("qwer".getBytes(UTF_8),
-                                     new ProcessorRecordContext(1,
-                                                                1,
-                                                                0,
-                                                                "changelog-topic",
-                                                                new RecordHeaders())))
+            new Eviction<>(
+                "zxcv",
+                "3o4im",
+                new ProcessorRecordContext(2L, 2, 0, "changelog-topic", new RecordHeaders())),
+            new Eviction<>(
+                "asdf",
+                "qwer",
+                new ProcessorRecordContext(1L, 1, 0, "changelog-topic", new RecordHeaders()))
         )));
 
         cleanup(context, buffer);
@@ -460,14 +418,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
     @Test
     public void shouldRestoreNewFormat() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
         final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
 
@@ -533,7 +491,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
 
         // flush the buffer into a list in buffer order so we can make assertions about the contents.
 
-        final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>();
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
         buffer.evictWhile(() -> true, evicted::add);
 
         // Several things to note:
@@ -541,41 +499,33 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
         // * The record timestamps are properly restored, and not conflated with the record's buffer time.
         // * The keys and values are properly restored
         // * The record topic is set to the original input topic, *not* the changelog topic
-        // * The record offset preserves the origininal input record's offset, *not* the offset of the changelog record
+        // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
 
 
         assertThat(evicted, is(asList(
-            new KeyValue<>(
-                getBytes("zxcv"),
-                new ContextualRecord("3o4im".getBytes(UTF_8),
-                                     new ProcessorRecordContext(2,
-                                                                0,
-                                                                0,
-                                                                "topic",
-                                                                null))),
-            new KeyValue<>(
-                getBytes("asdf"),
-                new ContextualRecord("qwer".getBytes(UTF_8),
-                                     new ProcessorRecordContext(1,
-                                                                0,
-                                                                0,
-                                                                "topic",
-                                                                null)))
-        )));
+            new Eviction<>(
+                "zxcv",
+                "3o4im",
+                getContext(2L)),
+            new Eviction<>(
+                "asdf",
+                "qwer",
+                getContext(1L))
+        )));
 
         cleanup(context, buffer);
     }
 
     @Test
     public void shouldNotRestoreUnrecognizedVersionRecord() {
-        final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName);
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, ""));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
         final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})});
 
@@ -601,4 +551,26 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer>
             cleanup(context, buffer);
         }
     }
+
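+    // Sets the mock context's record metadata to match the record timestamp, then buffers
+    // the key/value pair at the given stream time.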
+    private static void putRecord(final TimeOrderedKeyValueBuffer<String, String> buffer,
+                                  final MockInternalProcessorContext context,
+                                  final long streamTime,
+                                  final long recordTimestamp,
+                                  final String key,
+                                  final String value) {
+        final ProcessorRecordContext recordContext = getContext(recordTimestamp);
+        context.setRecordContext(recordContext);
+        buffer.put(streamTime, key, value, recordContext);
+    }
+
+    private static ContextualRecord getRecord(final String value, final long timestamp) {
+        return new ContextualRecord(
+            value.getBytes(UTF_8),
+            getContext(timestamp)
+        );
+    }
+
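+    // All test records share the same topic ("topic"), partition, and offset; only the timestamp varies.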
+    private static ProcessorRecordContext getContext(final long recordTimestamp) {
+        return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null);
+    }
 }

