kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-4936: Add dynamic routing in Streams (#5018)
Date Wed, 30 May 2018 18:55:21 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f33e9a3  KAFKA-4936: Add dynamic routing in Streams (#5018)
f33e9a3 is described below

commit f33e9a346e22e29bb66e0ea0f3442903d136ca67
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Wed May 30 11:54:53 2018 -0700

    KAFKA-4936: Add dynamic routing in Streams (#5018)
    
    implements KIP-303
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 docs/streams/developer-guide/dsl-api.html          |   4 +-
 docs/streams/developer-guide/processor-api.html    |   3 +-
 docs/streams/upgrade-guide.html                    |   7 ++
 .../java/org/apache/kafka/streams/Topology.java    | 140 ++++++++++++++++++++-
 .../apache/kafka/streams/TopologyDescription.java  |   1 +
 .../org/apache/kafka/streams/kstream/KStream.java  |  22 +++-
 .../streams/kstream/internals/KStreamImpl.java     |  24 +++-
 .../internals/WindowedStreamPartitioner.java       |   8 +-
 .../kafka/streams/processor/ProcessorContext.java  |   2 +-
 .../processor/{internals => }/RecordContext.java   |  23 ++--
 .../kafka/streams/processor/StreamPartitioner.java |   5 +-
 .../streams/processor/TopicNameExtractor.java      |  37 ++++++
 .../internals/AbstractProcessorContext.java        |   6 +-
 .../internals/DefaultStreamPartitioner.java        |  10 +-
 .../internals/InternalProcessorContext.java        |   7 +-
 .../internals/InternalTopologyBuilder.java         |  77 ++++++++----
 .../internals/ProcessorRecordContext.java          |  28 +++--
 .../processor/internals/RecordCollectorImpl.java   |   6 +-
 .../streams/processor/internals/SinkNode.java      |  22 ++--
 .../processor/internals/StandbyContextImpl.java    |   4 +-
 ...dContext.java => StaticTopicNameExtractor.java} |  46 +++----
 .../processor/internals/StreamsMetadataState.java  |   6 +-
 .../AbstractMergedSortedCacheStoreIterator.java    |   2 +-
 .../state/internals/CachingKeyValueStore.java      |   6 +-
 .../state/internals/CachingSessionStore.java       |   4 +-
 .../state/internals/CachingWindowStore.java        |   6 +-
 .../streams/state/internals/LRUCacheEntry.java     |  74 +++++------
 ...ergedSortedCacheKeyValueBytesStoreIterator.java |   2 +-
 .../MergedSortedCacheSessionStoreIterator.java     |   2 +-
 .../MergedSortedCacheWindowStoreIterator.java      |   2 +-
 ...rgedSortedCacheWindowStoreKeyValueIterator.java |   2 +-
 .../kafka/streams/state/internals/NamedCache.java  |   6 +-
 .../kafka/streams/state/internals/ThreadCache.java |   8 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   2 +-
 .../org/apache/kafka/streams/TopologyTest.java     |   8 +-
 .../streams/kstream/internals/KStreamImplTest.java |  34 ++++-
 .../internals/WindowedStreamPartitionerTest.java   |   4 +-
 .../internals/AbstractProcessorContextTest.java    |   6 +-
 .../internals/InternalTopologyBuilderTest.java     |  10 +-
 .../processor/internals/ProcessorTopologyTest.java |   2 +-
 .../processor/internals/RecordCollectorTest.java   |   2 +-
 .../processor/internals/RecordContextStub.java     |  81 ------------
 .../streams/processor/internals/SinkNodeTest.java  |   2 +-
 .../internals/StreamsMetadataStateTest.java        |   7 +-
 .../streams/state/internals/NamedCacheTest.java    |  24 ++--
 .../streams/state/internals/ThreadCacheTest.java   |  18 +--
 .../kafka/streams/scala/kstream/KStream.scala      |  34 ++++-
 47 files changed, 522 insertions(+), 314 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index e65f726..7c0b736 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3023,7 +3023,7 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
                         <li>KStream -&gt; void</li>
                     </ul>
                 </td>
-                    <td><p class="first"><strong>Terminal operation.</strong>  Write the records to a Kafka topic.
+                    <td><p class="first"><strong>Terminal operation.</strong>  Write the records to Kafka topic(s).
                         (<a class="reference external" href="../../../javadoc/org/apache/kafka/streams/kstream/KStream.html#to(java.lang.String)">KStream details</a>)</p>
                         <p>When to provide serdes explicitly:</p>
                         <ul class="simple">
@@ -3037,6 +3037,8 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
                         <p>A variant of <code class="docutils literal"><span class="pre">to</span></code> exists that enables you to specify how the data is produced by using a <code class="docutils literal"><span class="pre">Produced</span></code>
                             instance to specify, for example, a <code class="docutils literal"><span class="pre">StreamPartitioner</span></code> that gives you control over
                             how output records are distributed across the partitions of the output topic.</p>
+                        <p>Another variant of <code class="docutils literal"><span class="pre">to</span></code> exists that enables you to dynamically choose which topic to send to for each record via a <code class="docutils literal"><span class="pre">TopicNameExtractor</span></code>
+                            instance.</p>
                         <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="o">...;</span>
 
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index ef89372..8236ded 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -384,7 +384,8 @@
                 <li>A predefined persistent key-value state store is created and associated with the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, using
                     <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li>
                 <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node
-                    as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic.</li>
+                    as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
+                    to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
             </ul>
             <p>In this topology, the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
                 upstream processor of the <code class="docutils literal"><span class="pre">&quot;Sink&quot;</span></code> node.  As a result, whenever the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched record from
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 0e3725e..45d1f7d 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -137,6 +137,13 @@
         Forwarding based on child index is not supported in the new API any longer.
     </p>
     <p>
+        We have added support to allow routing records dynamically to Kafka topics. More specifically, in both the lower-level <code>Topology#addSink</code> and higher-level <code>KStream#to</code> APIs, we have added variants that
+        take a <code>TopicNameExtractor</code> instance instead of a specific <code>String</code> typed topic name, such that for each received record from the upstream processor, the library will dynamically determine which Kafka topic to write to
+        based on the record's key and value, as well as record context. Note that all the Kafka topics that that may possibly be used are still considered as user topics and hence required to be pre-created. In addition to that, we have modified the
+        <code>StreamPartitioner</code> interface to add the topic name parameter since the topic name now may not be known beforehand; users who have customized implementations of this interface would need to update their code while upgrading their application
+        to use Kafka Streams 2.0.0.
+    </p>
+    <p>
         <a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.
         Instead of relying on data retention Kafka Streams uses the new purge data API to delete consumed data from those topics and to keep used storage small now.
     </p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index c137a30..22f6ea8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -411,7 +412,8 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@@ -443,7 +445,8 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@@ -471,7 +474,8 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@@ -501,7 +505,8 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
@@ -517,6 +522,130 @@ public class Topology {
     }
 
     /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}.
+     * The topics that it may ever send to should be pre-created.
+     * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param name              the unique name of the sink
+     * @param topicExtractor    the extractor to determine the name of the Kafka topic to which this sink should write for each record
+     * @param parentNames       the name of one or more source or processor nodes whose output records this sink should consume
+     *                          and dynamically write to topics
+     * @return                  itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final TopicNameExtractor<K, V> topicExtractor,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topicExtractor, null, null, null, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor},
+     * using the supplied partitioner.
+     * The topics that it may ever send to should be pre-created.
+     * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * <p>
+     * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
+     * the named Kafka topic's partitions.
+     * Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state
+     * stores} in its processors.
+     * In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute
+     * records among partitions using Kafka's default partitioning logic.
+     *
+     * @param name              the unique name of the sink
+     * @param topicExtractor    the extractor to determine the name of the Kafka topic to which this sink should write for each record
+     * @param partitioner       the function that should be used to determine the partition for each record processed by the sink
+     * @param parentNames       the name of one or more source or processor nodes whose output records this sink should consume
+     *                          and dynamically write to topics
+     * @return                  itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final TopicNameExtractor<K, V> topicExtractor,
+                                                final StreamPartitioner<? super K, ? super V> partitioner,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topicExtractor, null, null, partitioner, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}.
+     * The topics that it may ever send to should be pre-created.
+     * The sink will use the specified key and value serializers.
+     *
+     * @param name              the unique name of the sink
+     * @param topicExtractor    the extractor to determine the name of the Kafka topic to which this sink should write for each record
+     * @param keySerializer     the {@link Serializer key serializer} used when consuming records; may be null if the sink
+     *                          should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+     *                          {@link StreamsConfig stream configuration}
+     * @param valueSerializer   the {@link Serializer value serializer} used when consuming records; may be null if the sink
+     *                          should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     *                          {@link StreamsConfig stream configuration}
+     * @param parentNames       the name of one or more source or processor nodes whose output records this sink should consume
+     *                          and dynamically write to topics
+     * @return                  itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final TopicNameExtractor<K, V> topicExtractor,
+                                                final Serializer<K> keySerializer,
+                                                final Serializer<V> valueSerializer,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valueSerializer, null, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}.
+     * The topics that it may ever send to should be pre-created.
+     * The sink will use the specified key and value serializers, and the supplied partitioner.
+     *
+     * @param name              the unique name of the sink
+     * @param topicExtractor    the extractor to determine the name of the Kafka topic to which this sink should write for each record
+     * @param keySerializer     the {@link Serializer key serializer} used when consuming records; may be null if the sink
+     *                          should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+     *                          {@link StreamsConfig stream configuration}
+     * @param valueSerializer   the {@link Serializer value serializer} used when consuming records; may be null if the sink
+     *                          should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     *                          {@link StreamsConfig stream configuration}
+     * @param partitioner       the function that should be used to determine the partition for each record processed by the sink
+     * @param parentNames       the name of one or more source or processor nodes whose output records this sink should consume
+     *                          and dynamically write to topics
+     * @return                  itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final TopicNameExtractor<K, V> topicExtractor,
+                                                final Serializer<K> keySerializer,
+                                                final Serializer<V> valueSerializer,
+                                                final StreamPartitioner<? super K, ? super V> partitioner,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valueSerializer, partitioner, parentNames);
+        return this;
+    }
+
+    /**
      * Add a new processor node that receives and processes records output by one or more parent source or processor
      * node.
      * Any new record output by this processor will be forwarded to its child processor or sink nodes.
@@ -526,7 +655,8 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
      * and process
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the parent's name
      */
     public synchronized Topology addProcessor(final String name,
                                               final ProcessorSupplier supplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 131e8d3..04a292f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -134,6 +134,7 @@ public interface TopologyDescription {
     interface Sink extends Node {
         /**
          * The topic name this sink node is writing to.
+         * Could be null if the topic name can only be dynamically determined based on {@code TopicNameExtractor}
          * @return a topic name
          */
         String topic();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index e75bb3a..da86a75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 
 /**
  * {@code KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an
@@ -461,13 +462,32 @@ public interface KStream<K, V> {
      * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
      * started).
      *
-     * @param produced    the options to use when producing to the topic
      * @param topic       the topic name
+     * @param produced    the options to use when producing to the topic
      */
     void to(final String topic,
             final Produced<K, V> produced);
 
     /**
+     * Dynamically materialize this stream to topics using default serializers specified in the config and producer's
+     * {@link DefaultPartitioner}.
+     * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}.
+     *
+     * @param topicExtractor    the extractor to determine the name of the Kafka topic to write to for each record
+     */
+    void to(final TopicNameExtractor<K, V> topicExtractor);
+
+    /**
+     * Dynamically materialize this stream to topics using the provided {@link Produced} instance.
+     * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}.
+     *
+     * @param topicExtractor    the extractor to determine the name of the Kafka topic to write to for each record
+     * @param produced          the options to use when producing to the topic
+     */
+    void to(final TopicNameExtractor<K, V> topicExtractor,
+            final Produced<K, V> produced);
+
+    /**
      * Transform each record of the input stream into zero or more records in the output stream (both key and value type
      * can be altered arbitrarily).
      * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 5331a95..7356aff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -41,6 +41,8 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
@@ -304,27 +306,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         to(topic, Produced.<K, V>with(null, null, null));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void to(final String topic, final Produced<K, V> produced) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(topic, new ProducedInternal<>(produced));
+        to(new StaticTopicNameExtractor<K, V>(topic), new ProducedInternal<>(produced));
+    }
 
+    @Override
+    public void to(final TopicNameExtractor<K, V> topicExtractor) {
+        to(topicExtractor, Produced.<K, V>with(null, null, null));
+    }
+
+    @Override
+    public void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced) {
+        Objects.requireNonNull(topicExtractor, "topic extractor can't be null");
+        Objects.requireNonNull(produced, "Produced can't be null");
+        to(topicExtractor, new ProducedInternal<>(produced));
     }
 
     @SuppressWarnings("unchecked")
-    private void to(final String topic, final ProducedInternal<K, V> produced) {
+    private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
         final String name = builder.newProcessorName(SINK_NAME);
         final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
         final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();
         final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner();
 
         if (partitioner == null && keySerializer instanceof WindowedSerializer) {
-            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, (WindowedSerializer) keySerializer);
-            builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, windowedPartitioner, this.name);
+            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer);
+            builder.internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valSerializer, windowedPartitioner, this.name);
         } else {
-            builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
+            builder.internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valSerializer, partitioner, this.name);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 7b04f6a..04a9ab2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -24,11 +24,9 @@ import static org.apache.kafka.common.utils.Utils.toPositive;
 
 public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
 
-    private final String topic;
     private final WindowedSerializer<K> serializer;
 
-    WindowedStreamPartitioner(final String topic, final WindowedSerializer<K> serializer) {
-        this.topic = topic;
+    WindowedStreamPartitioner(final WindowedSerializer<K> serializer) {
         this.serializer = serializer;
     }
 
@@ -37,12 +35,14 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
      * and the current number of partitions. The partition number id determined by the original key of the windowed key
      * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
      *
+     * @param topic the topic name this record is sent to
      * @param windowedKey the key of the record
      * @param value the value of the record
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
      */
-    public Integer partition(final Windowed<K> windowedKey, final V value, final int numPartitions) {
+    @Override
+    public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {
         final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);
 
         // hash the keyBytes to choose a partition
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index f2a9f64..d21667f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -199,7 +199,7 @@ public interface ProcessorContext {
     long offset();
 
     /**
-     * Returns the headers of the current input record
+     * Returns the headers of the current input record; could be null if it is not available
      * @return the headers
      */
     Headers headers();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
similarity index 65%
copy from streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
index 15add71..5819a46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.processor.Processor;
 
 /**
  * The context associated with the current record being processed by
@@ -25,32 +24,32 @@ import org.apache.kafka.streams.processor.Processor;
  */
 public interface RecordContext {
     /**
-     * @return The offset of the original record received from Kafka
+     * @return  The offset of the original record received from Kafka;
+     *          could be -1 if it is not available
      */
     long offset();
 
     /**
-     * @return The timestamp extracted from the record received from Kafka
+     * @return  The timestamp extracted from the record received from Kafka;
+     *          could be -1 if it is not available
      */
     long timestamp();
 
     /**
-     * Sets a new timestamp for the output record.
-     */
-    void setTimestamp(final long timestamp);
-
-    /**
-     * @return The topic the record was received on
+     * @return  The topic the record was received on;
+     *          could be null if it is not available
      */
     String topic();
 
     /**
-     * @return The partition the record was received on
+     * @return  The partition the record was received on;
+     *          could be -1 if it is not available
      */
     int partition();
 
     /**
-     * @return The headers from the record received from Kafka
+     * @return  The headers from the record received from Kafka;
+     *          could be null if it is not available
      */
     Headers headers();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
index 1fa5e3d..a435caf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -52,11 +52,12 @@ public interface StreamPartitioner<K, V> {
 
     /**
      * Determine the partition number for a record with the given key and value and the current number of partitions.
-     * 
+     *
+     * @param topic the topic name this record is sent to
      * @param key the key of the record
      * @param value the value of the record
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
      */
-    Integer partition(K key, V value, int numPartitions);
+    Integer partition(String topic, K key, V value, int numPartitions);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java
new file mode 100644
index 0000000..5d79751
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * An interface that allows to dynamically determine the name of the Kafka topic to send at the sink node of the topology.
+ */
+@InterfaceStability.Evolving
+public interface TopicNameExtractor<K, V> {
+
+    /**
+     * Extracts the topic name to send to. The topic name must already exist, since the Kafka Streams library will not
+     * try to automatically create the topic with the extracted name.
+     *
+     * @param key           the record key
+     * @param value         the record value
+     * @param recordContext current context metadata of the record
+     * @return              the topic name this record should be sent to
+     */
+    String extract(final K key, final V value, final RecordContext recordContext);
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 3338669..0c3fcf2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -42,7 +42,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     private final ThreadCache cache;
     private final Serde valueSerde;
     private boolean initialized;
-    protected RecordContext recordContext;
+    protected ProcessorRecordContext recordContext;
     protected ProcessorNode currentNode;
     final StateManager stateManager;
 
@@ -178,12 +178,12 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     }
 
     @Override
-    public void setRecordContext(final RecordContext recordContext) {
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
         this.recordContext = recordContext;
     }
 
     @Override
-    public RecordContext recordContext() {
+    public ProcessorRecordContext recordContext() {
         return recordContext;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
index da48e8f..cb64c3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -23,20 +23,18 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 
 public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
 
-    private final Serializer<K> keySerializer;
     private final Cluster cluster;
-    private final String topic;
+    private final Serializer<K> keySerializer;
     private final DefaultPartitioner defaultPartitioner;
 
-    public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) {
-        this.keySerializer = keySerializer;
+    public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
         this.cluster = cluster;
-        this.topic = topic;
+        this.keySerializer = keySerializer;
         this.defaultPartitioner = new DefaultPartitioner();
     }
 
     @Override
-    public Integer partition(final K key, final V value, final int numPartitions) {
+    public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
         final byte[] keyBytes = keySerializer.serialize(topic, key);
         return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 0ebaf60..9439aba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -34,12 +35,12 @@ public interface InternalProcessorContext extends ProcessorContext {
      * Returns the current {@link RecordContext}
      * @return the current {@link RecordContext}
      */
-    RecordContext recordContext();
+    ProcessorRecordContext recordContext();
 
     /**
-     * @param recordContext the {@link RecordContext} for the record about to be processes
+     * @param recordContext the {@link ProcessorRecordContext} for the record about to be processes
      */
-    void setRecordContext(RecordContext recordContext);
+    void setRecordContext(ProcessorRecordContext recordContext);
 
     /**
      * @param currentNode the current {@link ProcessorNode}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 1651bbd..7d09031 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
@@ -319,19 +320,19 @@ public class InternalTopologyBuilder {
     }
 
     private class SinkNodeFactory<K, V> extends NodeFactory {
-        private final String topic;
         private final Serializer<K> keySerializer;
         private final Serializer<V> valSerializer;
         private final StreamPartitioner<? super K, ? super V> partitioner;
+        private final TopicNameExtractor<K, V> topicExtractor;
 
         private SinkNodeFactory(final String name,
                                 final String[] predecessors,
-                                final String topic,
+                                final TopicNameExtractor<K, V> topicExtractor,
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valSerializer,
                                 final StreamPartitioner<? super K, ? super V> partitioner) {
             super(name, predecessors.clone());
-            this.topic = topic;
+            this.topicExtractor = topicExtractor;
             this.keySerializer = keySerializer;
             this.valSerializer = valSerializer;
             this.partitioner = partitioner;
@@ -339,17 +340,22 @@ public class InternalTopologyBuilder {
 
         @Override
         public ProcessorNode build() {
-            if (internalTopicNames.contains(topic)) {
-                // prefix the internal topic name with the application id
-                return new SinkNode<>(name, decorateTopic(topic), keySerializer, valSerializer, partitioner);
+            if (topicExtractor instanceof StaticTopicNameExtractor) {
+                final String topic = ((StaticTopicNameExtractor) topicExtractor).topicName;
+                if (internalTopicNames.contains(topic)) {
+                    // prefix the internal topic name with the application id
+                    return new SinkNode<>(name, new StaticTopicNameExtractor<K, V>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
+                } else {
+                    return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner);
+                }
             } else {
-                return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner);
+                return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner);
             }
         }
 
         @Override
         Sink describe() {
-            return new Sink(name, topic);
+            return new Sink(name, topicExtractor);
         }
     }
 
@@ -432,6 +438,18 @@ public class InternalTopologyBuilder {
                                      final String... predecessorNames) {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(topic, "topic must not be null");
+        addSink(name, new StaticTopicNameExtractor<K, V>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
+        nodeToSinkTopic.put(name, topic);
+    }
+
+    public final <K, V> void addSink(final String name,
+                                     final TopicNameExtractor<K, V> topicExtractor,
+                                     final Serializer<K> keySerializer,
+                                     final Serializer<V> valSerializer,
+                                     final StreamPartitioner<? super K, ? super V> partitioner,
+                                     final String... predecessorNames) {
+        Objects.requireNonNull(name, "name must not be null");
+        Objects.requireNonNull(topicExtractor, "topic extractor must not be null");
         if (nodeFactories.containsKey(name)) {
             throw new TopologyException("Processor " + name + " is already added.");
         }
@@ -449,8 +467,7 @@ public class InternalTopologyBuilder {
             }
         }
 
-        nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topic, keySerializer, valSerializer, partitioner));
-        nodeToSinkTopic.put(name, topic);
+        nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner));
         nodeGrouper.add(name);
         nodeGrouper.unite(name, predecessorNames);
     }
@@ -888,13 +905,18 @@ public class InternalTopologyBuilder {
 
         for (final String predecessor : sinkNodeFactory.predecessors) {
             processorMap.get(predecessor).addChild(node);
-            if (internalTopicNames.contains(sinkNodeFactory.topic)) {
-                // prefix the internal topic name with the application id
-                final String decoratedTopic = decorateTopic(sinkNodeFactory.topic);
-                topicSinkMap.put(decoratedTopic, node);
-                repartitionTopics.add(decoratedTopic);
-            } else {
-                topicSinkMap.put(sinkNodeFactory.topic, node);
+            if (sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor) {
+                final String topic = ((StaticTopicNameExtractor) sinkNodeFactory.topicExtractor).topicName;
+
+                if (internalTopicNames.contains(topic)) {
+                    // prefix the internal topic name with the application id
+                    final String decoratedTopic = decorateTopic(topic);
+                    topicSinkMap.put(decoratedTopic, node);
+                    repartitionTopics.add(decoratedTopic);
+                } else {
+                    topicSinkMap.put(topic, node);
+                }
+
             }
         }
     }
@@ -1489,17 +1511,26 @@ public class InternalTopologyBuilder {
     }
 
     public final static class Sink extends AbstractNode implements TopologyDescription.Sink {
-        private final String topic;
+        private final TopicNameExtractor topicNameExtractor;
+
+        public Sink(final String name,
+                    final TopicNameExtractor topicNameExtractor) {
+            super(name);
+            this.topicNameExtractor = topicNameExtractor;
+        }
 
         public Sink(final String name,
                     final String topic) {
             super(name);
-            this.topic = topic;
+            this.topicNameExtractor = new StaticTopicNameExtractor(topic);
         }
 
         @Override
         public String topic() {
-            return topic;
+            if (topicNameExtractor instanceof StaticTopicNameExtractor)
+                return ((StaticTopicNameExtractor) topicNameExtractor).topicName;
+            else
+                return null;
         }
 
         @Override
@@ -1509,7 +1540,7 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return "Sink: " + name + " (topic: " + topic + ")\n      <-- " + nodeNames(predecessors);
+            return "Sink: " + name + " (topic: " + topic() + ")\n      <-- " + nodeNames(predecessors);
         }
 
         @Override
@@ -1523,14 +1554,14 @@ public class InternalTopologyBuilder {
 
             final Sink sink = (Sink) o;
             return name.equals(sink.name)
-                && topic.equals(sink.topic)
+                && topicNameExtractor.equals(sink.topicNameExtractor)
                 && predecessors.equals(sink.predecessors);
         }
 
         @Override
         public int hashCode() {
             // omit predecessors as it might change and alter the hash code
-            return Objects.hash(name, topic);
+            return Objects.hash(name, topicNameExtractor);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index c071525..dd57264 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -17,16 +17,17 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.processor.RecordContext;
 
 import java.util.Objects;
 
 public class ProcessorRecordContext implements RecordContext {
 
-    private long timestamp;
-    private final long offset;
-    private final String topic;
-    private final int partition;
-    private final Headers headers;
+    long timestamp;
+    final long offset;
+    final String topic;
+    final int partition;
+    final Headers headers;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
@@ -41,18 +42,27 @@ public class ProcessorRecordContext implements RecordContext {
         this.headers = headers;
     }
 
+    public ProcessorRecordContext(final long timestamp,
+                                  final long offset,
+                                  final int partition,
+                                  final String topic) {
+        this(timestamp, offset, partition, topic, null);
+    }
+
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
     public long offset() {
         return offset;
     }
 
+    @Override
     public long timestamp() {
         return timestamp;
     }
 
-    public void setTimestamp(final long timestamp) {
-        this.timestamp = timestamp;
-    }
-
     @Override
     public String topic() {
         return topic;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 1c8b0a0..d753648 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -49,11 +49,11 @@ import java.util.Map;
 
 public class RecordCollectorImpl implements RecordCollector {
     private final Logger log;
+    private final String logPrefix;
+    private final Sensor skippedRecordsSensor;
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
-    private final String logPrefix;
     private final ProductionExceptionHandler productionExceptionHandler;
-    private final Sensor skippedRecordsSensor;
 
     private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
         "No more records will be sent and no more offsets will be recorded for this task.";
@@ -88,7 +88,7 @@ public class RecordCollectorImpl implements RecordCollector {
         if (partitioner != null) {
             final List<PartitionInfo> partitions = producer.partitionsFor(topic);
             if (partitions.size() > 0) {
-                partition = partitioner.partition(key, value, partitions.size());
+                partition = partitioner.partition(topic, key, value, partitions.size());
             } else {
                 throw new StreamsException("Could not get partition information for topic '" + topic + "'." +
                     " This can happen if the topic does not exist.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 7711905..73bffc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -19,26 +19,26 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 
 public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
-    private final String topic;
     private Serializer<K> keySerializer;
     private Serializer<V> valSerializer;
+    private final TopicNameExtractor<K, V> topicExtractor;
     private final StreamPartitioner<? super K, ? super V> partitioner;
 
-    private ProcessorContext context;
+    private InternalProcessorContext context;
 
-    public SinkNode(final String name,
-                    final String topic,
-                    final Serializer<K> keySerializer,
-                    final Serializer<V> valSerializer,
-                    final StreamPartitioner<? super K, ? super V> partitioner) {
+    SinkNode(final String name,
+             final TopicNameExtractor<K, V> topicExtractor,
+             final Serializer<K> keySerializer,
+             final Serializer<V> valSerializer,
+             final StreamPartitioner<? super K, ? super V> partitioner) {
         super(name);
 
-        this.topic = topic;
+        this.topicExtractor = topicExtractor;
         this.keySerializer = keySerializer;
         this.valSerializer = valSerializer;
         this.partitioner = partitioner;
@@ -83,6 +83,8 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
             throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">.");
         }
 
+        final String topic = topicExtractor.extract(key, value, this.context.recordContext());
+
         try {
             collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
         } catch (final ClassCastException e) {
@@ -115,7 +117,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     public String toString(final String indent) {
         final StringBuilder sb = new StringBuilder(super.toString(indent));
         sb.append(indent).append("\ttopic:\t\t");
-        sb.append(topic);
+        sb.append(topicExtractor);
         sb.append("\n");
         return sb.toString();
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 14f986c..5c278c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -191,7 +191,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public RecordContext recordContext() {
+    public ProcessorRecordContext recordContext() {
         throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks.");
     }
 
@@ -199,7 +199,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public void setRecordContext(final RecordContext recordContext) {
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
         throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks.");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
similarity index 50%
rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
index 15add71..c525112 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
@@ -16,42 +16,26 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.RecordContext;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 
 /**
- * The context associated with the current record being processed by
- * an {@link Processor}
+ * Static topic name extractor
  */
-public interface RecordContext {
-    /**
-     * @return The offset of the original record received from Kafka
-     */
-    long offset();
+public class StaticTopicNameExtractor<K, V> implements TopicNameExtractor<K, V> {
 
-    /**
-     * @return The timestamp extracted from the record received from Kafka
-     */
-    long timestamp();
+    public final String topicName;
 
-    /**
-     * Sets a new timestamp for the output record.
-     */
-    void setTimestamp(final long timestamp);
+    public StaticTopicNameExtractor(final String topicName) {
+        this.topicName = topicName;
+    }
 
-    /**
-     * @return The topic the record was received on
-     */
-    String topic();
-
-    /**
-     * @return The partition the record was received on
-     */
-    int partition();
-
-    /**
-     * @return The headers from the record received from Kafka
-     */
-    Headers headers();
+    public String extract(final K key, final V value, final RecordContext recordContext) {
+        return topicName;
+    }
 
+    @Override
+    public String toString() {
+        return "StaticTopicNameExtractor(" + topicName + ")";
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 7fb0352..ea306fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -154,9 +154,7 @@ public class StreamsMetadataState {
 
         return getStreamsMetadataForKey(storeName,
                                         key,
-                                        new DefaultStreamPartitioner<>(keySerializer,
-                                                                       clusterMetadata,
-                                                                       sourceTopicsInfo.topicWithMostPartitions),
+                                        new DefaultStreamPartitioner<>(keySerializer, clusterMetadata),
                                         sourceTopicsInfo);
     }
 
@@ -254,7 +252,7 @@ public class StreamsMetadataState {
                                                          final StreamPartitioner<? super K, ?> partitioner,
                                                          final SourceTopicsInfo sourceTopicsInfo) {
 
-        final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
+        final Integer partition = partitioner.partition(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions);
         final Set<TopicPartition> matchingPartitions = new HashSet<>();
         for (String sourceTopic : sourceTopicsInfo.sourceTopics) {
             matchingPartitions.add(new TopicPartition(sourceTopic, partition));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index 6eb9a0a..1a3b075 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -49,7 +49,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements K
     abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry);
 
     private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) {
-        return nextFromCache.value.value == null;
+        return nextFromCache.value.value() == null;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 285bde5..da308a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -23,8 +23,8 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -87,7 +87,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
-        final RecordContext current = context.recordContext();
+        final ProcessorRecordContext current = context.recordContext();
         try {
             context.setRecordContext(entry.recordContext());
             if (flushListener != null) {
@@ -179,7 +179,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
             }
             return rawValue;
         } else {
-            return entry.value;
+            return entry.value();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index c099faf..6950693 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -24,8 +24,8 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -168,7 +168,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
 
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         final Bytes binaryKey = cacheFunction.key(entry.key());
-        final RecordContext current = context.recordContext();
+        final ProcessorRecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {
             final Windowed<K> key = SessionKeySchema.from(binaryKey.get(), serdes.keyDeserializer(), topic);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index ca24ffd..1f08f51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -24,8 +24,8 @@ import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -107,7 +107,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
                               final Windowed<K> windowedKey,
                               final InternalProcessorContext context) {
         if (flushListener != null) {
-            final RecordContext current = context.recordContext();
+            final ProcessorRecordContext current = context.recordContext();
             context.setRecordContext(entry.recordContext());
             try {
                 final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
@@ -174,7 +174,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         if (entry == null) {
             return underlying.fetch(key, timestamp);
         } else {
-            return entry.value;
+            return entry.value();
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index 78c0331..0ac0b77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -17,21 +17,18 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+
+import java.util.Arrays;
+import java.util.Objects;
 
 /**
  * A cache entry
  */
-class LRUCacheEntry implements RecordContext {
-
-    public final byte[] value;
-    private final Headers headers;
-    private final long offset;
-    private final String topic;
-    private final int partition;
-    private long timestamp;
+class LRUCacheEntry extends ProcessorRecordContext {
 
-    private long sizeBytes;
+    private final byte[] value;
+    private final long sizeBytes;
     private boolean isDirty;
 
     LRUCacheEntry(final byte[] value) {
@@ -45,13 +42,9 @@ class LRUCacheEntry implements RecordContext {
                   final long timestamp,
                   final int partition,
                   final String topic) {
+        super(timestamp, offset, partition, topic, headers);
         this.value = value;
-        this.headers = headers;
-        this.partition = partition;
-        this.topic = topic;
-        this.offset = offset;
         this.isDirty = isDirty;
-        this.timestamp = timestamp;
         this.sizeBytes = (value == null ? 0 : value.length) +
                 1 + // isDirty
                 8 + // timestamp
@@ -60,47 +53,38 @@ class LRUCacheEntry implements RecordContext {
                 (topic == null ? 0 : topic.length());
     }
 
-    @Override
-    public long offset() {
-        return offset;
+    void markClean() {
+        isDirty = false;
     }
 
-    @Override
-    public long timestamp() {
-        return timestamp;
+    boolean isDirty() {
+        return isDirty;
     }
 
-    @Override
-    public void setTimestamp(final long timestamp) {
-        throw new UnsupportedOperationException();
+    long size() {
+        return sizeBytes;
     }
 
-    @Override
-    public String topic() {
-        return topic;
+    byte[] value() {
+        return value;
     }
 
     @Override
-    public int partition() {
-        return partition;
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final LRUCacheEntry that = (LRUCacheEntry) o;
+        return timestamp() == that.timestamp() &&
+                offset() == that.offset() &&
+                partition() == that.partition() &&
+                Objects.equals(topic(), that.topic()) &&
+                Objects.equals(headers(), that.headers()) &&
+                Arrays.equals(this.value, that.value()) &&
+                this.isDirty == that.isDirty();
     }
 
     @Override
-    public Headers headers() {
-        return headers;
-    }
-
-    void markClean() {
-        isDirty = false;
-    }
-
-    boolean isDirty() {
-        return isDirty;
-    }
-
-    public long size() {
-        return sizeBytes;
+    public int hashCode() {
+        return Objects.hash(timestamp(), offset(), topic(), partition(), headers(), value, isDirty);
     }
-
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
index 8f9d152..7a545e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
@@ -44,7 +44,7 @@ class MergedSortedCacheKeyValueBytesStoreIterator extends AbstractMergedSortedCa
 
     @Override
     byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
-        return cacheEntry.value;
+        return cacheEntry.value();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
index 01b577c..dd2b193 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
@@ -53,7 +53,7 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto
 
     @Override
     byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
-        return cacheEntry.value;
+        return cacheEntry.value();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
index b08cf85..d5bb421 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
@@ -48,7 +48,7 @@ class MergedSortedCacheWindowStoreIterator extends AbstractMergedSortedCacheStor
 
     @Override
     byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
-        return cacheEntry.value;
+        return cacheEntry.value();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
index ef0a44e..a48c81a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
@@ -61,7 +61,7 @@ class MergedSortedCacheWindowStoreKeyValueIterator
 
     @Override
     byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
-        return cacheEntry.value;
+        return cacheEntry.value();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 6a3d197..77b9c1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -121,7 +121,7 @@ class NamedCache {
         // evicted already been removed from the cache so add it to the list of
         // flushed entries and remove from dirtyKeys.
         if (evicted != null) {
-            entries.add(new ThreadCache.DirtyEntry(evicted.key, evicted.entry.value, evicted.entry));
+            entries.add(new ThreadCache.DirtyEntry(evicted.key, evicted.entry.value(), evicted.entry));
             dirtyKeys.remove(evicted.key);
         }
 
@@ -130,9 +130,9 @@ class NamedCache {
             if (node == null) {
                 throw new IllegalStateException("Key = " + key + " found in dirty key set, but entry is null");
             }
-            entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, node.entry));
+            entries.add(new ThreadCache.DirtyEntry(key, node.entry.value(), node.entry));
             node.entry.markClean();
-            if (node.entry.value == null) {
+            if (node.entry.value() == null) {
                 deleted.add(node.key);
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 8c3716b..7ce03a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.slf4j.Logger;
 
@@ -332,9 +332,9 @@ public class ThreadCache {
     static class DirtyEntry {
         private final Bytes key;
         private final byte[] newValue;
-        private final RecordContext recordContext;
+        private final ProcessorRecordContext recordContext;
 
-        DirtyEntry(final Bytes key, final byte[] newValue, final RecordContext recordContext) {
+        DirtyEntry(final Bytes key, final byte[] newValue, final ProcessorRecordContext recordContext) {
             this.key = key;
             this.newValue = newValue;
             this.recordContext = recordContext;
@@ -348,7 +348,7 @@ public class ThreadCache {
             return newValue;
         }
 
-        public RecordContext recordContext() {
+        public ProcessorRecordContext recordContext() {
             return recordContext;
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 904ebe2..297b243 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -406,7 +406,7 @@ public class KafkaStreamsTest {
     public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
         streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
             @Override
-            public Integer partition(final String key, final Object value, final int numPartitions) {
+            public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
                 return 0;
             }
         });
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 0c34723..a845be3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
@@ -95,7 +96,12 @@ public class TopologyTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTopicWhenAddingSink() {
-        topology.addSink("name", null);
+        topology.addSink("name", (String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicChooserWhenAddingSink() {
+        topology.addSink("name", (TopicNameExtractor) null);
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 6da148a..7aed8e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -38,10 +38,12 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -50,6 +52,7 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -221,6 +224,26 @@ public class KStreamImplTest {
     }
 
     @Test
+    public void shouldSendDataToDynamicTopics() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String input = "topic";
+        final KStream<String, String> stream = builder.stream(input, stringConsumed);
+        stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.substring(0, 1),
+                  Produced.with(Serdes.String(), Serdes.String()));
+        builder.stream(input + "-a-v", stringConsumed).process(processorSupplier);
+        builder.stream(input + "-b-v", stringConsumed).process(processorSupplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(input, "a", "v1"));
+            driver.pipeInput(recordFactory.create(input, "a", "v2"));
+            driver.pipeInput(recordFactory.create(input, "b", "v1"));
+        }
+        List<MockProcessor<String, String>> mockProcessors = processorSupplier.capturedProcessors(2);
+        assertThat(mockProcessors.get(0).processed, equalTo(Utils.mkList("a:v1", "a:v2")));
+        assertThat(mockProcessors.get(1).processed, equalTo(Collections.singletonList("b:v1")));
+    }
+
+    @Test
     public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> kStream = builder.stream("topic-1", stringConsumed);
@@ -300,12 +323,12 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnFlatMapValues() {
-        testStream.flatMapValues((ValueMapper) null);
+        testStream.flatMapValues((ValueMapper<? super String, ? extends Iterable<? extends String>>) null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() {
-        testStream.flatMapValues((ValueMapperWithKey) null);
+        testStream.flatMapValues((ValueMapperWithKey<? super String, ? super String, ? extends Iterable<? extends String>>) null);
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -325,7 +348,12 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTopicOnTo() {
-        testStream.to(null);
+        testStream.to((String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicChooserOnTo() {
+        testStream.to((TopicNameExtractor<String, String>) null);
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 3aafa33..c41ae15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -57,7 +57,7 @@ public class WindowedStreamPartitionerTest {
         final Random rand = new Random();
         final DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
         final WindowedSerializer<Integer> timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer);
-        final WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, timeWindowedSerializer);
+        final WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(timeWindowedSerializer);
 
         for (int k = 0; k < 10; k++) {
             Integer key = rand.nextInt();
@@ -72,7 +72,7 @@ public class WindowedStreamPartitionerTest {
                 TimeWindow window = new TimeWindow(10 * w, 20 * w);
 
                 Windowed<Integer> windowedKey = new Windowed<>(key, window);
-                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
+                Integer actual = streamPartitioner.partition(topicName, windowedKey, value, infos.size());
 
                 assertEquals(expected, actual);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 9aaa8a6..d3f8dda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -49,7 +49,7 @@ public class AbstractProcessorContextTest {
     private final AbstractProcessorContext context = new TestProcessorContext(metrics);
     private final MockStateStore stateStore = new MockStateStore("store", false);
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
-    private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo", headers);
+    private final ProcessorRecordContext recordContext = new ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers);
 
     @Before
     public void before() {
@@ -95,7 +95,7 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldReturnNullIfTopicEqualsNonExistTopic() {
-        context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
         assertThat(context.topic(), nullValue());
     }
 
@@ -153,7 +153,7 @@ public class AbstractProcessorContextTest {
 
     @Test
     public void shouldReturnNullIfHeadersAreNotSet() {
-        context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
         assertThat(context.headers(), nullValue());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index d8e66b8..1da0425 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -411,7 +412,12 @@ public class InternalTopologyBuilderTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTopicWhenAddingSink() {
-        builder.addSink("name", null, null, null, null);
+        builder.addSink("name", (String) null, null, null, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicChooserWhenAddingSink() {
+        builder.addSink("name", (TopicNameExtractor) null, null, null, null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -456,7 +462,7 @@ public class InternalTopologyBuilderTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAddNullStateStoreSupplier() {
-        builder.addStateStore((StoreBuilder) null);
+        builder.addStateStore(null);
     }
 
     private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 033c0e1..d88d3b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -420,7 +420,7 @@ public class ProcessorTopologyTest {
     private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
         return new StreamPartitioner<Object, Object>() {
             @Override
-            public Integer partition(final Object key, final Object value, final int numPartitions) {
+            public Integer partition(final String topic, final Object key, final Object value, final int numPartitions) {
                 return partition;
             }
         };
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index e439372..6954eda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -73,7 +73,7 @@ public class RecordCollectorTest {
 
     private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() {
         @Override
-        public Integer partition(final String key, final Object value, final int numPartitions) {
+        public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
             return Integer.parseInt(key) % numPartitions;
         }
     };
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
deleted file mode 100644
index 7afd51e..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-
-public class RecordContextStub implements RecordContext {
-
-    private final long offset;
-    private long timestamp;
-    private final int partition;
-    private final String topic;
-    private final Headers headers;
-
-    public RecordContextStub() {
-        this(-1, -1, -1, "", null);
-    }
-
-    public RecordContextStub(final long offset,
-                             final long timestamp,
-                             final int partition,
-                             final String topic,
-                             final Headers headers) {
-        this.offset = offset;
-        this.timestamp = timestamp;
-        this.partition = partition;
-        this.topic = topic;
-        this.headers = headers;
-    }
-
-    public RecordContextStub(final long offset,
-                             final long timestamp,
-                             final int partition,
-                             final String topic) {
-        this(offset, timestamp, partition, topic, null);
-    }
-
-    @Override
-    public long offset() {
-        return offset;
-    }
-
-    @Override
-    public long timestamp() {
-        return timestamp;
-    }
-
-    @Override
-    public void setTimestamp(final long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    @Override
-    public String topic() {
-        return topic;
-    }
-
-    @Override
-    public int partition() {
-        return partition;
-    }
-
-    @Override
-    public Headers headers() {
-        return headers;
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 753d26b..dacc17e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -47,7 +47,7 @@ public class SinkNodeTest {
             new Metrics().sensor("skipped-records")
         )
     );
-    private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
+    private final SinkNode sink = new SinkNode<>("anyNodeName", new StaticTopicNameExtractor("any-output-topic"), anySerializer, anySerializer, null);
 
     @Before
     public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 5ab2f17..fce5342 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -65,7 +65,6 @@ public class StreamsMetadataStateTest {
     private TopicPartition topic1P1;
     private TopicPartition topic2P1;
     private TopicPartition topic4P0;
-    private List<PartitionInfo> partitionInfos;
     private Cluster cluster;
     private final String globalTable = "global-table";
     private StreamPartitioner<String, Object> partitioner;
@@ -113,7 +112,7 @@ public class StreamsMetadataStateTest {
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1));
         hostToPartitions.put(hostThree, Collections.singleton(topic3P0));
 
-        partitionInfos = Arrays.asList(
+        final List<PartitionInfo> partitionInfos = Arrays.asList(
                 new PartitionInfo("topic-one", 0, null, null, null),
                 new PartitionInfo("topic-one", 1, null, null, null),
                 new PartitionInfo("topic-two", 0, null, null, null),
@@ -126,7 +125,7 @@ public class StreamsMetadataStateTest {
         discovery.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
-            public Integer partition(final String key, final Object value, final int numPartitions) {
+            public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
                 return 1;
             }
         };
@@ -246,7 +245,7 @@ public class StreamsMetadataStateTest {
 
         final StreamsMetadata actual = discovery.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() {
             @Override
-            public Integer partition(final String key, final Object value, final int numPartitions) {
+            public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
                 return 2;
             }
         });
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 92653ce..3f78be6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -72,8 +72,8 @@ public class NamedCacheTest {
             cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 1, 1, ""));
             LRUCacheEntry head = cache.first();
             LRUCacheEntry tail = cache.last();
-            assertEquals(new String(head.value), toInsert.get(i).value);
-            assertEquals(new String(tail.value), toInsert.get(0).value);
+            assertEquals(new String(head.value()), toInsert.get(i).value);
+            assertEquals(new String(tail.value()), toInsert.get(0).value);
             assertEquals(cache.flushes(), 0);
             assertEquals(cache.hits(), 0);
             assertEquals(cache.misses(), 0);
@@ -116,9 +116,9 @@ public class NamedCacheTest {
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{11}));
         cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{12}));
 
-        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value);
-        assertArrayEquals(new byte[] {11}, cache.get(Bytes.wrap(new byte[] {1})).value);
-        assertArrayEquals(new byte[] {12}, cache.get(Bytes.wrap(new byte[] {2})).value);
+        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value());
+        assertArrayEquals(new byte[] {11}, cache.get(Bytes.wrap(new byte[] {1})).value());
+        assertArrayEquals(new byte[] {12}, cache.get(Bytes.wrap(new byte[] {2})).value());
         assertEquals(cache.hits(), 3);
     }
 
@@ -128,15 +128,15 @@ public class NamedCacheTest {
         cache.putIfAbsent(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{20}));
         cache.putIfAbsent(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{30}));
 
-        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value);
-        assertArrayEquals(new byte[] {30}, cache.get(Bytes.wrap(new byte[] {1})).value);
+        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value());
+        assertArrayEquals(new byte[] {30}, cache.get(Bytes.wrap(new byte[] {1})).value());
     }
 
     @Test
     public void shouldDeleteAndUpdateSize() {
         cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}));
         final LRUCacheEntry deleted = cache.delete(Bytes.wrap(new byte[]{0}));
-        assertArrayEquals(new byte[] {10}, deleted.value);
+        assertArrayEquals(new byte[] {10}, deleted.value());
         assertEquals(0, cache.sizeInBytes());
     }
 
@@ -146,9 +146,9 @@ public class NamedCacheTest {
                                    KeyValue.pair(new byte[] {1}, new LRUCacheEntry(new byte[]{1})),
                                    KeyValue.pair(new byte[] {2}, new LRUCacheEntry(new byte[]{2}))));
 
-        assertArrayEquals(new byte[]{0}, cache.get(Bytes.wrap(new byte[]{0})).value);
-        assertArrayEquals(new byte[]{1}, cache.get(Bytes.wrap(new byte[]{1})).value);
-        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{2})).value);
+        assertArrayEquals(new byte[]{0}, cache.get(Bytes.wrap(new byte[]{0})).value());
+        assertArrayEquals(new byte[]{1}, cache.get(Bytes.wrap(new byte[]{1})).value());
+        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{2})).value());
     }
 
     @Test
@@ -157,7 +157,7 @@ public class NamedCacheTest {
             KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{1})),
             KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{2}))));
 
-        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{0})).value);
+        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{0})).value());
         assertEquals(cache.overwrites(), 2);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index d100ae5..3fa9367 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -66,7 +66,7 @@ public class ThreadCacheTest {
             Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             LRUCacheEntry entry = cache.get(namespace, key);
             assertEquals(entry.isDirty(), true);
-            assertEquals(new String(entry.value), kvToInsert.value);
+            assertEquals(new String(entry.value()), kvToInsert.value);
         }
         assertEquals(cache.gets(), 5);
         assertEquals(cache.puts(), 5);
@@ -188,7 +188,7 @@ public class ThreadCacheTest {
         final Bytes key = Bytes.wrap(new byte[]{0});
 
         cache.put(namespace, key, dirtyEntry(key.get()));
-        assertEquals(key.get(), cache.delete(namespace, key).value);
+        assertEquals(key.get(), cache.delete(namespace, key).value());
         assertNull(cache.get(namespace, key));
     }
 
@@ -204,7 +204,7 @@ public class ThreadCacheTest {
             }
         });
         cache.put(namespace, key, dirtyEntry(key.get()));
-        assertEquals(key.get(), cache.delete(namespace, key).value);
+        assertEquals(key.get(), cache.delete(namespace, key).value());
 
         // flushing should have no further effect
         cache.flush(namespace);
@@ -235,8 +235,8 @@ public class ThreadCacheTest {
         cache.put(namespace1, nameByte, dirtyEntry(nameByte.get()));
         cache.put(namespace2, nameByte, dirtyEntry(name1Byte.get()));
 
-        assertArrayEquals(nameByte.get(), cache.get(namespace1, nameByte).value);
-        assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value);
+        assertArrayEquals(nameByte.get(), cache.get(namespace1, nameByte).value());
+        assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value());
     }
 
     @Test
@@ -413,8 +413,8 @@ public class ThreadCacheTest {
         cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
                                            KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
 
-        assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value);
-        assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value);
+        assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value());
+        assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value());
     }
 
     @Test
@@ -436,8 +436,8 @@ public class ThreadCacheTest {
         final Bytes key = Bytes.wrap(new byte[]{10});
         final byte[] value = {30};
         assertNull(cache.putIfAbsent(namespace, key, dirtyEntry(value)));
-        assertArrayEquals(value, cache.putIfAbsent(namespace, key, dirtyEntry(new byte[]{8})).value);
-        assertArrayEquals(value, cache.get(namespace, key).value);
+        assertArrayEquals(value, cache.putIfAbsent(namespace, key, dirtyEntry(new byte[]{8})).value());
+        assertArrayEquals(value, cache.get(namespace, key).value());
     }
 
     @Test
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 49d9fe4..0f1fc82 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier}
+import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -250,6 +250,38 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.to(topic, produced)
 
   /**
+    * Dynamically materialize this stream to topics using the `Produced` instance for
+    * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`.
+    * The topic names for each record to send to is dynamically determined based on the given mapper.
+    * <p>
+    * The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit
+    * key and value serdes that will be converted to a `Produced` instance implicitly.
+    * <p>
+    * {{{
+    * Example:
+    *
+    * // brings implicit serdes in scope
+    * import Serdes._
+    *
+    * //..
+    * val clicksPerRegion: KTable[String, Long] = //..
+    *
+    * // Implicit serdes in scope will generate an implicit Produced instance, which
+    * // will be passed automatically to the call of through below
+    * clicksPerRegion.to(topicChooser)
+    *
+    * // Similarly you can create an implicit Produced and it will be passed implicitly
+    * // to the through call
+    * }}}
+    *
+    * @param extractor the extractor to determine the name of the Kafka topic to write to for reach record
+    * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+    * @see `org.apache.kafka.streams.kstream.KStream#to`
+    */
+  def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit =
+    inner.to(extractor, produced)
+
+  /**
    * Transform each record of the input stream into zero or more records in the output stream (both key and value type
    * can be altered arbitrarily).
    * A `Transformer` is applied to each input record and computes zero or more output records. In order to assign a

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message