kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata (#10810)
Date Thu, 10 Jun 2021 05:53:59 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 953ec98  MINOR: Improve Kafka Streams JavaDocs with regard to record metadata (#10810)
953ec98 is described below

commit 953ec9810099d6e5f41541de46c0ceebf4372790
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Wed Jun 9 22:51:36 2021 -0700

    MINOR: Improve Kafka Streams JavaDocs with regard to record metadata (#10810)
    
    Reviewers: Luke Chen <howuon@gmail.com>, Josep Prat <josep.prat@aiven.io>, John Roesler <john@confluent.io>
---
 .../kafka/streams/processor/ProcessorContext.java  | 96 +++++++++++++++-------
 .../apache/kafka/streams/processor/Punctuator.java | 11 +++
 .../kafka/streams/processor/RecordContext.java     | 83 +++++++++++++++----
 .../streams/processor/api/ProcessorContext.java    | 26 +++---
 .../streams/processor/api/RecordMetadata.java      | 41 ++++++++-
 .../internals/ProcessorRecordContext.java          | 77 ++++++++---------
 .../streams/state/internals/LRUCacheEntry.java     |  3 +-
 ...KStreamSessionWindowAggregateProcessorTest.java | 23 +++---
 .../internals/KTableKTableInnerJoinTest.java       |  3 +-
 .../internals/KTableKTableLeftJoinTest.java        |  3 +-
 .../internals/KTableKTableOuterJoinTest.java       |  3 +-
 .../internals/KTableKTableRightJoinTest.java       |  3 +-
 .../internals/KTableTransformValuesTest.java       |  2 +-
 .../KTableSuppressProcessorMetricsTest.java        |  7 +-
 .../suppress/KTableSuppressProcessorTest.java      | 47 +++++------
 .../internals/AbstractProcessorContextTest.java    |  2 +-
 .../internals/ProcessorRecordContextTest.java      | 21 ++++-
 .../internals/AbstractWindowBytesStoreTest.java    |  3 +-
 .../streams/state/internals/BufferValueTest.java   | 21 ++---
 .../CachingInMemoryKeyValueStoreTest.java          |  5 +-
 .../internals/CachingInMemorySessionStoreTest.java |  7 +-
 .../CachingPersistentSessionStoreTest.java         |  5 +-
 .../CachingPersistentWindowStoreTest.java          |  5 +-
 .../streams/state/internals/NamedCacheTest.java    | 65 +++++----------
 .../streams/state/internals/ThreadCacheTest.java   | 27 +++---
 .../internals/TimeOrderedKeyValueBufferTest.java   | 14 ++--
 .../streams/processor/MockProcessorContext.java    | 12 +++
 .../kafka/streams/MockProcessorContextTest.java    |  3 +-
 28 files changed, 385 insertions(+), 233 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index aa57463..a598e72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 
 /**
  * Processor context interface.
@@ -33,49 +34,49 @@ import java.util.Map;
 public interface ProcessorContext {
 
     /**
-     * Returns the application id.
+     * Return the application id.
      *
      * @return the application id
      */
     String applicationId();
 
     /**
-     * Returns the task id.
+     * Return the task id.
      *
      * @return the task id
      */
     TaskId taskId();
 
     /**
-     * Returns the default key serde.
+     * Return the default key serde.
      *
      * @return the key serializer
      */
     Serde<?> keySerde();
 
     /**
-     * Returns the default value serde.
+     * Return the default value serde.
      *
      * @return the value serializer
      */
     Serde<?> valueSerde();
 
     /**
-     * Returns the state directory for the partition.
+     * Return the state directory for the partition.
      *
      * @return the state directory
      */
     File stateDir();
 
     /**
-     * Returns Metrics instance.
+     * Return Metrics instance.
      *
      * @return StreamsMetrics
      */
     StreamsMetrics metrics();
 
     /**
-     * Registers and possibly restores the specified storage engine.
+     * Register and possibly restore the specified storage engine.
      *
      * @param store the storage engine
      * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
@@ -98,7 +99,7 @@ public interface ProcessorContext {
     <S extends StateStore> S getStateStore(final String name);
 
     /**
-     * Schedules a periodic operation for processors. A processor may call this method during
+     * Schedule a periodic operation for processors. A processor may call this method during
      * {@link Processor#init(ProcessorContext) initialization} or
      * {@link Processor#process(Object, Object) processing} to
      * schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
@@ -134,18 +135,24 @@ public interface ProcessorContext {
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
+     * Forward a key/value pair to all downstream processors.
      * Used the input record's timestamp as timestamp for the output record.
      *
+     * <p> If this method is called within {@link Punctuator#punctuate(long)} the record that
+     * is sent downstream won't have any associated record metadata like topic, partition, or offset.
+     *
      * @param key key
      * @param value value
      */
     <K, V> void forward(final K key, final V value);
 
     /**
-     * Forwards a key/value pair to the specified downstream processors.
+     * Forward a key/value pair to the specified downstream processors.
      * Can be used to set the timestamp of the output record.
      *
+     * <p> If this method is called within {@link Punctuator#punctuate(long)} the record that
+     * is sent downstream won't have any associated record metadata like topic, partition, or offset.
+     *
      * @param key key
      * @param value value
      * @param to the options to use when forwarding
@@ -153,48 +160,84 @@ public interface ProcessorContext {
     <K, V> void forward(final K key, final V value, final To to);
 
     /**
-     * Requests a commit.
+     * Request a commit.
      */
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Return the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Return the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the partition id
      */
     int partition();
 
     /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Return the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Return the headers of the current input record; could be an empty header if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record might not have any associated headers.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide valid headers, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the headers
      */
     Headers headers();
 
     /**
-     * Returns the current timestamp.
+     * Return the current timestamp.
      *
      * <p> If it is triggered while processing a record streamed from the source processor,
      * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
      * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
+     * Note, that an upstream {@link Processor} might have set a new timestamp by calling
+     * {@link ProcessorContext#forward(Object, Object, To) forward(..., To.all().withTimestamp(...))}.
+     * In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
+     * to guarantee deterministic results.
      *
      * <p> If it is triggered while processing a record generated not from the source processor (for example,
      * if this method is invoked from the punctuate call), timestamp is defined as the current
@@ -205,7 +248,7 @@ public interface ProcessorContext {
     long timestamp();
 
     /**
-     * Returns all the application config properties as key/value pairs.
+     * Return all the application config properties as key/value pairs.
      *
      * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
      * object and associated to the ProcessorContext.
@@ -220,7 +263,7 @@ public interface ProcessorContext {
     Map<String, Object> appConfigs();
 
     /**
-     * Returns all the application config properties with the given key prefix, as key/value pairs
+     * Return all the application config properties with the given key prefix, as key/value pairs
      * stripping the prefix.
      *
      * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
@@ -234,8 +277,7 @@ public interface ProcessorContext {
     /**
      * Return the current system timestamp (also called wall-clock time) in milliseconds.
      *
-     * <p>
-     * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime.
+     * <p> Note: this method returns the internally cached system timestamp from the Kafka Stream runtime.
      * Thus, it may return a different value compared to {@code System.currentTimeMillis()}.
      *
      * @return the current system timestamp in milliseconds
@@ -245,13 +287,11 @@ public interface ProcessorContext {
     /**
      * Return the current stream-time in milliseconds.
      *
-     * <p>
-     * Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far
+     * <p> Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far
      * (including the currently processed record), i.e., it can be considered a high-watermark.
      * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration.
-     * <p>
      *
-     * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...)
+     * <p> Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...)
      * and {@link StreamsBuilder#addGlobalStore} (...),
      * because there is no concept of stream-time for this case.
      * Calling this method in a global processor will result in an {@link UnsupportedOperationException}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
index 1886dad..1cbde6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor;
 
 import java.time.Duration;
+import org.apache.kafka.streams.processor.api.Record;
 
 /**
  * A functional interface used as an argument to {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
@@ -28,6 +29,16 @@ public interface Punctuator {
     /**
      * Perform the scheduled periodic operation.
      *
+     * <p> If this method accesses {@link ProcessorContext} or
+     * {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record metadata like topic,
+     * partition, and offset or {@link org.apache.kafka.streams.processor.api.RecordMetadata} won't
+     * be available.
+     *
+     * <p> Furthermore, for any record that is sent downstream via {@link ProcessorContext#forward(Object, Object)}
+     * or {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there
+     * won't be any record metadata. If {@link ProcessorContext#forward(Object, Object)} is used,
+     * it's also not possible to set record headers.
+     *
      * @param timestamp when the operation is being called, depending on {@link PunctuationType}
      */
     void punctuate(long timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
index 5819a46..f0b8ff1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
@@ -17,39 +17,94 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 
 /**
  * The context associated with the current record being processed by
  * an {@link Processor}
  */
 public interface RecordContext {
+
     /**
-     * @return  The offset of the original record received from Kafka;
-     *          could be -1 if it is not available
+     * Return the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the topic name
      */
-    long offset();
+    String topic();
 
     /**
-     * @return  The timestamp extracted from the record received from Kafka;
-     *          could be -1 if it is not available
+     * Return the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the partition id
      */
-    long timestamp();
+    int partition();
 
     /**
-     * @return  The topic the record was received on;
-     *          could be null if it is not available
+     * Return the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the offset
      */
-    String topic();
+    long offset();
 
     /**
-     * @return  The partition the record was received on;
-     *          could be -1 if it is not available
+     * Return the current timestamp.
+     *
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
+     * Note, that an upstream {@link Processor} might have set a new timestamp by calling
+     * {@link ProcessorContext#forward(Object, Object, To) forward(..., To.all().withTimestamp(...))}.
+     * In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
+     * to guarantee deterministic results.
+     *
+     * <p> If it is triggered while processing a record generated not from the source processor (for example,
+     * if this method is invoked from the punctuate call), timestamp is defined as the current
+     * task's stream time, which is defined as the largest timestamp of any record processed by the task.
+     *
+     * @return the timestamp
      */
-    int partition();
+    long timestamp();
 
     /**
-     * @return  The headers from the record received from Kafka;
-     *          could be null if it is not available
+     * Return the headers of the current input record; could be an empty header if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record might not have any associated headers.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide valid headers, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the headers
      */
     Headers headers();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index c591d51..d110a76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -39,23 +39,23 @@ import java.util.Optional;
 public interface ProcessorContext<KForward, VForward> {
 
     /**
-     * Returns the application id.
+     * Return the application id.
      *
      * @return the application id
      */
     String applicationId();
 
     /**
-     * Returns the task id.
+     * Return the task id.
      *
      * @return the task id
      */
     TaskId taskId();
 
     /**
-     * The metadata of the source record, if is one. Processors may be invoked to
+     * Return the metadata of the current record if available. Processors may be invoked to
      * process a source record from an input topic, to run a scheduled punctuation
-     * (see {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)} ),
+     * (see {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}),
      * or because a parent processor called {@link ProcessorContext#forward(Record)}.
      * <p>
      * In the case of a punctuation, there is no source record, so this metadata would be
@@ -74,28 +74,28 @@ public interface ProcessorContext<KForward, VForward> {
     Optional<RecordMetadata> recordMetadata();
 
     /**
-     * Returns the default key serde.
+     * Return the default key serde.
      *
      * @return the key serializer
      */
     Serde<?> keySerde();
 
     /**
-     * Returns the default value serde.
+     * Return the default value serde.
      *
      * @return the value serializer
      */
     Serde<?> valueSerde();
 
     /**
-     * Returns the state directory for the partition.
+     * Return the state directory for the partition.
      *
      * @return the state directory
      */
     File stateDir();
 
     /**
-     * Returns Metrics instance.
+     * Return Metrics instance.
      *
      * @return StreamsMetrics
      */
@@ -113,7 +113,7 @@ public interface ProcessorContext<KForward, VForward> {
     <S extends StateStore> S getStateStore(final String name);
 
     /**
-     * Schedules a periodic operation for processors. A processor may call this method during
+     * Schedule a periodic operation for processors. A processor may call this method during
      * {@link Processor#init(ProcessorContext) initialization} or
      * {@link Processor#process(Record)}  processing} to
      * schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
@@ -149,7 +149,7 @@ public interface ProcessorContext<KForward, VForward> {
                          final Punctuator callback);
 
     /**
-     * Forwards a record to all child processors.
+     * Forward a record to all child processors.
      * <p>
      * Note that the forwarded {@link Record} is shared between the parent and child
      * processors. And of course, the parent may forward the same object to multiple children,
@@ -207,7 +207,7 @@ public interface ProcessorContext<KForward, VForward> {
     <K extends KForward, V extends VForward> void forward(Record<K, V> record);
 
     /**
-     * Forwards a record to the specified child processor.
+     * Forward a record to the specified child processor.
      * See {@link ProcessorContext#forward(Record)} for considerations.
      *
      * @param record The record to forward
@@ -217,7 +217,7 @@ public interface ProcessorContext<KForward, VForward> {
     <K extends KForward, V extends VForward> void forward(Record<K, V> record, final String childName);
 
     /**
-     * Requests a commit.
+     * Request a commit.
      */
     void commit();
 
@@ -237,7 +237,7 @@ public interface ProcessorContext<KForward, VForward> {
     Map<String, Object> appConfigs();
 
     /**
-     * Returns all the application config properties with the given key prefix, as key/value pairs
+     * Return all the application config properties with the given key prefix, as key/value pairs
      * stripping the prefix.
      *
      * <p> The config properties are defined in the {@link org.apache.kafka.streams.StreamsConfig}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
index 532104a..ab88b89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
@@ -16,19 +16,54 @@
  */
 package org.apache.kafka.streams.processor.api;
 
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+
 public interface RecordMetadata {
     /**
-     * @return  The topic of the original record received from Kafka
+     * Return the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the topic name
      */
     String topic();
 
     /**
-     * @return  The partition of the original record received from Kafka
+     * Return the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the partition id
      */
     int partition();
 
     /**
-     * @return  The offset of the original record received from Kafka
+     * Return the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the offset
      */
     long offset();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 7eb4dc8..07e9ab3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -43,12 +43,11 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
                                   final int partition,
                                   final String topic,
                                   final Headers headers) {
-
         this.timestamp = timestamp;
         this.offset = offset;
         this.topic = topic;
         this.partition = partition;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);
     }
 
     @Override
@@ -84,13 +83,11 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
             size += topic.toCharArray().length;
         }
         size += Integer.BYTES; // partition
-        if (headers != null) {
-            for (final Header header : headers) {
-                size += header.key().toCharArray().length;
-                final byte[] value = header.value();
-                if (value != null) {
-                    size += value.length;
-                }
+        for (final Header header : headers) {
+            size += header.key().toCharArray().length;
+            final byte[] value = header.value();
+            if (value != null) {
+                size += value.length;
             }
         }
         return size;
@@ -109,26 +106,22 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
         size += Integer.BYTES; // partition
         size += Integer.BYTES; // number of headers
 
-        if (headers == null) {
-            headerKeysBytes = headerValuesBytes = null;
-        } else {
-            final Header[] headers = this.headers.toArray();
-            headerKeysBytes = new byte[headers.length][];
-            headerValuesBytes = new byte[headers.length][];
-
-            for (int i = 0; i < headers.length; i++) {
-                size += 2 * Integer.BYTES; // sizes of key and value
-
-                final byte[] keyBytes = headers[i].key().getBytes(UTF_8);
-                size += keyBytes.length;
-                final byte[] valueBytes = headers[i].value();
-                if (valueBytes != null) {
-                    size += valueBytes.length;
-                }
-
-                headerKeysBytes[i] = keyBytes;
-                headerValuesBytes[i] = valueBytes;
+        final Header[] headers = this.headers.toArray();
+        headerKeysBytes = new byte[headers.length][];
+        headerValuesBytes = new byte[headers.length][];
+
+        for (int i = 0; i < headers.length; i++) {
+            size += 2 * Integer.BYTES; // sizes of key and value
+
+            final byte[] keyBytes = headers[i].key().getBytes(UTF_8);
+            size += keyBytes.length;
+            final byte[] valueBytes = headers[i].value();
+            if (valueBytes != null) {
+                size += valueBytes.length;
             }
+
+            headerKeysBytes[i] = keyBytes;
+            headerValuesBytes[i] = valueBytes;
         }
 
         final ByteBuffer buffer = ByteBuffer.allocate(size);
@@ -140,20 +133,16 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
         buffer.put(topicBytes);
 
         buffer.putInt(partition);
-        if (headers == null) {
-            buffer.putInt(-1);
-        } else {
-            buffer.putInt(headerKeysBytes.length);
-            for (int i = 0; i < headerKeysBytes.length; i++) {
-                buffer.putInt(headerKeysBytes[i].length);
-                buffer.put(headerKeysBytes[i]);
-
-                if (headerValuesBytes[i] != null) {
-                    buffer.putInt(headerValuesBytes[i].length);
-                    buffer.put(headerValuesBytes[i]);
-                } else {
-                    buffer.putInt(-1);
-                }
+        buffer.putInt(headerKeysBytes.length);
+        for (int i = 0; i < headerKeysBytes.length; i++) {
+            buffer.putInt(headerKeysBytes[i].length);
+            buffer.put(headerKeysBytes[i]);
+
+            if (headerValuesBytes[i] != null) {
+                buffer.putInt(headerValuesBytes[i].length);
+                buffer.put(headerValuesBytes[i]);
+            } else {
+                buffer.putInt(-1);
             }
         }
 
@@ -172,8 +161,8 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
         final int partition = buffer.getInt();
         final int headerCount = buffer.getInt();
         final Headers headers;
-        if (headerCount == -1) {
-            headers = null;
+        if (headerCount == -1) { // keep for backward compatibility
+            headers = new RecordHeaders();
         } else {
             final Header[] headerArr = new Header[headerCount];
             for (int i = 0; i < headerCount; i++) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index 0f1a1ac..f4233c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 
 import java.util.Objects;
@@ -31,7 +32,7 @@ class LRUCacheEntry {
 
 
     LRUCacheEntry(final byte[] value) {
-        this(value, null, false, -1, -1, -1, "");
+        this(value, new RecordHeaders(), false, -1, -1, -1, "");
     }
 
     LRUCacheEntry(final byte[] value,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index b2d0977..94bad59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
@@ -380,7 +381,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics() {
         setup(false);
         context.setRecordContext(
-            new ProcessorRecordContext(-1, -2, -3, "topic", null)
+            new ProcessorRecordContext(-1, -2, -3, "topic", new RecordHeaders())
         );
 
         try (final LogCaptureAppender appender =
@@ -413,22 +414,22 @@ public class KStreamSessionWindowAggregateProcessorTest {
         processor.init(context);
 
         // dummy record to establish stream time = 0
-        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
         processor.process("dummy", "dummy");
 
         // record arrives on time, should not be skipped
-        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
         processor.process("OnTime1", "1");
 
         // dummy record to advance stream time = 1
-        context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+        context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", new RecordHeaders()));
         processor.process("dummy", "dummy");
 
         try (final LogCaptureAppender appender =
                  LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
 
             // record is late
-            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("Late1", "1");
 
             assertThat(
@@ -481,27 +482,27 @@ public class KStreamSessionWindowAggregateProcessorTest {
                  LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
 
             // dummy record to establish stream time = 0
-            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("dummy", "dummy");
 
             // record arrives on time, should not be skipped
-            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("OnTime1", "1");
 
             // dummy record to advance stream time = 1
-            context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+            context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", new RecordHeaders()));
             processor.process("dummy", "dummy");
 
             // delayed record arrives on time, should not be skipped
-            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("OnTime2", "1");
 
             // dummy record to advance stream time = 2
-            context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
+            context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", new RecordHeaders()));
             processor.process("dummy", "dummy");
 
             // delayed record arrives late
-            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+            context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", new RecordHeaders()));
             processor.process("Late1", "1");
 
             assertThat(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index c7de7f7..991ee59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -257,7 +258,7 @@ public class KTableKTableInnerJoinTest {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext(props);
-        context.setRecordMetadata("left", -1, -2, null, -3);
+        context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
         join.init(context);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableInnerJoin.class)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 451725f..0d7f4b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -523,7 +524,7 @@ public class KTableKTableLeftJoinTest {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext(props);
-        context.setRecordMetadata("left", -1, -2, null, -3);
+        context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
         join.init(context);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableLeftJoin.class)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 40da184..e41d654 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -414,7 +415,7 @@ public class KTableKTableOuterJoinTest {
         ).get();
 
         final MockProcessorContext context = new MockProcessorContext(props);
-        context.setRecordMetadata("left", -1, -2, null, -3);
+        context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
         join.init(context);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableOuterJoin.class)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
index c5e211d..8e9ab43 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -48,7 +49,7 @@ public class KTableKTableRightJoinTest {
 
         props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
         final MockProcessorContext context = new MockProcessorContext(props);
-        context.setRecordMetadata("left", -1, -2, null, -3);
+        context.setRecordMetadata("left", -1, -2, new RecordHeaders(), -3);
         join.init(context);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KTableKTableRightJoin.class)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 390f408..5ab1512 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -224,7 +224,7 @@ public class KTableTransformValuesTest {
         final ProcessorRecordContext recordContext = new ProcessorRecordContext(
             42L,
             23L,
-            1,
+            -1,
             "foo",
             new RecordHeaders()
         );
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 3789ad2..fa4eada 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -52,7 +53,7 @@ import static org.hamcrest.core.Is.is;
 public class KTableSuppressProcessorMetricsTest {
     private static final long ARBITRARY_LONG = 5L;
     private static final TaskId TASK_ID = new TaskId(0, 0);
-    private Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
+    private final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
     private final String threadId = Thread.currentThread().getName();
 
     private final MetricName evictionTotalMetricLatest = new MetricName(
@@ -151,7 +152,7 @@ public class KTableSuppressProcessorMetricsTest {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final String key = "longKey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -174,7 +175,7 @@ public class KTableSuppressProcessorMetricsTest {
             verifyMetric(metrics, bufferCountMaxMetric, is(1.0));
         }
 
-        context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
         processor.process("key", value);
 
         {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 778af9a..4019965 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
@@ -101,7 +102,7 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = ARBITRARY_LONG;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final String key = "hey";
         final Change<Long> value = ARBITRARY_CHANGE;
         harness.processor.process(key, value);
@@ -119,7 +120,7 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = ARBITRARY_LONG;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
         harness.processor.process(key, value);
@@ -137,13 +138,13 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 0L;
-        context.setRecordMetadata("topic", 0, 0, null, timestamp);
+        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, 1L);
         harness.processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
-        context.setRecordMetadata("topic", 0, 1, null, 1L);
+        context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), 1L);
         harness.processor.process("tick", new Change<>(null, null));
 
         assertThat(context.forwarded(), hasSize(1));
@@ -161,7 +162,7 @@ public class KTableSuppressProcessorTest {
         final long windowStart = 99L;
         final long recordTime = 99L;
         final long windowEnd = 100L;
-        context.setRecordMetadata("topic", 0, 0, null, recordTime);
+        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), recordTime);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         harness.processor.process(key, value);
@@ -172,7 +173,7 @@ public class KTableSuppressProcessorTest {
         final long windowStart2 = 100L;
         final long recordTime2 = 100L;
         final long windowEnd2 = 101L;
-        context.setRecordMetadata("topic", 0, 1, null, recordTime2);
+        context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), recordTime2);
         harness.processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
         assertThat(context.forwarded(), hasSize(0));
 
@@ -180,7 +181,7 @@ public class KTableSuppressProcessorTest {
         final long windowStart3 = 101L;
         final long recordTime3 = 101L;
         final long windowEnd3 = 102L;
-        context.setRecordMetadata("topic", 0, 1, null, recordTime3);
+        context.setRecordMetadata("topic", 0, 1, new RecordHeaders(), recordTime3);
         harness.processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -204,13 +205,13 @@ public class KTableSuppressProcessorTest {
         // even though the grace period is 0.
         final long timestamp = 5L;
         final long windowEnd = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         harness.processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
-        context.setRecordMetadata("", 0, 1L, null, windowEnd);
+        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), windowEnd);
         harness.processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -226,7 +227,7 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
         harness.processor.process(key, value);
@@ -248,7 +249,7 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         harness.processor.process(key, value);
@@ -268,7 +269,7 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         harness.processor.process(key, value);
@@ -287,7 +288,7 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         harness.processor.process(key, value);
@@ -310,7 +311,7 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         harness.processor.process(key, value);
@@ -333,7 +334,7 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         harness.processor.process(key, value);
@@ -351,12 +352,12 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         harness.processor.process(key, value);
 
-        context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
         harness.processor.process("dummyKey", value);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -372,12 +373,12 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         harness.processor.process(key, value);
 
-        context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp + 1);
         harness.processor.process("dummyKey", value);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -393,13 +394,13 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         harness.processor.process(key, value);
 
-        context.setRecordMetadata("", 0, 1L, null, timestamp);
+        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp);
         try {
             harness.processor.process("dummyKey", value);
             fail("expected an exception");
@@ -415,13 +416,13 @@ public class KTableSuppressProcessorTest {
         final MockInternalProcessorContext context = harness.context;
 
         final long timestamp = 100L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setRecordMetadata("", 0, 0L, new RecordHeaders(), timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         harness.processor.process(key, value);
 
-        context.setRecordMetadata("", 0, 1L, null, timestamp);
+        context.setRecordMetadata("", 0, 1L, new RecordHeaders(), timestamp);
         try {
             harness.processor.process("dummyKey", value);
             fail("expected an exception");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index e4968e6..2e7c04c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -92,7 +92,7 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldNotThrowNullPointerExceptionOnTopicIfRecordContextTopicIsNull() {
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, null, null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, null, new RecordHeaders()));
         assertThat(context.topic(), nullValue());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
index 83ab127..68eb21e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
@@ -21,20 +21,35 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 public class ProcessorRecordContextTest {
     // timestamp + offset + partition: 8 + 8 + 4
     private final static long MIN_SIZE = 20L;
 
     @Test
-    public void shouldEstimateNullTopicAndNullHeadersAsZeroLength() {
+    public void shouldNotAllowNullHeaders() {
+        assertThrows(
+            NullPointerException.class,
+            () -> new ProcessorRecordContext(
+                42L,
+                73L,
+                0,
+                "topic",
+                null
+            )
+        );
+    }
+
+    @Test
+    public void shouldEstimateNullTopicAndEmptyHeadersAsZeroLength() {
         final Headers headers = new RecordHeaders();
         final ProcessorRecordContext context = new ProcessorRecordContext(
             42L,
             73L,
             0,
             null,
-            null
+            new RecordHeaders()
         );
 
         assertEquals(MIN_SIZE, context.residentMemorySizeEstimate());
@@ -60,7 +75,7 @@ public class ProcessorRecordContextTest {
             73L,
             0,
             "topic",
-            null
+            new RecordHeaders()
         );
 
         assertEquals(MIN_SIZE + 5L, context.residentMemorySizeEstimate());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 25fbc2f..2452acf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -1218,6 +1219,6 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
     private ProcessorRecordContext createRecordContext(final long time) {
-        return new ProcessorRecordContext(time, 0, 0, "topic", null);
+        return new ProcessorRecordContext(time, 0, 0, "topic", new RecordHeaders());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
index ad9b5f8..a8cc5ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.junit.Test;
 
@@ -81,7 +82,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldAccountForDeduplicationInSizeEstimate() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         assertEquals(25L, new BufferValue(null, null, null, context).residentMemorySizeEstimate());
         assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, null, null, context).residentMemorySizeEstimate());
         assertEquals(26L, new BufferValue(null, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate());
@@ -94,7 +95,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldSerializeNulls() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] bytes = new BufferValue(null, null, null, context).serialize(0).array();
         final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);
@@ -104,7 +105,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldSerializePrior() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] priorValue = {(byte) 5};
         final byte[] bytes = new BufferValue(priorValue, null, null, context).serialize(0).array();
@@ -115,7 +116,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldSerializeOld() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] oldValue = {(byte) 5};
         final byte[] bytes = new BufferValue(null, oldValue, null, context).serialize(0).array();
@@ -126,7 +127,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldSerializeNew() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] newValue = {(byte) 5};
         final byte[] bytes = new BufferValue(null, null, newValue, context).serialize(0).array();
@@ -137,7 +138,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldCompactDuplicates() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] duplicate = {(byte) 5};
         final byte[] bytes = new BufferValue(duplicate, duplicate, null, context).serialize(0).array();
@@ -148,7 +149,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldDeserializePrior() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] priorValue = {(byte) 5};
         final ByteBuffer serialValue =
@@ -163,7 +164,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldDeserializeOld() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] oldValue = {(byte) 5};
         final ByteBuffer serialValue =
@@ -177,7 +178,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldDeserializeNew() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] newValue = {(byte) 5};
         final ByteBuffer serialValue =
@@ -191,7 +192,7 @@ public class BufferValueTest {
 
     @Test
     public void shouldDeserializeCompactedDuplicates() {
-        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
+        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", new RecordHeaders());
         final byte[] serializedContext = context.serialize();
         final byte[] duplicate = {(byte) 5};
         final ByteBuffer serialValue =
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index ff78642..e702151 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -75,7 +76,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
         store.setFlushListener(cacheFlushListener, false);
         cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext<>(null, null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
         store.init((StateStoreContext) context, null);
     }
 
@@ -201,7 +202,7 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
         store = new CachingKeyValueStore(underlyingStore);
         cache = EasyMock.niceMock(ThreadCache.class);
         context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
         store.init((StateStoreContext) context, store);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
index 417b35f..67258e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -89,7 +90,7 @@ public class CachingInMemorySessionStoreTest {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders()));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
@@ -225,7 +226,7 @@ public class CachingInMemorySessionStoreTest {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = EasyMock.niceMock(ThreadCache.class);
         final InternalMockProcessorContext context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
@@ -312,7 +313,7 @@ public class CachingInMemorySessionStoreTest {
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.findSessions(
             Bytes.wrap("a".getBytes(StandardCharsets.UTF_8)),
             0,
-            added.size() * 10);
+            added.size() * 10L);
         final List<KeyValue<Windowed<Bytes>, byte[]>> actual = toList(iterator);
         verifyKeyValueList(added, actual);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 55018bf..8840da4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -94,7 +95,7 @@ public class CachingPersistentSessionStoreTest {
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         final InternalMockProcessorContext context =
             new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders()));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
@@ -211,7 +212,7 @@ public class CachingPersistentSessionStoreTest {
         cache = EasyMock.niceMock(ThreadCache.class);
         final InternalMockProcessorContext context =
             new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index ef9345b..8bdf8b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -104,7 +105,7 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.setFlushListener(cacheListener, false);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders()));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
@@ -907,7 +908,7 @@ public class CachingPersistentWindowStoreTest {
         cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
         cache = EasyMock.createNiceMock(ThreadCache.class);
         context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
-        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null));
+        context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 081ce21..6d43b4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -43,16 +43,12 @@ public class NamedCacheTest {
 
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
-    private Metrics innerMetrics;
-    private StreamsMetricsImpl metrics;
-    private final String taskIDString = "0.0";
-    private final String underlyingStoreName = "storeName";
 
     @Before
     public void setUp() {
-        innerMetrics = new Metrics();
-        metrics = new MockStreamsMetrics(innerMetrics);
-        cache = new NamedCache(taskIDString + "-" + underlyingStoreName, metrics);
+        final Metrics innerMetrics = new Metrics();
+        final StreamsMetricsImpl metrics = new MockStreamsMetrics(innerMetrics);
+        cache = new NamedCache("dummy-name", metrics);
     }
 
     @Test
@@ -63,13 +59,14 @@ public class NamedCacheTest {
                 new KeyValue<>("K3", "V3"),
                 new KeyValue<>("K4", "V4"),
                 new KeyValue<>("K5", "V5"));
-        for (int i = 0; i < toInsert.size(); i++) {
-            final byte[] key = toInsert.get(i).key.getBytes();
-            final byte[] value = toInsert.get(i).value.getBytes();
-            cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
+        for (final KeyValue<String, String> stringStringKeyValue : toInsert) {
+            final byte[] key = stringStringKeyValue.key.getBytes();
+            final byte[] value = stringStringKeyValue.value.getBytes();
+            cache.put(Bytes.wrap(key),
+                new LRUCacheEntry(value, new RecordHeaders(), true, 1, 1, 1, ""));
             final LRUCacheEntry head = cache.first();
             final LRUCacheEntry tail = cache.last();
-            assertEquals(new String(head.value()), toInsert.get(i).value);
+            assertEquals(new String(head.value()), stringStringKeyValue.value);
             assertEquals(new String(tail.value()), toInsert.get(0).value);
             assertEquals(cache.flushes(), 0);
             assertEquals(cache.hits(), 0);
@@ -158,12 +155,7 @@ public class NamedCacheTest {
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
         cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, headers, true, 0, 0, 0, ""));
 
-        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                flushed.addAll(dirty);
-            }
-        });
+        cache.setListener(flushed::addAll);
 
         cache.evict();
 
@@ -185,19 +177,14 @@ public class NamedCacheTest {
     public void shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry() {
         cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
         assertThrows(IllegalStateException.class, () -> cache.put(Bytes.wrap(new byte[]{0}),
-            new LRUCacheEntry(new byte[]{10}, null, false, 0, 0, 0, "")));
+            new LRUCacheEntry(new byte[]{10}, new RecordHeaders(), false, 0, 0, 0, "")));
     }
 
     @Test
     public void shouldRemoveDeletedValuesOnFlush() {
-        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                // no-op
-            }
-        });
+        cache.setListener(dirty -> { /* no-op */ });
         cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, headers, true, 0, 0, 0, ""));
-        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, null, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, new RecordHeaders(), true, 0, 0, 0, ""));
         cache.flush();
         assertEquals(1, cache.size());
         assertNotNull(cache.get(Bytes.wrap(new byte[]{1})));
@@ -205,21 +192,18 @@ public class NamedCacheTest {
 
     @Test
     public void shouldBeReentrantAndNotBreakLRU() {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new RecordHeaders(), true, 0, 0, 0, "");
         final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
         cache.put(Bytes.wrap(new byte[]{0}), dirty);
         cache.put(Bytes.wrap(new byte[]{1}), clean);
         cache.put(Bytes.wrap(new byte[]{2}), clean);
         assertEquals(3 * cache.head().size(), cache.sizeInBytes());
-        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                cache.put(Bytes.wrap(new byte[]{3}), clean);
-                // evict key 1
-                cache.evict();
-                // evict key 2
-                cache.evict();
-            }
+        cache.setListener(dirty1 -> {
+            cache.put(Bytes.wrap(new byte[]{3}), clean);
+            // evict key 1
+            cache.evict();
+            // evict key 2
+            cache.evict();
         });
 
         assertEquals(3 * cache.head().size(), cache.sizeInBytes());
@@ -251,15 +235,10 @@ public class NamedCacheTest {
 
     @Test
     public void shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey() {
-        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, null, true, 0, 0, 0, "");
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new RecordHeaders(), true, 0, 0, 0, "");
         final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
         final Bytes key = Bytes.wrap(new byte[] {3});
-        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                cache.put(key, clean);
-            }
-        });
+        cache.setListener(dirty1 -> cache.put(key, clean));
         cache.put(key, dirty);
         cache.evict();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index c449de9..805d295 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
@@ -63,7 +64,7 @@ public class ThreadCacheTest {
         for (final KeyValue<String, String> kvToInsert : toInsert) {
             final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             final byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, new RecordHeaders(), true, 1L, 1L, 1, ""));
         }
 
         for (final KeyValue<String, String> kvToInsert : toInsert) {
@@ -96,7 +97,7 @@ public class ThreadCacheTest {
             final String keyStr = "K" + i;
             final Bytes key = Bytes.wrap(keyStr.getBytes());
             final byte[] value = new byte[valueSizeBytes];
-            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, new RecordHeaders(), true, 1L, 1L, 1, ""));
         }
 
 
@@ -134,7 +135,7 @@ public class ThreadCacheTest {
     }
 
 
-    static int memoryCacheEntrySize(final byte[] key, final byte[] value, final String topic) {
+    static long memoryCacheEntrySize(final byte[] key, final byte[] value, final String topic) {
         return key.length +
             value.length +
             1 + // isDirty
@@ -174,7 +175,7 @@ public class ThreadCacheTest {
         for (final KeyValue<String, String> kvToInsert : toInsert) {
             final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             final byte[] value = kvToInsert.value.getBytes();
-            cache.put(namespace, key, new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, new RecordHeaders(), true, 1, 1, 1, ""));
         }
 
         for (int i = 0; i < expected.size(); i++) {
@@ -363,8 +364,8 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
-        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
+        final long entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5L, false);
         assertEquals(5, cache.size());
         // should evict byte[] {0}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
@@ -374,8 +375,8 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCacheReverseRange() {
-        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
+        final long entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5L, true);
         assertEquals(5, cache.size());
         // should evict byte[] {4}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
@@ -415,8 +416,8 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldReturnAllUnevictedValuesFromCache() {
-        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
+        final long entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5L, false);
         assertEquals(5, cache.size());
         // should evict byte[] {0}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
@@ -426,8 +427,8 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldReturnAllUnevictedValuesFromCacheInReverseOrder() {
-        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
+        final long entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5L, true);
         assertEquals(5, cache.size());
         // should evict byte[] {4}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
@@ -615,7 +616,7 @@ public class ThreadCacheTest {
     }
 
     private LRUCacheEntry dirtyEntry(final byte[] key) {
-        return new LRUCacheEntry(key, null, true, -1, -1, -1, "");
+        return new LRUCacheEntry(key, new RecordHeaders(), true, -1, -1, -1, "");
     }
 
     private LRUCacheEntry cleanEntry(final byte[] key) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 7d420c7..26e9488 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -370,7 +370,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
 
         // These serialized formats were captured by running version 2.1 code.
         // They verify that an upgrade from 2.1 will work.
@@ -489,7 +489,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
 
         final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
 
@@ -611,7 +611,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
 
         final RecordHeaders v2FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
 
@@ -735,7 +735,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
 
         final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
 
@@ -856,7 +856,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
 
         final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 3})});
 
