kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6454: Allow timestamp manipulation in Processor API (#4519)
Date Fri, 16 Mar 2018 23:02:20 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 394aa74  KAFKA-6454: Allow timestamp manipulation in Processor API (#4519)
394aa74 is described below

commit 394aa7426117d0d04666c1c2a63d5f98229b7894
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Fri Mar 16 16:02:11 2018 -0700

    KAFKA-6454: Allow timestamp manipulation in Processor API (#4519)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 docs/streams/core-concepts.html                    |   4 +
 docs/streams/developer-guide/processor-api.html    |   4 +
 docs/streams/upgrade-guide.html                    |   8 +
 .../apache/kafka/streams/kstream/Transformer.java  |  16 +-
 .../kafka/streams/kstream/ValueTransformer.java    |  20 ++-
 .../streams/kstream/ValueTransformerWithKey.java   |  15 +-
 .../streams/kstream/internals/KStreamBranch.java   |  15 +-
 .../streams/kstream/internals/KStreamImpl.java     |  15 +-
 .../kstream/internals/KStreamTransformValues.java  |   8 +
 .../kafka/streams/processor/AbstractProcessor.java |   6 +-
 .../kafka/streams/processor/ProcessorContext.java  |  39 +++--
 .../org/apache/kafka/streams/processor/To.java     |  68 +++++++++
 .../internals/AbstractProcessorContext.java        |  15 --
 .../internals/GlobalProcessorContextImpl.java      |  30 +++-
 .../processor/internals/ProcessorContextImpl.java  |  67 ++++++---
 .../streams/processor/internals/ProcessorNode.java |  11 +-
 .../internals/ProcessorRecordContext.java          |   6 +-
 .../streams/processor/internals/RecordContext.java |   5 +
 .../processor/internals/StandbyContextImpl.java    |  11 ++
 .../{RecordContext.java => ToInternal.java}        |  42 +++---
 .../streams/state/internals/LRUCacheEntry.java     |   7 +-
 .../internals/KStreamTransformValuesTest.java      |  17 ++-
 .../internals/AbstractProcessorContextTest.java    |  22 +--
 .../processor/internals/ProcessorTopologyTest.java | 163 +++++++++++++--------
 .../processor/internals/RecordContextStub.java     |  13 +-
 .../apache/kafka/test/MockProcessorContext.java    |  46 +++---
 .../apache/kafka/test/NoOpProcessorContext.java    |   6 +
 27 files changed, 459 insertions(+), 220 deletions(-)

diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html
index 2f22be7..889fe06 100644
--- a/docs/streams/core-concepts.html
+++ b/docs/streams/core-concepts.html
@@ -127,6 +127,10 @@
         <li> For aggregations, the timestamp of a resulting aggregate update record will be that of the latest arrived input record that triggered the update.</li>
     </ul>
 
+    <p>
+	Note, that the describe default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling <code>#forward()</code>.
+    </p>
+
     <h3><a id="streams_state" href="#streams_state">States</a></h3>
 
     <p>
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index fdf6c86..b51bc22 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -77,6 +77,10 @@
                 its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
                 function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
                 and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).</p>
+            <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
+	       (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
+	       (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
+	       Note, that <code class="docutils literal"><span class="pre">#forward()</span></code> also allows to change the default behavior by passing a custom timestamp for the output record.</p>
             <p>Specifically, <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> accepts a user <code class="docutils literal"><span class="pre">Punctuator</span></code> callback interface, which triggers its <code class="docutils literal"><span class="pre">punctuate()</span></code>
                 API method periodically based on the <code class="docutils literal"><span class="pre">PunctuationType</span></code>. The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
                 for the punctuation scheduling: either <a class="reference internal" href="../concepts.html#streams-concepts-time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 46be969..baf9633 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -85,6 +85,14 @@
         In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
         to let users specify inner serdes if the default serde classes are windowed serdes.
         For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
+    /<p>
+    
+    <p>
+      Kafka 1.2.0 allows to manipulate timestamps of output records using the Processor API (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
+      To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
+      The two existing overloads <code>#forward(Object key, Object value, String childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated and a new overload <code>#forward(Object key, Object value, To to)</code> was added.
+      The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
+      Forwarding based on child index is not supported in the new API any longer.
     </p>
 
     <h3><a id="streams_api_changes_110" href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 308fcad..a83b4a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
 
 /**
  * The {@code Transformer} interface is for stateful mapping of an input record to zero, one, or multiple new output
@@ -69,9 +70,8 @@ public interface Transformer<K, V, R> {
      * attached} to this operator can be accessed and modified
      * arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+     * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)}
+     * and {@link ProcessorContext#forward(Object, Object, To)} can be used.
      * If not record should be forwarded downstream, {@code transform} can return {@code null}.
      *
      * @param key the key for the record
@@ -86,9 +86,8 @@ public interface Transformer<K, V, R> {
      * {@link ProcessorContext#schedule(long) schedules itself} with the context during
      * {@link #init(ProcessorContext) initialization}.
      * <p>
-     * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+     * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and
+     * {@link ProcessorContext#forward(Object, Object, To)} can be used.
      * <p>
      * Note that {@code punctuate} is called based on <it>stream time</it> (i.e., time progresses with regard to
      * timestamps return by the used {@link TimestampExtractor})
@@ -105,9 +104,8 @@ public interface Transformer<K, V, R> {
     /**
      * Close this processor and clean up any resources.
      * <p>
-     * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} can be used.
+     * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and
+     * {@link ProcessorContext#forward(Object, Object, To)} can be used.
      */
     void close();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 1802a61..1da779e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
 
 /**
  * The {@code ValueTransformer} interface for stateful mapping of a value to a new value (with possible new type).
@@ -58,9 +59,8 @@ public interface ValueTransformer<V, VR> {
      * Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
      * Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}.
      * <p>
-     * Note that using {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, or
-     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
+     * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within any method of
      * {@code ValueTransformer} and will result in an {@link StreamsException exception}.
      *
      * @param context the context
@@ -75,9 +75,8 @@ public interface ValueTransformer<V, VR> {
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and
+     * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+     * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
      *
      * @param value the value to be transformed
@@ -90,9 +89,8 @@ public interface ValueTransformer<V, VR> {
      * the context during {@link #init(ProcessorContext) initialization}.
      * <p>
      * It is not possible to return any new output records within {@code punctuate}.
-     * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
-     * or {@link ProcessorContext#forward(Object, Object, String)} will result in an
-     * {@link StreamsException exception}.
+     * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)}
+     * will result in an {@link StreamsException exception}.
      * Furthermore, {@code punctuate} must return {@code null}.
      * <p>
      * Note, that {@code punctuate} is called base on <it>stream time</it> (i.e., time progress with regard to
@@ -111,8 +109,8 @@ public interface ValueTransformer<V, VR> {
      * Close this processor and clean up any resources.
      * <p>
      * It is not possible to return any new output records within {@code close()}.
-     * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
-     * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}.
+     * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)}
+     * will result in an {@link StreamsException exception}.
      */
     void close();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
index 128c61f..7f399b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.To;
 
 /**
  * The {@code ValueTransformerWithKey} interface for stateful mapping of a value to a new value (with possible new type).
@@ -62,9 +63,8 @@ public interface ValueTransformerWithKey<K, V, VR> {
      * Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
      * Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}.
      * <p>
-     * Note that using {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, or
-     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within any method of
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
+     * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within any method of
      * {@code ValueTransformerWithKey} and will result in an {@link StreamsException exception}.
      *
      * @param context the context
@@ -79,9 +79,8 @@ public interface ValueTransformerWithKey<K, V, VR> {
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)},
-     * {@link ProcessorContext#forward(Object, Object, int)}, and
-     * {@link ProcessorContext#forward(Object, Object, String)} is not allowed within {@code transform} and
+     * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+     * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
      *
      * @param readOnlyKey the read-only key
@@ -94,8 +93,8 @@ public interface ValueTransformerWithKey<K, V, VR> {
      * Close this processor and clean up any resources.
      * <p>
      * It is not possible to return any new output records within {@code close()}.
-     * Using {@link ProcessorContext#forward(Object, Object)}, {@link ProcessorContext#forward(Object, Object, int)},
-     * or {@link ProcessorContext#forward(Object, Object, String)} will result in an {@link StreamsException exception}.
+     * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)},
+     * will result in an {@link StreamsException exception}.
      */
     void close();
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
index 317c5bf..baa9b63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
@@ -16,18 +16,21 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.To;
 
 class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
 
     private final Predicate<K, V>[] predicates;
+    private final String[] childNodes;
 
-    @SuppressWarnings("unchecked")
-    public KStreamBranch(Predicate<K, V> ... predicates) {
+    KStreamBranch(final Predicate<K, V>[] predicates,
+                  final String[] childNodes) {
         this.predicates = predicates;
+        this.childNodes = childNodes;
     }
 
     @Override
@@ -37,12 +40,12 @@ class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
 
     private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K key, V value) {
+        public void process(final K key, final V value) {
             for (int i = 0; i < predicates.length; i++) {
                 if (predicates[i].test(key, value)) {
-                    // use forward with childIndex here and then break the loop
+                    // use forward with child here and then break the loop
                     // so that no record is going to be piped to multiple streams
-                    context().forward(key, value, i);
+                    context().forward(key, value, To.child(childNodes[i]));
                     break;
                 }
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 07bc67d..349be86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -358,17 +358,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         for (final Predicate<? super K, ? super V> predicate : predicates) {
             Objects.requireNonNull(predicate, "predicates can't have null values");
         }
-        String branchName = builder.newProcessorName(BRANCH_NAME);
 
-        builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
+        String branchName = builder.newProcessorName(BRANCH_NAME);
 
-        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
+        String[] childNames = new String[predicates.length];
         for (int i = 0; i < predicates.length; i++) {
-            String childName = builder.newProcessorName(BRANCHCHILD_NAME);
+            childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME);
+        }
 
-            builder.internalTopologyBuilder.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
+        builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone(), childNames), this.name);
 
-            branchChildren[i] = new KStreamImpl<>(builder, childName, sourceNodes, this.repartitionRequired);
+        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
+        for (int i = 0; i < predicates.length; i++) {
+            builder.internalTopologyBuilder.addProcessor(childNames[i], new KStreamPassThrough<K, V>(), branchName);
+            branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired);
         }
 
         return branchChildren;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index ace4f69..e644597 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 
 import java.io.File;
 import java.util.Map;
@@ -117,10 +118,17 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
                     }
 
                     @Override
+                    public <K, V> void forward(final K key, final V value, final To to) {
+                        throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
+                    }
+
+                    @SuppressWarnings("deprecation")
+                    @Override
                     public <K, V> void forward(final K key, final V value, final int childIndex) {
                         throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
                     }
 
+                    @SuppressWarnings("deprecation")
                     @Override
                     public <K, V> void forward(final K key, final V value, final String childName) {
                         throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index 1cfe78a..14e6c2a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -31,7 +31,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
     }
 
     @Override
-    public void init(ProcessorContext context) {
+    public void init(final ProcessorContext context) {
         this.context = context;
     }
 
@@ -46,7 +46,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
      */
     @SuppressWarnings("deprecation")
     @Override
-    public void punctuate(long timestamp) {
+    public void punctuate(final long timestamp) {
         // do nothing
     }
 
@@ -67,6 +67,6 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
      * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
      */
     protected final ProcessorContext context() {
-        return this.context;
+        return context;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 42902a8..404b225 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -83,7 +83,9 @@ public interface ProcessorContext {
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);
+    void register(final StateStore store,
+                  final boolean loggingEnabledIsDeprecatedAndIgnored,
+                  final StateRestoreCallback stateRestoreCallback);
 
     /**
      * Get the state store given the store name.
@@ -91,7 +93,7 @@ public interface ProcessorContext {
      * @param name The store name
      * @return The state store instance
      */
-    StateStore getStateStore(String name);
+    StateStore getStateStore(final String name);
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
@@ -125,7 +127,9 @@ public interface ProcessorContext {
      * @param callback a function consuming timestamps representing the current stream or system time
      * @return a handle allowing cancellation of the punctuation schedule established by this method
      */
-    Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);
+    Cancellable schedule(final long intervalMs,
+                         final PunctuationType type,
+                         final Punctuator callback);
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
@@ -137,30 +141,47 @@ public interface ProcessorContext {
      * @param interval the time interval between punctuations
      */
     @Deprecated
-    void schedule(long interval);
+    void schedule(final long interval);
 
     /**
-     * Forwards a key/value pair to the downstream processors
+     * Forwards a key/value pair to all downstream processors.
+     * Used the input record's timestamp as timestamp for the output record.
+     *
      * @param key key
      * @param value value
      */
-    <K, V> void forward(K key, V value);
+    <K, V> void forward(final K key, final V value);
+
+    /**
+     * Forwards a key/value pair to the specified downstream processors.
+     * Can be used to set the timestamp of the output record.
+     *
+     * @param key key
+     * @param value value
+     * @param to the options to use when forwarding
+     */
+    <K, V> void forward(final K key, final V value, final To to);
 
     /**
      * Forwards a key/value pair to one of the downstream processors designated by childIndex
      * @param key key
      * @param value value
      * @param childIndex index in list of children of this node
+     * @deprecated please use {@link #forward(Object, Object, To)} instead
      */
-    <K, V> void forward(K key, V value, int childIndex);
+    // TODO when we remove this method, we can also remove `ProcessorNode#children`
+    @Deprecated
+    <K, V> void forward(final K key, final V value, final int childIndex);
 
     /**
      * Forwards a key/value pair to one of the downstream processors designated by the downstream processor name
      * @param key key
      * @param value value
      * @param childName name of downstream processor
+     * @deprecated please use {@link #forward(Object, Object, To)} instead
      */
-    <K, V> void forward(K key, V value, String childName);
+    @Deprecated
+    <K, V> void forward(final K key, final V value, final String childName);
 
     /**
      * Requests a commit
@@ -231,6 +252,6 @@ public interface ProcessorContext {
      * @return the key/values matching the given prefix from the StreamsConfig properties.
      *
      */
-    Map<String, Object> appConfigsWithPrefix(String prefix);
+    Map<String, Object> appConfigsWithPrefix(final String prefix);
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/To.java b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
new file mode 100644
index 0000000..52007df
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/To.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * This class is used to provide the optional parameters when sending output records to downstream processor
+ * using {@link ProcessorContext#forward(Object, Object, To)}.
+ */
+public class To {
+    protected String childName;
+    protected long timestamp;
+
+    private To(final String childName,
+               final long timestamp) {
+        this.childName = childName;
+        this.timestamp = timestamp;
+    }
+
+    protected To(final To to) {
+        this(to.childName, to.timestamp);
+    }
+
+    protected void update(final To to) {
+        childName = to.childName;
+        timestamp = to.timestamp;
+    }
+
+    /**
+     * Forward the key/value pair to one of the downstream processors designated by the downstream processor name.
+     * @param childName name of downstream processor
+     * @return a new {@link To} instance configured with {@code childName}
+     */
+    public static To child(final String childName) {
+        return new To(childName, -1);
+    }
+
+    /**
+     * Forward the key/value pair to all downstream processors
+     * @return a new {@link To} instance configured for all downstream processor
+     */
+    public static To all() {
+        return new To((String) null, -1);
+    }
+
+    /**
+     * Set the timestamp of the output record.
+     * @param timestamp the output record timestamp
+     * @return itself (i.e., {@code this})
+     */
+    public To withTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+        return this;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index e9b5a4c..87408c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -164,20 +163,6 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         return combined;
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public <K, V> void forward(final K key, final V value) {
-        final ProcessorNode previousNode = currentNode();
-        try {
-            for (final ProcessorNode child : (List<ProcessorNode>) currentNode().children()) {
-                setCurrentNode(child);
-                child.process(key, value);
-            }
-        } finally {
-            setCurrentNode(previousNode);
-        }
-    }
-
     @Override
     public Map<String, Object> appConfigsWithPrefix(final String prefix) {
         return config.originalsWithPrefix(prefix);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 37e7cb5..88d9f56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -23,8 +23,11 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
+import java.util.List;
+
 public class GlobalProcessorContextImpl extends AbstractProcessorContext {
 
 
@@ -40,20 +43,43 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         return stateManager.getGlobalStore(name);
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final K key, final V value) {
+        final ProcessorNode previousNode = currentNode();
+        try {
+            for (final ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) {
+                setCurrentNode(child);
+                child.process(key, value);
+            }
+        } finally {
+            setCurrentNode(previousNode);
+        }
+    }
+
     /**
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public <K, V> void forward(K key, V value, int childIndex) {
+    public <K, V> void forward(final K key, final V value, final To to) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
 
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
+    }
 
     /**
      * @throws UnsupportedOperationException on every invocation
      */
+    @SuppressWarnings("deprecation")
     @Override
-    public <K, V> void forward(K key, V value, String childName) {
+    public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 42d3d70..3761bfb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -18,12 +18,13 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.List;
@@ -32,6 +33,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     private final StreamTask task;
     private final RecordCollector collector;
+    private final ToInternal toInternal = new ToInternal();
+    private final static To SEND_TO_ALL = To.all();
 
     ProcessorContextImpl(final TaskId id,
                          final StreamTask task,
@@ -77,32 +80,60 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     @SuppressWarnings("unchecked")
     @Override
+    public <K, V> void forward(final K key, final V value) {
+        forward(key, value, SEND_TO_ALL);
+    }
+
+    @SuppressWarnings({"unchecked", "deprecation"})
+    @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
+        forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
+    }
+
+    @SuppressWarnings({"unchecked", "deprecation"})
+    @Override
+    public <K, V> void forward(final K key, final V value, final String childName) {
+        forward(key, value, To.child(childName));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        toInternal.update(to);
+        if (toInternal.hasTimestamp()) {
+            recordContext.setTimestamp(toInternal.timestamp());
+        }
         final ProcessorNode previousNode = currentNode();
-        final ProcessorNode child = (ProcessorNode<K, V>) currentNode().children().get(childIndex);
-        setCurrentNode(child);
         try {
-            child.process(key, value);
+            final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K, V>>) currentNode().children();
+            final String sendTo = toInternal.child();
+            if (sendTo != null) {
+                final ProcessorNode child = currentNode().getChild(sendTo);
+                if (child == null) {
+                    throw new StreamsException("Unknown processor name: " + sendTo);
+                }
+                forward(child, key, value);
+            } else {
+                if (children.size() == 1) {
+                    final ProcessorNode child = children.get(0);
+                    forward(child, key, value);
+                } else {
+                    for (final ProcessorNode child : children) {
+                        forward(child, key, value);
+                    }
+                }
+            }
         } finally {
             setCurrentNode(previousNode);
         }
     }
 
     @SuppressWarnings("unchecked")
-    @Override
-    public <K, V> void forward(final K key, final V value, final String childName) {
-        for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) {
-            if (child.name().equals(childName)) {
-                ProcessorNode previousNode = currentNode();
-                setCurrentNode(child);
-                try {
-                    child.process(key, value);
-                    return;
-                } finally {
-                    setCurrentNode(previousNode);
-                }
-            }
-        }
+    private <K, V> void forward(final ProcessorNode child,
+                                final K key,
+                                final V value) {
+        setCurrentNode(child);
+        child.process(key, value);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 29f442f..94e8640 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -26,12 +26,16 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.Punctuator;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class ProcessorNode<K, V> {
 
+    // TODO: 'children' can be removed when #forward() via index is removed
     private final List<ProcessorNode<?, ?>> children;
+    private final Map<String, ProcessorNode<?, ?>> childByName;
 
     private final String name;
     private final Processor<K, V> processor;
@@ -75,6 +79,7 @@ public class ProcessorNode<K, V> {
         this.name = name;
         this.processor = processor;
         this.children = new ArrayList<>();
+        this.childByName = new HashMap<>();
         this.stateStores = stateStores;
         this.time = new SystemTime();
     }
@@ -92,11 +97,15 @@ public class ProcessorNode<K, V> {
         return children;
     }
 
+    public final ProcessorNode getChild(final String childName) {
+        return childByName.get(childName);
+    }
+
     public void addChild(ProcessorNode<?, ?> child) {
         children.add(child);
+        childByName.put(child.name, child);
     }
 
-
     public void init(ProcessorContext context) {
         this.context = context;
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index aa20103..92acfc9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -20,7 +20,7 @@ import java.util.Objects;
 
 public class ProcessorRecordContext implements RecordContext {
 
-    private final long timestamp;
+    private long timestamp;
     private final long offset;
     private final String topic;
     private final int partition;
@@ -44,6 +44,10 @@ public class ProcessorRecordContext implements RecordContext {
         return timestamp;
     }
 
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
     @Override
     public String topic() {
         return topic;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
index dc752cb..dd58f4c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
@@ -34,6 +34,11 @@ public interface RecordContext {
     long timestamp();
 
     /**
+     * Sets a new timestamp for the output record.
+     */
+    void setTimestamp(final long timestamp);
+
+    /**
      * @return The topic the record was received on
      */
     String topic();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index e38b821..360c4ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.Collections;
@@ -142,6 +143,15 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
+    }
+
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
+    @SuppressWarnings("deprecation")
+    @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
@@ -149,6 +159,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     /**
      * @throws UnsupportedOperationException on every invocation
      */
+    @SuppressWarnings("deprecation")
     @Override
     public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
similarity index 59%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
index dc752cb..6c5798e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
@@ -16,30 +16,26 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.To;
 
-/**
- * The context associated with the current record being processed by
- * an {@link Processor}
- */
-public interface RecordContext {
-    /**
-     * @return The offset of the original record received from Kafka
-     */
-    long offset();
+public class ToInternal extends To {
+    public ToInternal() {
+        super(To.all());
+    }
+
+    public void update(final To to) {
+        super.update(to);
+    }
 
-    /**
-     * @return The timestamp extracted from the record received from Kafka
-     */
-    long timestamp();
+    public boolean hasTimestamp() {
+        return timestamp != -1;
+    }
 
-    /**
-     * @return The topic the record was received on
-     */
-    String topic();
+    public long timestamp() {
+        return timestamp;
+    }
 
-    /**
-     * @return The partition the record was received on
-     */
-    int partition();
-}
+    public String child() {
+        return childName;
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index dedb906..af7059b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -27,7 +27,7 @@ class LRUCacheEntry implements RecordContext {
     private final long offset;
     private final String topic;
     private final int partition;
-    private final long timestamp;
+    private long timestamp;
 
     private long sizeBytes;
     private boolean isDirty;
@@ -64,6 +64,11 @@ class LRUCacheEntry implements RecordContext {
     }
 
     @Override
+    public void setTimestamp(final long timestamp) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public String topic() {
         return topic;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 1b34fab..dc0b886 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -23,18 +23,19 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Rule;
 import org.junit.Test;
 
-import static org.junit.Assert.fail;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
 
 public class KStreamTransformValuesTest {
 
@@ -192,6 +193,13 @@ public class KStreamTransformValuesTest {
         }
 
         try {
+            transformValueProcessor.process(null, 3);
+            fail("should not allow call to context.forward() within ValueTransformer");
+        } catch (final StreamsException e) {
+            // expected
+        }
+
+        try {
             transformValueProcessor.punctuate(0);
             fail("should not allow ValueTransformer#puntuate() to return not-null value");
         } catch (final StreamsException e) {
@@ -213,11 +221,14 @@ public class KStreamTransformValuesTest {
                 context.forward(null, null);
             }
             if (value == 1) {
-                context.forward(null, null, null);
+                context.forward(null, null, (String) null);
             }
             if (value == 2) {
                 context.forward(null, null, 0);
             }
+            if (value == 3) {
+                context.forward(null, null, To.all());
+            }
             throw new RuntimeException("Should never happen in this test");
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 46c23c6..aac275d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockStateStore;
@@ -176,28 +177,21 @@ public class AbstractProcessorContextTest {
         }
 
         @Override
-        public void schedule(final long interval) {
-
-        }
+        public void schedule(final long interval) {}
 
         @Override
-        public <K, V> void forward(final K key, final V value) {
-
-        }
+        public <K, V> void forward(final K key, final V value) {}
 
         @Override
-        public <K, V> void forward(final K key, final V value, final int childIndex) {
-
-        }
+        public <K, V> void forward(final K key, final V value, final To to) {}
 
         @Override
-        public <K, V> void forward(final K key, final V value, final String childName) {
-
-        }
+        public <K, V> void forward(final K key, final V value, final int childIndex) {}
 
         @Override
-        public void commit() {
+        public <K, V> void forward(final K key, final V value, final String childName) {}
 
-        }
+        @Override
+        public void commit() {}
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index da2e5dc..d07274a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -72,8 +73,8 @@ public class ProcessorTopologyTest {
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
-        File localState = TestUtils.tempDirectory();
-        Properties props = new Properties();
+        final File localState = TestUtils.tempDirectory();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
@@ -120,8 +121,8 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingSimpleTopology() throws Exception {
-        int partition = 10;
+    public void testDrivingSimpleTopology() {
+        final int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
@@ -142,7 +143,7 @@ public class ProcessorTopologyTest {
 
 
     @Test
-    public void testDrivingMultiplexingTopology() throws Exception {
+    public void testDrivingMultiplexingTopology() {
         driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
@@ -164,7 +165,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingMultiplexByNameTopology() throws Exception {
+    public void testDrivingMultiplexByNameTopology() {
         driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
@@ -186,8 +187,8 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingStatefulTopology() throws Exception {
-        String storeName = "entries";
+    public void testDrivingStatefulTopology() {
+        final String storeName = "entries";
         driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName).internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -195,7 +196,7 @@ public class ProcessorTopologyTest {
         driver.process(INPUT_TOPIC_1, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNoOutputRecord(OUTPUT_TOPIC_1);
 
-        KeyValueStore<String, String> store = driver.getKeyValueStore("entries");
+        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
         assertEquals("value4", store.get("key1"));
         assertEquals("value2", store.get("key2"));
         assertEquals("value3", store.get("key3"));
@@ -205,15 +206,16 @@ public class ProcessorTopologyTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldDriveGlobalStore() {
-        final StateStoreSupplier storeSupplier = Stores.create("my-store")
+        final String storeName = "my-store";
+        final StateStoreSupplier storeSupplier = Stores.create(storeName)
                 .withStringKeys().withStringValues().inMemory().disableLogging().build();
         final String global = "global";
         final String topic = "topic";
         final TopologyBuilder topologyBuilder = this.builder
-                .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
+                .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
 
         driver = new ProcessorTopologyTestDriver(config, topologyBuilder.internalTopologyBuilder);
-        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store");
+        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get(storeName);
         driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
         assertEquals("value1", globalStore.get("key1"));
@@ -221,8 +223,8 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingSimpleMultiSourceTopology() throws Exception {
-        int partition = 10;
+    public void testDrivingSimpleMultiSourceTopology() {
+        final int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition).internalTopologyBuilder);
 
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -235,7 +237,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingForwardToSourceTopology() throws Exception {
+    public void testDrivingForwardToSourceTopology() {
         driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -246,7 +248,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingInternalRepartitioningTopology() throws Exception {
+    public void testDrivingInternalRepartitioningTopology() {
         driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -257,7 +259,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void testDrivingInternalRepartitioningForwardingTimestampTopology() throws Exception {
+    public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
         driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -315,7 +317,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldConsiderTimeStamps() throws Exception {
+    public void shouldConsiderTimeStamps() {
         final int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder);
         driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
@@ -326,6 +328,17 @@ public class ProcessorTopologyTest {
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 30L);
     }
 
+    @Test
+    public void shouldConsiderModifiedTimeStamps() {
+        final int partition = 10;
+        driver = new ProcessorTopologyTestDriver(config, createTimestampTopology(partition).internalTopologyBuilder);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER, 10L);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER, 20L);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition, 40L);
+    }
 
     private void assertNextOutputRecord(final String topic,
                                         final String key,
@@ -345,7 +358,7 @@ public class ProcessorTopologyTest {
                                         final String value,
                                         final Integer partition,
                                         final Long timestamp) {
-        ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
+        final ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
         assertEquals(key, record.key());
         assertEquals(value, record.value());
@@ -353,51 +366,63 @@ public class ProcessorTopologyTest {
         assertEquals(timestamp, record.timestamp());
     }
 
-    private void assertNoOutputRecord(String topic) {
+    private void assertNoOutputRecord(final String topic) {
         assertNull(driver.readOutput(topic));
     }
 
     private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
         return new StreamPartitioner<Object, Object>() {
             @Override
-            public Integer partition(Object key, Object value, int numPartitions) {
+            public Integer partition(final Object key, final Object value, final int numPartitions) {
                 return partition;
             }
         };
     }
 
-    private TopologyBuilder createSimpleTopology(int partition) {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
-                                    .addProcessor("processor", define(new ForwardingProcessor()), "source")
-                                    .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+    private TopologyBuilder createSimpleTopology(final int partition) {
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new ForwardingProcessor()), "source")
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
+    }
+
+    private TopologyBuilder createTimestampTopology(final int partition) {
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new TimestampProcessor()), "source")
+            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
     private TopologyBuilder createMultiplexingTopology() {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
-                                    .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
-                                    .addSink("sink1", OUTPUT_TOPIC_1, "processor")
-                                    .addSink("sink2", OUTPUT_TOPIC_2, "processor");
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
+            .addSink("sink1", OUTPUT_TOPIC_1, "processor")
+            .addSink("sink2", OUTPUT_TOPIC_2, "processor");
     }
 
     private TopologyBuilder createMultiplexByNameTopology() {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
             .addSink("sink0", OUTPUT_TOPIC_1, "processor")
             .addSink("sink1", OUTPUT_TOPIC_2, "processor");
     }
 
-    private TopologyBuilder createStatefulTopology(String storeName) {
-        return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
-                                    .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
-                                    .addStateStore(
-                                            Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
-                                            "processor"
-                                    )
-                                    .addSink("counts", OUTPUT_TOPIC_1, "processor");
+    private TopologyBuilder createStatefulTopology(final String storeName) {
+        return builder
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
+            .addStateStore(
+                Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
+                "processor"
+            )
+            .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
     private TopologyBuilder createInternalRepartitioningTopology() {
-        return builder.addSource("source", INPUT_TOPIC_1)
+        return builder
+            .addSource("source", INPUT_TOPIC_1)
             .addInternalTopic(THROUGH_TOPIC_1)
             .addSink("sink0", THROUGH_TOPIC_1, "source")
             .addSource("source1", THROUGH_TOPIC_1)
@@ -405,12 +430,13 @@ public class ProcessorTopologyTest {
     }
 
     private TopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
-        return builder.addSource("source", INPUT_TOPIC_1)
-                .addInternalTopic(THROUGH_TOPIC_1)
-                .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
-                .addSink("sink0", THROUGH_TOPIC_1, "processor")
-                .addSource("source1", THROUGH_TOPIC_1)
-                .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+        return builder
+            .addSource("source", INPUT_TOPIC_1)
+            .addInternalTopic(THROUGH_TOPIC_1)
+            .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
+            .addSink("sink0", THROUGH_TOPIC_1, "processor")
+            .addSource("source1", THROUGH_TOPIC_1)
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
     }
 
     private TopologyBuilder createForwardToSourceTopology() {
@@ -434,26 +460,34 @@ public class ProcessorTopologyTest {
      * A processor that simply forwards all messages to all children.
      */
     protected static class ForwardingProcessor extends AbstractProcessor<String, String> {
-
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             context().forward(key, value);
         }
 
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             context().forward(Long.toString(streamTime), "punctuate");
         }
     }
 
     /**
+     * A processor that simply forwards all messages to all children with advanced timestamps.
+     */
+    protected static class TimestampProcessor extends AbstractProcessor<String, String> {
+        @Override
+        public void process(final String key, final String value) {
+            context().forward(key, value, To.all().withTimestamp(context().timestamp() + 10));
+        }
+    }
+
+    /**
      * A processor that removes custom timestamp information from messages and forwards modified messages to each child.
      * A message contains custom timestamp information if the value is in ".*@[0-9]+" format.
      */
     protected static class ValueTimestampProcessor extends AbstractProcessor<String, String> {
-
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             context().forward(key, value.split("@")[0]);
         }
     }
@@ -462,22 +496,23 @@ public class ProcessorTopologyTest {
      * A processor that forwards slightly-modified messages to each child.
      */
     protected static class MultiplexingProcessor extends AbstractProcessor<String, String> {
-
         private final int numChildren;
 
-        public MultiplexingProcessor(int numChildren) {
+        MultiplexingProcessor(final int numChildren) {
             this.numChildren = numChildren;
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             for (int i = 0; i != numChildren; ++i) {
                 context().forward(key, value + "(" + (i + 1) + ")", i);
             }
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             for (int i = 0; i != numChildren; ++i) {
                 context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i);
             }
@@ -489,22 +524,23 @@ public class ProcessorTopologyTest {
      * Note: the children are assumed to be named "sink{child number}", e.g., sink1, or sink2, etc.
      */
     protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
-
         private final int numChildren;
 
-        public MultiplexByNameProcessor(int numChildren) {
+        MultiplexByNameProcessor(final int numChildren) {
             this.numChildren = numChildren;
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             for (int i = 0; i != numChildren; ++i) {
-                context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
+                context().forward(key, value + "(" + (i + 1) + ")",  "sink" + i);
             }
         }
 
+        @SuppressWarnings("deprecation")
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             for (int i = 0; i != numChildren; ++i) {
                 context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
             }
@@ -516,28 +552,27 @@ public class ProcessorTopologyTest {
      * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
      */
     protected static class StatefulProcessor extends AbstractProcessor<String, String> {
-
         private KeyValueStore<String, String> store;
         private final String storeName;
 
-        public StatefulProcessor(String storeName) {
+        StatefulProcessor(final String storeName) {
             this.storeName = storeName;
         }
 
         @Override
         @SuppressWarnings("unchecked")
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<String, String>) context.getStateStore(storeName);
         }
 
         @Override
-        public void process(String key, String value) {
+        public void process(final String key, final String value) {
             store.put(key, value);
         }
 
         @Override
-        public void punctuate(long streamTime) {
+        public void punctuate(final long streamTime) {
             int count = 0;
             try (KeyValueIterator<String, String> iter = store.all()) {
                 while (iter.hasNext()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
index 7932d1f..0af5e17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
@@ -19,14 +19,18 @@ package org.apache.kafka.streams.processor.internals;
 public class RecordContextStub implements RecordContext {
 
     private final long offset;
-    private final long timestamp;
+    private long timestamp;
     private final int partition;
     private final String topic;
 
     public RecordContextStub() {
         this(-1, -1, -1, "");
     }
-    public RecordContextStub(final long offset, final long timestamp, final int partition, final String topic) {
+
+    public RecordContextStub(final long offset,
+                             final long timestamp,
+                             final int partition,
+                             final String topic) {
         this.offset = offset;
         this.timestamp = timestamp;
         this.partition = partition;
@@ -44,6 +48,11 @@ public class RecordContextStub implements RecordContext {
     }
 
     @Override
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
     public String topic() {
         return topic;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 06137fb..6b0cb66 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -28,12 +28,14 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.ToInternal;
 import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -53,6 +55,7 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
     private final RecordCollector.Supplier recordCollectorSupplier;
     private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
     private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
+    private final ToInternal toInternal = new ToInternal();
 
     private Serde<?> keySerde;
     private Serde<?> valSerde;
@@ -179,44 +182,39 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value) {
-        final ProcessorNode thisNode = currentNode;
-        for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            currentNode = childNode;
-            try {
-                childNode.process(key, value);
-            } finally {
-                currentNode = thisNode;
-            }
-        }
+        forward(key, value, To.all());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value, final int childIndex) {
-        final ProcessorNode thisNode = currentNode;
-        final ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
-        currentNode = childNode;
-        try {
-            childNode.process(key, value);
-        } finally {
-            currentNode = thisNode;
-        }
+        forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K, V> void forward(final K key, final V value, final String childName) {
+        forward(key, value, To.child(childName));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        toInternal.update(to);
+        if (toInternal.hasTimestamp()) {
+            setTime(toInternal.timestamp());
+        }
         final ProcessorNode thisNode = currentNode;
-        for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            if (childNode.name().equals(childName)) {
-                currentNode = childNode;
-                try {
+        try {
+            for (final ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+                if (toInternal.child() == null || toInternal.child().equals(childNode.name())) {
+                    currentNode = childNode;
                     childNode.process(key, value);
-                } finally {
-                    currentNode = thisNode;
+                    toInternal.update(to); // need to reset because MockProcessorContext is shared over multiple Processors and toInternal might have been modified
                 }
-                break;
             }
+        } finally {
+            currentNode = thisNode;
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index afa0639..6b5d47a 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 
@@ -65,6 +66,11 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        forwardedValues.put(key, value);
+    }
+
+    @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         forward(key, value);
     }

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message