@@ -977,7 +977,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
 
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", new RecordHeaders()));
 
         final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})});
 
@@ -1025,7 +1025,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
     }
 
     private static ProcessorRecordContext getContext(final long recordTimestamp) {
-        return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null);
+        return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", new RecordHeaders());
     }
 
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index b5ced0f..577f94f 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -423,6 +423,18 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         return offset;
     }
 
+    /**
+     * Returns the headers of the current input record; may be {@code null} if they are not
+     * available.
+     *
+     * <p> Note that headers should never be {@code null} in the actual Kafka Streams runtime,
+     * although they may be empty. However, this mock does not guarantee non-{@code null} headers.
+     * Thus, you either need to add a {@code null} check to your production code to use this mock
+     * for testing or you always need to set headers manually via {@link #setHeaders(Headers)} to
+     * avoid a {@link NullPointerException} from your {@link Processor} implementation.
+     *
+     * @return the headers
+     */
     @Override
     public Headers headers() {
         return headers;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index ae345ea..6e3d931 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.MockProcessorContext;
@@ -253,7 +254,7 @@ public class MockProcessorContextTest {
         }
 
         context.resetForwards();
-        context.setRecordMetadata("t1", 0, 0L, null, 0L);
+        context.setRecordMetadata("t1", 0, 0L, new RecordHeaders(), 0L);
 
         {
             processor.process("foo", 5L);

Mime
View raw message