kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6761: Construct Physical Plan using Graph, Reduce streams footprint part III (#5201)
Date Wed, 01 Aug 2018 22:01:23 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c19213a  KAFKA-6761: Construct Physical Plan using Graph, Reduce streams footprint part III (#5201)
c19213a is described below

commit c19213ab4139aa1f56f89982448184a5c82f98a4
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Wed Aug 1 18:01:18 2018 -0400

    KAFKA-6761: Construct Physical Plan using Graph, Reduce streams footprint part III (#5201)
    
    The specific changes in this PR from the second PR include:
    
    1. Changed the types of graph nodes to names conveying more context
    2. Build the entire physical plan from the graph, after StreamsBuilder.build() is called.
    
    Other changes are addressed directly as review comments on the PR.
    
    Testing consists of using all existing streams tests to validate building the physical plan with graph
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   |   3 +-
 .../streams/kstream/internals/AbstractStream.java  |  18 +-
 .../internals/GroupedStreamAggregateBuilder.java   |  41 +-
 .../kstream/internals/InternalStreamsBuilder.java  | 157 +++--
 .../kstream/internals/KGroupedStreamImpl.java      |  90 +--
 .../kstream/internals/KGroupedTableImpl.java       |  80 +--
 .../streams/kstream/internals/KStreamImpl.java     | 664 ++++++++++-----------
 .../internals/KStreamSessionWindowAggregate.java   |  50 +-
 .../kstream/internals/KStreamWindowAggregate.java  |  36 +-
 .../streams/kstream/internals/KTableImpl.java      | 281 ++++-----
 .../kstream/internals/MaterializedInternal.java    |   4 +-
 .../internals/SessionWindowedKStreamImpl.java      |  91 +--
 .../kstream/internals/TimeWindowedKStreamImpl.java |  50 +-
 .../internals/WindowedStreamPartitioner.java       |   2 +-
 .../internals/graph/BaseJoinProcessorNode.java     |  12 +
 .../internals/graph/BaseRepartitionNode.java       |  11 +
 .../GroupedTableOperationRepartitionNode.java      | 120 ++--
 .../internals/graph/KTableKTableJoinNode.java      |  61 +-
 .../graph/OptimizableRepartitionNode.java          |  80 +--
 ...sProcessorNode.java => ProcessorGraphNode.java} |  50 +-
 .../internals/graph/ProcessorParameters.java       |  12 +-
 .../internals/graph/StatefulProcessorNode.java     |  64 +-
 .../kstream/internals/graph/StreamSinkNode.java    |  51 +-
 .../kstream/internals/graph/StreamSourceNode.java  |  47 +-
 .../internals/graph/StreamStreamJoinNode.java      |  77 +--
 .../internals/graph/StreamTableJoinNode.java       |  40 +-
 .../kstream/internals/graph/StreamsGraphNode.java  |  61 +-
 .../internals/graph/TableProcessorNode.java        |  26 +-
 .../kstream/internals/graph/TableSourceNode.java   |  77 ++-
 .../internals/InternalTopologyBuilder.java         |   2 +-
 .../streams/state/KeyValueBytesStoreSupplier.java  |   2 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |  11 +-
 .../FineGrainedAutoResetIntegrationTest.java       |   7 +-
 .../kstream/internals/AbstractStreamTest.java      |  11 +-
 .../internals/InternalStreamsBuilderTest.java      |  21 +-
 .../kstream/internals/graph/StreamsGraphTest.java  | 134 +++++
 .../processor/internals/StandbyTaskTest.java       |  13 +-
 .../processor/internals/StreamThreadTest.java      |   8 +-
 .../internals/StreamsPartitionAssignorTest.java    |  13 +-
 39 files changed, 1468 insertions(+), 1110 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index ba74f61..0442e2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -521,7 +521,7 @@ public class StreamsBuilder {
     public synchronized Topology build() {
         return build(null);
     }
-
+    
     /**
      * Returns the {@link Topology} that represents the specified processing logic and accepts
      * a {@link Properties} instance used to indicate whether to optimize topology or not.
@@ -531,6 +531,7 @@ public class StreamsBuilder {
      */
     public synchronized Topology build(final Properties props) {
         // the props instance will be used once optimization framework merged
+        internalStreamsBuilder.buildAndOptimizeTopology();
         return topology;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 460665c..9fce80f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -36,7 +36,7 @@ public abstract class AbstractStream<K> {
     protected final InternalStreamsBuilder builder;
     protected final String name;
     protected final Set<String> sourceNodes;
-    protected final StreamsGraphNode parentGraphNode;
+    protected final StreamsGraphNode streamsGraphNode;
 
     // This copy-constructor will allow to extend KStream
     // and KTable APIs with new methods without impacting the public interface.
@@ -44,13 +44,13 @@ public abstract class AbstractStream<K> {
         this.builder = stream.builder;
         this.name = stream.name;
         this.sourceNodes = stream.sourceNodes;
-        this.parentGraphNode = stream.parentGraphNode;
+        this.streamsGraphNode = stream.streamsGraphNode;
     }
 
     AbstractStream(final InternalStreamsBuilder builder,
                    final String name,
                    final Set<String> sourceNodes,
-                   final StreamsGraphNode parentGraphNode) {
+                   final StreamsGraphNode streamsGraphNode) {
         if (sourceNodes == null || sourceNodes.isEmpty()) {
             throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
         }
@@ -58,14 +58,7 @@ public abstract class AbstractStream<K> {
         this.builder = builder;
         this.name = name;
         this.sourceNodes = sourceNodes;
-        this.parentGraphNode = parentGraphNode;
-    }
-
-    protected void addGraphNode(final StreamsGraphNode newNode) {
-        //TODO remove this once actually building the topology with Graph
-        if (parentGraphNode != null) {
-            parentGraphNode.addChildNode(newNode);
-        }
+        this.streamsGraphNode = streamsGraphNode;
     }
 
     // This method allows to expose the InternalTopologyBuilder instance
@@ -103,7 +96,8 @@ public abstract class AbstractStream<K> {
         };
     }
 
-    static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
+    static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(
+        final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
         return new ValueTransformerWithKeySupplier<K, V, VR>() {
             @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 247c882..d02a569 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -24,12 +24,14 @@ import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNo
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Collections;
 import java.util.Set;
 
 class GroupedStreamAggregateBuilder<K, V> {
+
     private final InternalStreamsBuilder builder;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
@@ -38,26 +40,11 @@ class GroupedStreamAggregateBuilder<K, V> {
     private final String name;
     private final StreamsGraphNode streamsGraphNode;
 
-    final Initializer<Long> countInitializer = new Initializer<Long>() {
-        @Override
-        public Long apply() {
-            return 0L;
-        }
-    };
+    final Initializer<Long> countInitializer = () -> 0L;
 
-    final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V, Long>() {
-        @Override
-        public Long apply(K aggKey, V value, Long aggregate) {
-            return aggregate + 1;
-        }
-    };
+    final Aggregator<K, V, Long> countAggregator = (aggKey, value, aggregate) -> aggregate + 1;
 
-    final Initializer<V> reduceInitializer = new Initializer<V>() {
-        @Override
-        public V apply() {
-            return null;
-        }
-    };
+    final Initializer<V> reduceInitializer = () -> null;
 
     GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
                                   final Serde<K> keySerde,
@@ -76,27 +63,25 @@ class GroupedStreamAggregateBuilder<K, V> {
         this.streamsGraphNode = streamsGraphNode;
     }
 
-    <T> KTable<K, T> build(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
-                           final String functionName,
-                           final StoreBuilder storeBuilder,
-                           final boolean isQueryable) {
+
+    <KR, T> KTable<KR, T> build(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
+                                final String functionName,
+                                final StoreBuilder<? extends StateStore> storeBuilder,
+                                final boolean isQueryable) {
+
         final String aggFunctionName = builder.newProcessorName(functionName);
 
         OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
 
         final String sourceName = repartitionIfRequired(storeBuilder.name(), repartitionNodeBuilder);
-        builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
-        builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
-
 
         StreamsGraphNode parentNode = streamsGraphNode;
 
         if (!sourceName.equals(this.name)) {
             StreamsGraphNode repartitionNode = repartitionNodeBuilder.build();
-            streamsGraphNode.addChildNode(repartitionNode);
+            builder.addGraphNode(parentNode, repartitionNode);
             parentNode = repartitionNode;
         }
-
         StatefulProcessorNode.StatefulProcessorNodeBuilder<K, T> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
 
         ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
@@ -108,7 +93,7 @@ class GroupedStreamAggregateBuilder<K, V> {
 
         StatefulProcessorNode<K, T> statefulProcessorNode = statefulProcessorNodeBuilder.build();
 
-        parentNode.addChildNode(statefulProcessorNode);
+        builder.addGraphNode(parentNode, statefulProcessorNode);
 
         return new KTableImpl<>(builder,
                                 aggFunctionName,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index c42a93e..b502153 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -25,13 +26,19 @@ import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Objects;
+import java.util.PriorityQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
@@ -40,7 +47,11 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     final InternalTopologyBuilder internalTopologyBuilder;
     private final AtomicInteger index = new AtomicInteger(0);
 
+    private final AtomicInteger nodeIdCounter = new AtomicInteger(0);
+    private final NodeIdComparator nodeIdComparator = new NodeIdComparator();
+
     private static final String TOPOLOGY_ROOT = "root";
+    private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
 
     protected final StreamsGraphNode root = new StreamsGraphNode(TOPOLOGY_ROOT, false) {
         @Override
@@ -57,18 +68,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                        final ConsumedInternal<K, V> consumed) {
         final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
 
-        StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name,
-                                                                         topics,
-                                                                         consumed);
-
-        root.addChildNode(streamSourceNode);
-
-        internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
-                                          name,
-                                          consumed.timestampExtractor(),
-                                          consumed.keyDeserializer(),
-                                          consumed.valueDeserializer(),
-                                          topics.toArray(new String[topics.size()]));
+        final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name,
+                                                                              topics,
+                                                                              consumed);
+        addGraphNode(root, streamSourceNode);
 
         return new KStreamImpl<>(this, name, Collections.singleton(name), false, streamSourceNode);
     }
@@ -77,17 +80,11 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                        final ConsumedInternal<K, V> consumed) {
         final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
 
-        StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name,
-                                                                                topicPattern,
-                                                                                consumed);
-        root.addChildNode(streamPatternSourceNode);
+        final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name,
+                                                                                      topicPattern,
+                                                                                      consumed);
 
-        internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
-                                          name,
-                                          consumed.timestampExtractor(),
-                                          consumed.keyDeserializer(),
-                                          consumed.valueDeserializer(),
-                                          topicPattern);
+        addGraphNode(root, streamPatternSourceNode);
 
         return new KStreamImpl<>(this,
                                  name,
@@ -97,21 +94,20 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     }
 
     @SuppressWarnings("unchecked")
-    public <K, V> KTable<K, V> table(final String topic,
-                                     final ConsumedInternal<K, V> consumed,
-                                     final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+    public <K, V, S extends StateStore> KTable<K, V> table(final String topic,
+                                                           final ConsumedInternal<K, V> consumed,
+                                                           final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
 
-        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
+        final StoreBuilder<S> storeBuilder = (StoreBuilder<S>) new KeyValueStoreMaterializer<>(materialized).materialize();
 
         final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
         final String name = newProcessorName(KTableImpl.SOURCE_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeBuilder.name());
         final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
 
-        TableSourceNode.TableSourceNodeBuilder<K, V> tableSourceNodeBuilder = TableSourceNode.tableSourceNodeBuilder();
+        final TableSourceNode.TableSourceNodeBuilder<K, V, S> tableSourceNodeBuilder = TableSourceNode.tableSourceNodeBuilder();
 
-
-        TableSourceNode<K, V> tableSourceNode = tableSourceNodeBuilder.withNodeName(name)
+        final TableSourceNode<K, V, S> tableSourceNode = tableSourceNodeBuilder.withNodeName(name)
                                                                                .withSourceName(source)
                                                                                .withStoreBuilder(storeBuilder)
                                                                                .withConsumedInternal(consumed)
@@ -119,18 +115,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                                                                .withTopic(topic)
                                                                                .build();
 
-        root.addChildNode(tableSourceNode);
-
-        internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
-                                          source,
-                                          consumed.timestampExtractor(),
-                                          consumed.keyDeserializer(),
-                                          consumed.valueDeserializer(),
-                                          topic);
-        internalTopologyBuilder.addProcessor(name, processorSupplier, source);
-
-        internalTopologyBuilder.addStateStore(storeBuilder, name);
-        internalTopologyBuilder.markSourceStoreAndTopic(storeBuilder, topic);
+        addGraphNode(root, tableSourceNode);
 
         return new KTableImpl<>(this,
                                 name,
@@ -144,9 +129,9 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     }
 
     @SuppressWarnings("unchecked")
-    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
-                                                 final ConsumedInternal<K, V> consumed,
-                                                 final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+    public <K, V, S extends StateStore> GlobalKTable<K, V> globalTable(final String topic,
+                                                                       final ConsumedInternal<K, V> consumed,
+                                                                       final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(consumed, "consumed can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         // explicitly disable logging for global stores
@@ -158,26 +143,17 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
         final ProcessorParameters processorParameters = new ProcessorParameters(tableSource, processorName);
 
-        TableSourceNode<K, V> tableSourceNode = TableSourceNode.tableSourceNodeBuilder().withStoreBuilder(storeBuilder)
-                                                                                                    .withSourceName(sourceName)
-                                                                                                    .withConsumedInternal(consumed)
-                                                                                                    .withTopic(topic)
-                                                                                                    .withProcessorParameters(processorParameters)
-                                                                                                    .isGlobalKTable(true)
-                                                                                                    .build();
+        final TableSourceNode<K, V, S> tableSourceNode = TableSourceNode.tableSourceNodeBuilder().withStoreBuilder(storeBuilder)
+                                                                                                 .withSourceName(sourceName)
+                                                                                                 .withConsumedInternal(consumed)
+                                                                                                 .withTopic(topic)
+                                                                                                 .withProcessorParameters(processorParameters)
+                                                                                                 .isGlobalKTable(true)
+                                                                                                 .build();
 
-        root.addChildNode(tableSourceNode);
+        addGraphNode(root, tableSourceNode);
 
-        internalTopologyBuilder.addGlobalStore(storeBuilder,
-                                               sourceName,
-                                               consumed.timestampExtractor(),
-                                               consumed.keyDeserializer(),
-                                               consumed.valueDeserializer(),
-                                               topic,
-                                               processorName,
-                                               tableSource);
-
-        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()), materialized.isQueryable());
+        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<>(storeBuilder.name()), materialized.isQueryable());
     }
 
     @Override
@@ -228,8 +204,65 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                        stateUpdateSupplier);
     }
 
+    void addGraphNode(StreamsGraphNode parent, StreamsGraphNode child) {
+        Objects.requireNonNull(parent, "parent node can't be null");
+        Objects.requireNonNull(child, "child node can't be null");
+        parent.addChildNode(child);
+        maybeAddNodeForOptimizationMetadata(child);
+    }
+
+    void addGraphNode(Collection<StreamsGraphNode> parents, StreamsGraphNode child) {
+        Objects.requireNonNull(parents, "parent node can't be null");
+        Objects.requireNonNull(child, "child node can't be null");
+
+        if (parents.isEmpty()) {
+            throw new StreamsException("Parent node collection can't be empty");
+        }
+
+        for (StreamsGraphNode parent : parents) {
+            addGraphNode(parent, child);
+        }
+    }
+
+    void maybeAddNodeForOptimizationMetadata(final StreamsGraphNode node) {
+        node.setId(nodeIdCounter.getAndIncrement());
+    }
+
+    public void buildAndOptimizeTopology() {
+
+        final PriorityQueue<StreamsGraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, nodeIdComparator);
+
+        graphNodePriorityQueue.offer(root);
+
+        while (!graphNodePriorityQueue.isEmpty()) {
+            final StreamsGraphNode streamGraphNode = graphNodePriorityQueue.remove();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Adding nodes to topology {} child nodes {}", streamGraphNode, streamGraphNode.children());
+            }
+
+            if (streamGraphNode.allParentsWrittenToTopology() && !streamGraphNode.hasWrittenToTopology()) {
+                streamGraphNode.writeToTopology(internalTopologyBuilder);
+                streamGraphNode.setHasWrittenToTopology(true);
+            }
+
+            for (StreamsGraphNode graphNode : streamGraphNode.children()) {
+                graphNodePriorityQueue.offer(graphNode);
+            }
+        }
+    }
+
 
     public StreamsGraphNode root() {
         return root;
     }
+
+    private static class NodeIdComparator implements Comparator<StreamsGraphNode>, Serializable {
+
+        @Override
+        public int compare(final StreamsGraphNode o1,
+                           final StreamsGraphNode o2) {
+            return o1.id().compareTo(o2.id());
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 8fa1752..5d4f9f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Objects;
 import java.util.Set;
@@ -55,13 +54,15 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                        final boolean repartitionRequired,
                        final StreamsGraphNode streamsGraphNode) {
         super(builder, name, sourceNodes, streamsGraphNode);
-        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder,
-                                                                    keySerde,
-                                                                    valSerde,
-                                                                    repartitionRequired,
-                                                                    sourceNodes,
-                                                                    name,
-                                                                    streamsGraphNode);
+        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
+            builder,
+            keySerde,
+            valSerde,
+            repartitionRequired,
+            sourceNodes,
+            name,
+            streamsGraphNode
+        );
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.repartitionRequired = repartitionRequired;
@@ -69,7 +70,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     @Override
     public KTable<K, V> reduce(final Reducer<V> reducer) {
-        return reduce(reducer, Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde));
+        return reduce(reducer, Materialized.with(keySerde, valSerde));
     }
 
     @Override
@@ -89,9 +90,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         }
 
         return doAggregate(
-                new KStreamReduce<K, V>(materializedInternal.storeName(), reducer),
-                REDUCE_NAME,
-                materializedInternal);
+            new KStreamReduce<>(materializedInternal.storeName(), reducer),
+            REDUCE_NAME,
+            materializedInternal
+        );
     }
 
     @Override
@@ -110,15 +112,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         }
 
         return doAggregate(
-                new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
-                AGGREGATE_NAME,
-                materializedInternal);
+            new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
+            AGGREGATE_NAME,
+            materializedInternal
+        );
     }
 
     @Override
     public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                         final Aggregator<? super K, ? super V, VR> aggregator) {
-        return aggregate(initializer, aggregator, Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>with(keySerde, null));
+        return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
     }
 
     @Override
@@ -158,35 +161,42 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     @Override
     public <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows) {
-        return new TimeWindowedKStreamImpl<>(windows,
-                                             builder,
-                                             sourceNodes,
-                                             name,
-                                             keySerde,
-                                             valSerde,
-                                             repartitionRequired,
-                                             parentGraphNode);
+
+        return new TimeWindowedKStreamImpl<>(
+            windows,
+            builder,
+            sourceNodes,
+            name,
+            keySerde,
+            valSerde,
+            repartitionRequired,
+            streamsGraphNode
+        );
     }
 
     @Override
     public SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
-        return new SessionWindowedKStreamImpl<>(windows,
-                                                builder,
-                                                sourceNodes,
-                                                name,
-                                                keySerde,
-                                                valSerde,
-                                                aggregateBuilder,
-                                                parentGraphNode);
-    }
 
-    private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
-                                         final String functionName,
-                                         final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
-
-        final StoreBuilder<KeyValueStore<K, T>> storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal)
-                .materialize();
-        return aggregateBuilder.build(aggregateSupplier, functionName, storeBuilder, materializedInternal.isQueryable());
+        return new SessionWindowedKStreamImpl<>(
+            windows,
+            builder,
+            sourceNodes,
+            name,
+            keySerde,
+            valSerde,
+            aggregateBuilder,
+            streamsGraphNode
+        );
+    }
 
+    private <KR, T> KTable<KR, T> doAggregate(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
+                                              final String functionName,
+                                              final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+        return aggregateBuilder.build(
+            aggregateSupplier,
+            functionName,
+            new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+            materializedInternal.isQueryable()
+        );
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index be98cca..5858a8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -31,7 +29,6 @@ import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRep
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
-import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -52,26 +49,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     protected final Serde<K> keySerde;
     protected final Serde<V> valSerde;
-    private final Initializer<Long> countInitializer = new Initializer<Long>() {
-        @Override
-        public Long apply() {
-            return 0L;
-        }
-    };
+    private final Initializer<Long> countInitializer = () -> 0L;
 
-    private final Aggregator<K, V, Long> countAdder = new Aggregator<K, V, Long>() {
-        @Override
-        public Long apply(K aggKey, V value, Long aggregate) {
-            return aggregate + 1L;
-        }
-    };
+    private final Aggregator<K, V, Long> countAdder = (aggKey, value, aggregate) -> aggregate + 1L;
 
-    private Aggregator<K, V, Long> countSubtractor = new Aggregator<K, V, Long>() {
-        @Override
-        public Long apply(K aggKey, V value, Long aggregate) {
-            return aggregate - 1L;
-        }
-    };
+    private Aggregator<K, V, Long> countSubtractor = (aggKey, value, aggregate) -> aggregate - 1L;
 
     KGroupedTableImpl(final InternalStreamsBuilder builder,
                       final String name,
@@ -84,31 +66,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         this.valSerde = valSerde;
     }
 
-    private <T> void buildAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
-                                    final String topic,
-                                    final String funcName,
-                                    final String sourceName,
-                                    final String sinkName) {
-
-        final Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
-        final Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
-        final Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
-        final Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
-
-        final ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
-        final ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
-
-        // send the aggregate key-value pairs to the intermediate topic for partitioning
-        builder.internalTopologyBuilder.addInternalTopic(topic);
-        builder.internalTopologyBuilder.addSink(sinkName, topic, keySerializer, changedValueSerializer, null, this.name);
-
-        // read the intermediate topic with RecordMetadataTimestampExtractor
-        builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(), keyDeserializer, changedValueDeserializer, topic);
-
-        // aggregate the values with the aggregator and local store
-        builder.internalTopologyBuilder.addProcessor(funcName, aggregateSupplier, sourceName);
-    }
-
+    @SuppressWarnings("unchecked")
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                          final String functionName,
                                          final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
@@ -117,29 +75,19 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         final String funcName = builder.newProcessorName(functionName);
         final String topic = materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
-
-        buildAggregate(aggregateSupplier,
-                       topic,
-                       funcName,
-                       sourceName,
-                       sinkName
-        );
-
-        builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized)
-                                                          .materialize(), funcName);
-
-
         StreamsGraphNode repartitionNode = createRepartitionNode(sinkName,
                                                                  sourceName,
                                                                  topic);
-        addGraphNode(repartitionNode);
 
-        StatefulProcessorNode statefulProcessorNode = createStatefulProcessorNode(materialized,
-                                                                                  funcName,
-                                                                                  aggregateSupplier);
+        // the passed in StreamsGraphNode must be the parent of the repartition node
+        builder.addGraphNode(this.streamsGraphNode, repartitionNode);
 
-        repartitionNode.addChildNode(statefulProcessorNode);
+        StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(materialized,
+                                                                               funcName,
+                                                                               aggregateSupplier);
 
+        // now the repartition node must be the parent of the StateProcessorNode
+        builder.addGraphNode(repartitionNode, statefulProcessorNode);
 
         // return the KTable representation with the intermediate topic as the sources
         return new KTableImpl<>(builder,
@@ -152,9 +100,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     }
 
     @SuppressWarnings("unchecked")
-    private <T> StatefulProcessorNode createStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
-                                                                  final String functionName,
-                                                                  final ProcessorSupplier aggregateSupplier) {
+    private <T> StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
+                                                               final String functionName,
+                                                               final ProcessorSupplier aggregateSupplier) {
 
         ProcessorParameters aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index acfdf35..5451a86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -16,9 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -39,16 +37,15 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
-import org.apache.kafka.streams.kstream.internals.graph.StatelessProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -116,11 +113,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private final boolean repartitionRequired;
 
-    public KStreamImpl(final InternalStreamsBuilder builder,
-                       final String name,
-                       final Set<String> sourceNodes,
-                       final boolean repartitionRequired,
-                       final StreamsGraphNode streamsGraphNode) {
+    KStreamImpl(final InternalStreamsBuilder builder,
+                final String name,
+                final Set<String> sourceNodes,
+                final boolean repartitionRequired,
+                final StreamsGraphNode streamsGraphNode) {
         super(builder, name, sourceNodes, streamsGraphNode);
         this.repartitionRequired = repartitionRequired;
     }
@@ -128,14 +125,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
-        String name = builder.newProcessorName(FILTER_NAME);
+        final String name = builder.newProcessorName(FILTER_NAME);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name);
-        StatelessProcessorNode<K, V> filterProcessorNode = new StatelessProcessorNode<>(name,
-                                                                                        processorParameters,
-                                                                                        repartitionRequired);
-        addGraphNode(filterProcessorNode);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
+
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name);
+        final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name,
+                                                                                                      processorParameters,
+                                                                                                      repartitionRequired);
+        builder.addGraphNode(this.streamsGraphNode, filterProcessorNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterProcessorNode);
     }
@@ -143,16 +140,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
-        String name = builder.newProcessorName(FILTER_NAME);
+        final String name = builder.newProcessorName(FILTER_NAME);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name);
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name);
 
-        StatelessProcessorNode<K, V> filterNotProcessorNode = new StatelessProcessorNode<>(name,
-                                                                                           processorParameters,
-                                                                                           repartitionRequired);
 
-        addGraphNode(filterNotProcessorNode);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
+        final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name,
+                                                                                                         processorParameters,
+                                                                                                         repartitionRequired);
+
+        builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterNotProcessorNode);
     }
@@ -161,44 +158,46 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
 
-        StatelessProcessorNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper);
+
+        final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper);
+
         selectKeyProcessorNode.keyChangingOperation(true);
-        addGraphNode(selectKeyProcessorNode);
+        builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode);
         return new KStreamImpl<>(builder, selectKeyProcessorNode.nodeName(), sourceNodes, true, selectKeyProcessorNode);
     }
 
-    private <K1> StatelessProcessorNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
-        String name = builder.newProcessorName(KEY_SELECT_NAME);
 
+    private <K1> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
+        final String name = builder.newProcessorName(KEY_SELECT_NAME);
 
-        KStreamMap kStreamMap = new KStreamMap<>(
-            (KeyValueMapper<K, V, KeyValue<K1, V>>) (key, value) -> new KeyValue<>(mapper.apply(key, value), value));
 
+        final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>(
+            (KeyValueMapper<K, V, KeyValue<K1, V>>) (key, value) -> new KeyValue<>(mapper.apply(key, value), value));
 
-        ProcessorParameters<K1, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
 
-        builder.internalTopologyBuilder.addProcessor(name, kStreamMap, this.name);
+        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
 
-        return  new StatelessProcessorNode<>(name,
-                                             processorParameters,
-                                             repartitionRequired);
+        return new ProcessorGraphNode<>(
+            name,
+            processorParameters,
+            repartitionRequired
+        );
 
     }
 
     @Override
     public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = builder.newProcessorName(MAP_NAME);
+        final String name = builder.newProcessorName(MAP_NAME);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamMap<>(mapper), name);
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMap<>(mapper), name);
 
-        StatelessProcessorNode<K1, V1> mapProcessorNode = new StatelessProcessorNode<>(name,
-                                                                                       processorParameters,
-                                                                                       true);
-        mapProcessorNode.keyChangingOperation(true);
-        addGraphNode(mapProcessorNode);
+        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name,
+                                                                                                   processorParameters,
+                                                                                     true);
 
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamMap<>(mapper), this.name);
+        mapProcessorNode.keyChangingOperation(true);
+        builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, true, mapProcessorNode);
     }
@@ -212,15 +211,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = builder.newProcessorName(MAPVALUES_NAME);
+        final String name = builder.newProcessorName(MAPVALUES_NAME);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
 
-        StatelessProcessorNode<K, V> mapValuesProcessorNode = new StatelessProcessorNode<>(name,
-                                                                                           processorParameters,
-                                                                                           repartitionRequired);
-        addGraphNode(mapValuesProcessorNode);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
+
+        final ProcessorGraphNode<? super  K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name,
+                                                                                                         processorParameters,
+                                                                                                         repartitionRequired);
+        builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, mapValuesProcessorNode);
     }
@@ -231,30 +230,30 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final PrintedInternal<K, V> printedInternal = new PrintedInternal<>(printed);
         final String name = builder.newProcessorName(PRINTING_NAME);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name);
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name);
 
-        StatelessProcessorNode<K, V> printNode = new StatelessProcessorNode<>(name,
-                                                                              processorParameters,
-                                                                              false);
-        addGraphNode(printNode);
-        builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name);
+
+        final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name,
+                                                                                            processorParameters,
+                                                                            false);
+        builder.addGraphNode(this.streamsGraphNode, printNode);
     }
 
     @Override
     public <K1, V1> KStream<K1, V1> flatMap(
         final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = builder.newProcessorName(FLATMAP_NAME);
+        final String name = builder.newProcessorName(FLATMAP_NAME);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name);
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name);
 
-        StatelessProcessorNode<K1, V1> flatMapNode = new StatelessProcessorNode<>(name,
-                                                                                  processorParameters,
-                                                                                  true);
+
+        final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name,
+                                                                                              processorParameters,
+                                                                                true);
         flatMapNode.keyChangingOperation(true);
 
-        addGraphNode(flatMapNode);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
+        builder.addGraphNode(this.streamsGraphNode, flatMapNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, true, flatMapNode);
     }
@@ -267,15 +266,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = builder.newProcessorName(FLATMAPVALUES_NAME);
+        final String name = builder.newProcessorName(FLATMAPVALUES_NAME);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
 
-        StatelessProcessorNode<K, VR> flatMapValuesNode = new StatelessProcessorNode<>(name,
-                                                                                       processorParameters,
-                                                                                       repartitionRequired);
-        addGraphNode(flatMapValuesNode);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
+
+        final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name,
+                                                                                                    processorParameters,
+                                                                                                    repartitionRequired);
+        builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, flatMapValuesNode);
     }
@@ -290,32 +289,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             Objects.requireNonNull(predicate, "predicates can't have null values");
         }
 
-        String branchName = builder.newProcessorName(BRANCH_NAME);
+        final String branchName = builder.newProcessorName(BRANCH_NAME);
 
-        String[] childNames = new String[predicates.length];
+        final String[] childNames = new String[predicates.length];
         for (int i = 0; i < predicates.length; i++) {
             childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME);
         }
 
-        builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone(), childNames), this.name);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName);
+        final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName);
 
-        StatelessProcessorNode<K, V> branchNode = new StatelessProcessorNode<>(branchName,
-                                                                               processorParameters,
-                                                                               false);
-        addGraphNode(branchNode);
+        final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName,
+                                                                             processorParameters,
+                                                                             false);
+        builder.addGraphNode(this.streamsGraphNode, branchNode);
 
-        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
+        final KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
 
         for (int i = 0; i < predicates.length; i++) {
-            ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]);
+            final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]);
+
+            final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i],
+                                                                                      innerProcessorParameters,
+                                                                                      repartitionRequired);
 
-            StatelessProcessorNode<K, V> branchChildNode = new StatelessProcessorNode<>(childNames[i],
-                                                                                        innerProcessorParameters,
-                                                                                        repartitionRequired);
-            branchNode.addChildNode(branchChildNode);
-            builder.internalTopologyBuilder.addProcessor(childNames[i], new KStreamPassThrough<K, V>(), branchName);
+            builder.addGraphNode(branchNode, branchChildNode);
             branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired, branchChildNode);
         }
 
@@ -330,25 +328,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private KStream<K, V> merge(final InternalStreamsBuilder builder,
                                 final KStream<K, V> stream) {
-        KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
-        String name = builder.newProcessorName(MERGE_NAME);
-        String[] parentNames = {this.name, streamImpl.name};
-        Set<String> allSourceNodes = new HashSet<>();
+        final KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
+        final String name = builder.newProcessorName(MERGE_NAME);
+        final Set<String> allSourceNodes = new HashSet<>();
 
-        boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
+        final boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
         allSourceNodes.addAll(sourceNodes);
         allSourceNodes.addAll(streamImpl.sourceNodes);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamPassThrough<>(), name);
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamPassThrough<>(), name);
 
-        StatelessProcessorNode<K, V> mergeNode = new StatelessProcessorNode<>(name,
-                                                                              processorParameters,
-                                                                              requireRepartitioning,
-                                                                              Arrays.asList(parentNames));
 
-        addGraphNode(mergeNode);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamPassThrough<>(), parentNames);
+        final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(name,
+                                                                                            processorParameters,
+                                                                                            requireRepartitioning);
 
+
+        builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode);
         return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning, mergeNode);
     }
 
@@ -356,25 +352,32 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
         final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
         to(topic, producedInternal);
-        return builder.stream(Collections.singleton(topic),
-                              new ConsumedInternal<>(producedInternal.keySerde(),
-                                                     producedInternal.valueSerde(),
-                                                     new FailOnInvalidTimestamp(),
-                                                     null));
+        return builder.stream(
+            Collections.singleton(topic),
+            new ConsumedInternal<>(
+                producedInternal.keySerde(),
+                producedInternal.valueSerde(),
+                new FailOnInvalidTimestamp(),
+                null
+            )
+        );
     }
 
     @Override
     public void foreach(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
-        String name = builder.newProcessorName(FOREACH_NAME);
+        final String name = builder.newProcessorName(FOREACH_NAME);
+
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(
+            new KStreamPeek<>(action, false),
+            name
+        );
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamPeek<>(action, false), name);
 
-        StatelessProcessorNode<K, V> foreachNode = new StatelessProcessorNode<>(name,
-                                                                                processorParameters,
-                                                                                repartitionRequired);
-        addGraphNode(foreachNode);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, false), this.name);
+        final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name,
+                                                                              processorParameters,
+                                                                              repartitionRequired);
+        builder.addGraphNode(this.streamsGraphNode, foreachNode);
     }
 
     @Override
@@ -382,37 +385,41 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(action, "action can't be null");
         final String name = builder.newProcessorName(PEEK_NAME);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamPeek<>(action, true), name);
+        final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(
+            new KStreamPeek<>(action, true),
+            name
+        );
+
+        final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name,
+                                                                                           processorParameters,
+                                                                                           repartitionRequired
+        );
 
-        StatelessProcessorNode<K, V> peekNode = new StatelessProcessorNode<>(name,
-                                                                             processorParameters,
-                                                                             repartitionRequired);
-        addGraphNode(peekNode);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, true), this.name);
+        builder.addGraphNode(this.streamsGraphNode, peekNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired, peekNode);
     }
 
     @Override
     public KStream<K, V> through(final String topic) {
-        return through(topic, Produced.<K, V>with(null, null, null));
+        return through(topic, Produced.with(null, null, null));
     }
 
     @Override
     public void to(final String topic) {
-        to(topic, Produced.<K, V>with(null, null, null));
+        to(topic, Produced.with(null, null, null));
     }
 
     @Override
     public void to(final String topic, final Produced<K, V> produced) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(new StaticTopicNameExtractor<K, V>(topic), new ProducedInternal<>(produced));
+        to(new StaticTopicNameExtractor<>(topic), new ProducedInternal<>(produced));
     }
 
     @Override
     public void to(final TopicNameExtractor<K, V> topicExtractor) {
-        to(topicExtractor, Produced.<K, V>with(null, null, null));
+        to(topicExtractor, Produced.with(null, null, null));
     }
 
     @Override
@@ -425,47 +432,35 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @SuppressWarnings("unchecked")
     private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
         final String name = builder.newProcessorName(SINK_NAME);
-        final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
-        final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();
-        final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner();
-
-        if (partitioner == null && keySerializer instanceof WindowedSerializer) {
-            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer);
-            builder.internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valSerializer, windowedPartitioner, this.name);
-        } else {
-            builder.internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valSerializer, partitioner, this.name);
-        }
 
-        StreamSinkNode<K, V> sinkNode = new StreamSinkNode<>(
+        final StreamSinkNode<K, V> sinkNode = new StreamSinkNode<>(
             name,
             topicExtractor,
-            produced);
-        addGraphNode(sinkNode);
+            produced
+        );
+
+        builder.addGraphNode(this.streamsGraphNode, sinkNode);
     }
 
     @Override
     public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
                                               final String... stateStoreNames) {
         Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
-        String name = builder.newProcessorName(TRANSFORM_NAME);
+        final String name = builder.newProcessorName(TRANSFORM_NAME);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name);
+        final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name);
 
-        StatefulProcessorNode<K1, V1> transformNode = new StatefulProcessorNode<>(name,
-                                                                                  processorParameters,
-                                                                                  stateStoreNames,
-                                                                                  null,
-                                                                                  null,
-                                                                                  true);
+
+        final StatefulProcessorNode<K1, V1> transformNode = new StatefulProcessorNode<>(name,
+                                                                                        processorParameters,
+                                                                                        stateStoreNames,
+                                                                                        null,
+                                                                                        true);
         transformNode.keyChangingOperation(true);
-        addGraphNode(transformNode);
+        builder.addGraphNode(this.streamsGraphNode, transformNode);
 
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
-        if (stateStoreNames != null && stateStoreNames.length > 0) {
-            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
-        }
 
-        return new KStreamImpl<>(builder, name, sourceNodes, true, null);
+        return new KStreamImpl<>(builder, name, sourceNodes, true, transformNode);
     }
 
     @Override
@@ -488,23 +483,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                   final String... stateStoreNames) {
         final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
 
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerWithKeySupplier), this.name);
-        if (stateStoreNames != null && stateStoreNames.length > 0) {
-            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
-        }
-
 
+        final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name);
 
-
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name);
-
-        StatefulProcessorNode<K, VR> transformNode = new StatefulProcessorNode<>(name,
-                                                                                 processorParameters,
-                                                                                 stateStoreNames,
-                                                                                 null,
-                                                                                 null,
-                                                                                 repartitionRequired);
-        addGraphNode(transformNode);
+        final StatefulProcessorNode<K, VR> transformNode = new StatefulProcessorNode<>(name,
+                                                                                       processorParameters,
+                                                                                       stateStoreNames,
+                                                                                       null,
+                                                                                       repartitionRequired);
+        builder.addGraphNode(this.streamsGraphNode, transformNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, transformNode);
     }
@@ -512,29 +499,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
                         final String... stateStoreNames) {
-        final String name = builder.newProcessorName(PROCESSOR_NAME);
-
-        ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
 
-        StatefulProcessorNode<K, V> transformNode = new StatefulProcessorNode<>(name,
-                                                                                processorParameters,
-                                                                                stateStoreNames,
-                                                                                null,
-                                                                                null,
-                                                                                false);
-        addGraphNode(transformNode);
+        Objects.requireNonNull(processorSupplier, "ProcessSupplier cant' be null");
+        final String name = builder.newProcessorName(PROCESSOR_NAME);
 
-        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
-        if (stateStoreNames != null && stateStoreNames.length > 0) {
-            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
-        }
+        final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
+        final StatefulProcessorNode<K, V> processNode = new StatefulProcessorNode<>(name,
+                                                                                    processorParameters,
+                                                                                    stateStoreNames,
+                                                                                    null,
+                                                                                    repartitionRequired);
+        builder.addGraphNode(this.streamsGraphNode, processNode);
     }
 
     @Override
     public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
                                       final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                       final JoinWindows windows) {
-        return join(other, joiner, windows, Joined.<K, V, V1>with(null, null, null));
+        return join(other, joiner, windows, Joined.with(null, null, null));
     }
 
     @Override
@@ -542,15 +524,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                         final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                         final JoinWindows windows,
                                         final Joined<K, V, VO> joined) {
-        return doJoin(otherStream, joiner, windows, joined,
-                      new KStreamImplJoin(false, false));
+
+        return doJoin(otherStream,
+                      joiner,
+                      windows,
+                      joined,
+                      new KStreamImplJoin(false, false, this.streamsGraphNode));
+
     }
 
     @Override
     public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
                                            final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                            final JoinWindows windows) {
-        return outerJoin(other, joiner, windows, Joined.<K, V, V1>with(null, null, null));
+        return outerJoin(other, joiner, windows, Joined.with(null, null, null));
     }
 
     @Override
@@ -558,7 +545,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                              final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                              final JoinWindows windows,
                                              final Joined<K, V, VO> joined) {
-        return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true));
+        return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true, this.streamsGraphNode));
     }
 
     private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
@@ -584,11 +571,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         joinThis.ensureJoinableWith(joinOther);
 
-        return join.join(joinThis,
-                         joinOther,
-                         joiner,
-                         windows,
-                         joined);
+        return join.join(
+            joinThis,
+            joinOther,
+            joiner,
+            windows,
+            joined
+        );
     }
 
     /**
@@ -596,24 +585,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
      * an operation that changes the key, i.e, selectKey, map(..), flatMap(..).
      *
      * @param keySerde Serdes for serializing the keys
-     * @param valSerde Serdes for serilaizing the values
+     * @param valSerde Serdes for serializing the values
      * @return a new {@link KStreamImpl}
      */
     private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde,
                                                  final Serde<V> valSerde) {
-        OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
-        String repartitionedSourceName = createRepartitionedSource(builder,
-                                                                   keySerde,
-                                                                   valSerde,
-                                                                   null,
-                                                                   name,
-                                                                   repartitionNodeBuilder);
 
-        OptimizableRepartitionNode<K, V> repartitionNode = repartitionNodeBuilder.build();
-        addGraphNode(repartitionNode);
+        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+        final String repartitionedSourceName = createRepartitionedSource(builder,
+                                                                         keySerde,
+                                                                         valSerde,
+                                                                          null,
+                                                                         name,
+                                                                         optimizableRepartitionNodeBuilder);
+
+        final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
+        builder.addGraphNode(this.streamsGraphNode, optimizableRepartitionNode);
 
-        return new KStreamImpl<>(builder, repartitionedSourceName, Collections
-            .singleton(repartitionedSourceName), false, repartitionNode);
+        return new KStreamImpl<>(builder, repartitionedSourceName, Collections.singleton(repartitionedSourceName), false, optimizableRepartitionNode);
     }
 
     static <K1, V1> String createRepartitionedSource(final InternalStreamsBuilder builder,
@@ -621,40 +610,30 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                      final Serde<V1> valSerde,
                                                      final String topicNamePrefix,
                                                      final String name,
-                                                     final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder) {
+                                                     final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
 
-        Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
-        Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
-        Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
-        Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
-        String baseName = topicNamePrefix != null ? topicNamePrefix : name;
+        final String baseName = topicNamePrefix != null ? topicNamePrefix : name;
+        final String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
+        final String sinkName = builder.newProcessorName(SINK_NAME);
+        final String nullKeyFilterProcessorName = builder.newProcessorName(FILTER_NAME);
+        final String sourceName = builder.newProcessorName(SOURCE_NAME);
 
-        String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
-        String sinkName = builder.newProcessorName(SINK_NAME);
-        String nullKeyFilterProcessorName = builder.newProcessorName(FILTER_NAME);
-        String sourceName = builder.newProcessorName(SOURCE_NAME);
+        final Predicate<K1, V1> notNullKeyPredicate = (k, v) -> k != null;
 
-        Predicate<K1, V1> nullKeyPredicate = (k, v) -> k != null;
-
-        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFilter<>(nullKeyPredicate, false), nullKeyFilterProcessorName);
-
-        repartitionNodeBuilder.withKeySerde(keySerde)
-            .withValueSerde(valSerde)
-            .withSourceName(sourceName)
-            .withRepartitionTopic(repartitionTopic)
-            .withSinkName(sinkName)
-            .withProcessorParameters(processorParameters)
-            // reusing the source name for the graph node name
-            // adding explicit variable as it simplifies logic
-            .withNodeName(sourceName);
-
-        builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
-        builder.internalTopologyBuilder.addProcessor(nullKeyFilterProcessorName, new KStreamFilter<>(nullKeyPredicate, false), name);
+        final ProcessorParameters processorParameters = new ProcessorParameters<>(
+            new KStreamFilter<>(notNullKeyPredicate, false),
+            nullKeyFilterProcessorName
+        );
 
-        builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
-                                                null, nullKeyFilterProcessorName);
-        builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
-                                                  keyDeserializer, valDeserializer, repartitionTopic);
+        optimizableRepartitionNodeBuilder.withKeySerde(keySerde)
+                                         .withValueSerde(valSerde)
+                                         .withSourceName(sourceName)
+                                         .withRepartitionTopic(repartitionTopic)
+                                         .withSinkName(sinkName)
+                                         .withProcessorParameters(processorParameters)
+                                         // reusing the source name for the graph node name
+                                         // adding explicit variable as it simplifies logic
+                                         .withNodeName(sourceName);
 
         return sourceName;
     }
@@ -663,7 +642,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                           final JoinWindows windows) {
-        return leftJoin(other, joiner, windows, Joined.<K, V, V1>with(null, null, null));
+        return leftJoin(other, joiner, windows, Joined.with(null, null, null));
     }
 
     @Override
@@ -672,17 +651,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                             final JoinWindows windows,
                                             final Joined<K, V, VO> joined) {
         Objects.requireNonNull(joined, "joined can't be null");
-        return doJoin(other,
-                      joiner,
-                      windows,
-                      joined,
-                      new KStreamImplJoin(true, false));
+        return doJoin(
+            other,
+            joiner,
+            windows,
+            joined,
+            new KStreamImplJoin(true, false, this.streamsGraphNode)
+        );
+
     }
 
     @Override
     public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
                                       final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return join(other, joiner, Joined.<K, V, V1>with(null, null, null));
+        return join(other, joiner, Joined.with(null, null, null));
     }
 
     @Override
@@ -724,20 +706,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
         final String name = builder.newProcessorName(LEFTJOIN_NAME);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamGlobalKTableJoin<>(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name);
 
-        ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(valueGetterSupplier,
-                                                                                  joiner,
-                                                                                  keyMapper,
-
-                                                                                  leftJoin);
-        ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
+        final ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(
+            valueGetterSupplier,
+            joiner,
+            keyMapper,
+            leftJoin
+        );
+        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
 
-        StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(name,
+        final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(name,
                                                                                   processorParameters,
-                                                                                  new String[]{});
-        streamTableJoinNode.setGlobalKTableJoin(true);
-        addGraphNode(streamTableJoinNode);
+                                                                                  new String[]{},
+                                                                                  null);
+        builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, false, streamTableJoinNode);
     }
@@ -752,26 +734,29 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
         final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
-        builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl) other).valueGetterSupplier().storeNames());
+        final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
+            ((KTableImpl<K, ?, V1>) other).valueGetterSupplier(),
+            joiner,
+            leftJoin
+        );
 
-        ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(),
-                                                                            joiner,
-                                                                            leftJoin);
 
-        ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
-        StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(name,
-                                                                                  processorParameters,
-                                                                                  ((KTableImpl) other).valueGetterSupplier().storeNames());
+        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
+        final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(
+            name,
+            processorParameters,
+            ((KTableImpl) other).valueGetterSupplier().storeNames(),
+            this.name
+        );
 
-        addGraphNode(streamTableJoinNode);
+        builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
         return new KStreamImpl<>(builder, name, allSourceNodes, false, streamTableJoinNode);
     }
 
     @Override
     public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return leftJoin(other, joiner, Joined.<K, V, V1>with(null, null, null));
+        return leftJoin(other, joiner, Joined.with(null, null, null));
     }
 
     @Override
@@ -791,7 +776,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector) {
-        return groupBy(selector, Serialized.<K1, V>with(null, null));
+        return groupBy(selector, Serialized.with(null, null));
     }
 
     @Override
@@ -800,43 +785,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(serialized, "serialized can't be null");
         final SerializedInternal<KR, V> serializedInternal = new SerializedInternal<>(serialized);
-        final StatelessProcessorNode<K, V> graphNode = internalSelectKey(selector);
-        graphNode.keyChangingOperation(true);
+        final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(selector);
+        selectKeyMapNode.keyChangingOperation(true);
+
+        builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode);
+        return new KGroupedStreamImpl<>(
+            builder,
+            selectKeyMapNode.nodeName(),
+            sourceNodes,
+            serializedInternal.keySerde(),
+            serializedInternal.valueSerde(),
+            true,
+            selectKeyMapNode
+        );
 
-        addGraphNode(graphNode);
-        return new KGroupedStreamImpl<>(builder,
-                                        graphNode.nodeName(),
-                                        sourceNodes,
-                                        serializedInternal.keySerde(),
-                                        serializedInternal.valueSerde(),
-                                        true,
-                                        graphNode);
     }
 
     @Override
     public KGroupedStream<K, V> groupByKey() {
-        return groupByKey(Serialized.<K, V>with(null, null));
+        return groupByKey(Serialized.with(null, null));
     }
 
     @Override
     public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) {
         final SerializedInternal<K, V> serializedInternal = new SerializedInternal<>(serialized);
-
-        final StatelessProcessorNode<K, V> graphNode = new StatelessProcessorNode<>(
-            this.name,
-            null,
-            repartitionRequired,
-            Collections.<String>emptyList());
-
-        addGraphNode(graphNode);
-
         return new KGroupedStreamImpl<>(builder,
                                         this.name,
                                         sourceNodes,
                                         serializedInternal.keySerde(),
                                         serializedInternal.valueSerde(),
                                         this.repartitionRequired,
-                                        graphNode);
+                                        streamsGraphNode);
 
     }
 
@@ -861,11 +840,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         private final boolean leftOuter;
         private final boolean rightOuter;
+        private final StreamsGraphNode parentGraphNode;
 
         KStreamImplJoin(final boolean leftOuter,
-                        final boolean rightOuter) {
+                        final boolean rightOuter,
+                        final StreamsGraphNode parentGraphNode) {
             this.leftOuter = leftOuter;
             this.rightOuter = rightOuter;
+            this.parentGraphNode = parentGraphNode;
         }
 
         @SuppressWarnings("unchecked")
@@ -874,71 +856,83 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                    final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
                                                    final JoinWindows windows,
                                                    final Joined<K1, V1, V2> joined) {
-            String thisWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
-            String otherWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
-            String joinThisName = rightOuter ? builder.newProcessorName(OUTERTHIS_NAME) : builder.newProcessorName(JOINTHIS_NAME);
-            String joinOtherName = leftOuter ? builder.newProcessorName(OUTEROTHER_NAME) : builder.newProcessorName(JOINOTHER_NAME);
-            String joinMergeName = builder.newProcessorName(MERGE_NAME);
+            final String thisWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
+            final String otherWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
+            final String joinThisName = rightOuter ? builder.newProcessorName(OUTERTHIS_NAME) : builder.newProcessorName(JOINTHIS_NAME);
+            final String joinOtherName = leftOuter ? builder.newProcessorName(OUTEROTHER_NAME) : builder.newProcessorName(JOINOTHER_NAME);
+            final String joinMergeName = builder.newProcessorName(MERGE_NAME);
+
+            StreamsGraphNode thisStreamsGraphNode = ((AbstractStream) lhs).streamsGraphNode;
+            StreamsGraphNode otherStreamsGraphNode = ((AbstractStream) other).streamsGraphNode;
 
-            final StoreBuilder<WindowStore<K1, V1>> thisWindow =
+            final StoreBuilder<WindowStore<K1, V1>> thisWindowStore =
                 createWindowedStateStore(windows, joined.keySerde(), joined.valueSerde(), joinThisName + "-store");
 
-            final StoreBuilder<WindowStore<K1, V2>> otherWindow =
+            final StoreBuilder<WindowStore<K1, V2>> otherWindowStore =
                 createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(), joinOtherName + "-store");
 
-            KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(),
-                                                                                   windows.beforeMs + windows.afterMs + 1,
-                                                                                   windows.maintainMs());
-            KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(),
-                                                                                    windows.beforeMs + windows.afterMs + 1,
-                                                                                    windows.maintainMs());
-
-            final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
-                                                                                                        windows.beforeMs,
-                                                                                                        windows.afterMs,
-                                                                                                        joiner,
-                                                                                                        leftOuter);
-            final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
-                                                                                                         windows.afterMs,
-                                                                                                         windows.beforeMs,
-                                                                                                         reverseJoiner(joiner),
-                                                                                                         rightOuter);
-
-            KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
-
-            StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
-
-            ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamName);
-            ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamName);
-            ProcessorParameters joinThisProcessorParams = new ProcessorParameters(joinThis, joinThisName);
-            ProcessorParameters joinOtherProcessorParams = new ProcessorParameters(joinOther, joinOtherName);
-            ProcessorParameters joinMergeProcessorParams = new ProcessorParameters(joinMerge, joinMergeName);
+
+            final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(
+                thisWindowStore.name(),
+                windows.beforeMs + windows.afterMs + 1,
+                windows.maintainMs()
+            );
+
+            final ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamName);
+            final ProcessorGraphNode<K1, V1> thisWindowedStreamsNode = new ProcessorGraphNode<>(thisWindowStreamName, thisWindowStreamProcessorParams);
+            builder.addGraphNode(thisStreamsGraphNode, thisWindowedStreamsNode);
+
+            final KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(
+                otherWindowStore.name(),
+                windows.beforeMs + windows.afterMs + 1,
+                windows.maintainMs()
+            );
+
+            final ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamName);
+            final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);
+            builder.addGraphNode(otherStreamsGraphNode, otherWindowedStreamsNode);
+
+            final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = new KStreamKStreamJoin<>(
+                otherWindowStore.name(),
+                windows.beforeMs,
+                windows.afterMs,
+                joiner,
+                leftOuter
+            );
+
+            final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther = new KStreamKStreamJoin<>(
+                thisWindowStore.name(),
+                windows.afterMs,
+                windows.beforeMs,
+                reverseJoiner(joiner),
+                rightOuter
+            );
+
+            final KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
+
+            final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
+
+            final ProcessorParameters joinThisProcessorParams = new ProcessorParameters(joinThis, joinThisName);
+            final ProcessorParameters joinOtherProcessorParams = new ProcessorParameters(joinOther, joinOtherName);
+            final ProcessorParameters joinMergeProcessorParams = new ProcessorParameters(joinMerge, joinMergeName);
 
             joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams)
-                .withJoinThisProcessorParameters(joinThisProcessorParams)
-                .withJoinOtherProcessorParameters(joinOtherProcessorParams)
-                .withThisWindowedStreamProcessorParameters(thisWindowStreamProcessorParams)
-                .withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams)
-                .withThisWindowStoreBuilder(thisWindow)
-                .withOtherWindowStoreBuilder(otherWindow)
-                .withLeftHandSideStreamName(((AbstractStream) lhs).name)
-                .withOtherStreamName(((AbstractStream) other).name)
-                .withValueJoiner(joiner)
-                .withNodeName(joinMergeName);
-
-            addGraphNode(joinBuilder.build());
-
-            builder.internalTopologyBuilder.addProcessor(thisWindowStreamName, thisWindowedStream, ((AbstractStream) lhs).name);
-            builder.internalTopologyBuilder.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
-            builder.internalTopologyBuilder.addProcessor(joinThisName, joinThis, thisWindowStreamName);
-            builder.internalTopologyBuilder.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
-            builder.internalTopologyBuilder.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-            builder.internalTopologyBuilder.addStateStore(thisWindow, thisWindowStreamName, joinOtherName);
-            builder.internalTopologyBuilder.addStateStore(otherWindow, otherWindowStreamName, joinThisName);
-
-            Set<String> allSourceNodes = new HashSet<>(((AbstractStream<K>) lhs).sourceNodes);
+                       .withJoinThisProcessorParameters(joinThisProcessorParams)
+                       .withJoinOtherProcessorParameters(joinOtherProcessorParams)
+                       .withThisWindowStoreBuilder(thisWindowStore)
+                       .withOtherWindowStoreBuilder(otherWindowStore)
+                       .withThisWindowedStreamProcessorParameters(thisWindowStreamProcessorParams)
+                       .withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams)
+                       .withValueJoiner(joiner)
+                       .withNodeName(joinMergeName);
+
+            final StreamsGraphNode joinGraphNode = joinBuilder.build();
+
+            builder.addGraphNode(Arrays.asList(thisStreamsGraphNode, otherStreamsGraphNode), joinGraphNode);
+
+            final Set<String> allSourceNodes = new HashSet<>(((AbstractStream<K>) lhs).sourceNodes);
             allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
-            return new KStreamImpl<>(builder, joinMergeName, allSourceNodes, false, null);
+            return new KStreamImpl<>(builder, joinMergeName, allSourceNodes, false, joinGraphNode);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index b91431a..35dd7a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -35,22 +35,22 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
+class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
 
     private final String storeName;
     private final SessionWindows windows;
-    private final Initializer<T> initializer;
-    private final Aggregator<? super K, ? super V, T> aggregator;
-    private final Merger<? super K, T> sessionMerger;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+    private final Merger<? super K, Agg> sessionMerger;
 
     private boolean sendOldValues = false;
 
     KStreamSessionWindowAggregate(final SessionWindows windows,
                                   final String storeName,
-                                  final Initializer<T> initializer,
-                                  final Aggregator<? super K, ? super V, T> aggregator,
-                                  final Merger<? super K, T> sessionMerger) {
+                                  final Initializer<Agg> initializer,
+                                  final Aggregator<? super K, ? super V, Agg> aggregator,
+                                  final Merger<? super K, Agg> sessionMerger) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -70,8 +70,8 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
 
     private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor<K, V> {
 
-        private SessionStore<K, T> store;
-        private TupleForwarder<Windowed<K>, T> tupleForwarder;
+        private SessionStore<K, Agg> store;
+        private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
         private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
@@ -79,7 +79,7 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
-            store = (SessionStore<K, T>) context.getStateStore(storeName);
+            store = (SessionStore<K, Agg>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
 
@@ -97,20 +97,20 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
             }
 
             final long timestamp = context().timestamp();
-            final List<KeyValue<Windowed<K>, T>> merged = new ArrayList<>();
+            final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
             final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
             SessionWindow mergedWindow = newSessionWindow;
-            T agg = initializer.apply();
+            Agg agg = initializer.apply();
 
             try (
-                final KeyValueIterator<Windowed<K>, T> iterator = store.findSessions(
+                final KeyValueIterator<Windowed<K>, Agg> iterator = store.findSessions(
                     key,
                     timestamp - windows.inactivityGap(),
                     timestamp + windows.inactivityGap()
                 )
             ) {
                 while (iterator.hasNext()) {
-                    final KeyValue<Windowed<K>, T> next = iterator.next();
+                    final KeyValue<Windowed<K>, Agg> next = iterator.next();
                     merged.add(next);
                     agg = sessionMerger.apply(key, agg, next.value);
                     mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window());
@@ -120,7 +120,7 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
             agg = aggregator.apply(key, value, agg);
             final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
             if (!mergedWindow.equals(newSessionWindow)) {
-                for (final KeyValue<Windowed<K>, T> session : merged) {
+                for (final KeyValue<Windowed<K>, Agg> session : merged) {
                     store.remove(session.key);
                     tupleForwarder.maybeForward(session.key, null, session.value);
                 }
@@ -139,36 +139,36 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
     }
 
     @Override
-    public KTableValueGetterSupplier<Windowed<K>, T> view() {
-        return new KTableValueGetterSupplier<Windowed<K>, T>() {
+    public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
+        return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
             @Override
-            public KTableValueGetter<Windowed<K>, T> get() {
+            public KTableValueGetter<Windowed<K>, Agg> get() {
                 return new KTableSessionWindowValueGetter();
             }
 
             @Override
             public String[] storeNames() {
-                return new String[]{storeName};
+                return new String[] {storeName};
             }
         };
     }
 
-    private class KTableSessionWindowValueGetter implements KTableValueGetter<Windowed<K>, T> {
-        private SessionStore<K, T> store;
+    private class KTableSessionWindowValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
+        private SessionStore<K, Agg> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
-            store = (SessionStore<K, T>) context.getStateStore(storeName);
+            store = (SessionStore<K, Agg>) context.getStateStore(storeName);
         }
 
         @Override
-        public T get(final Windowed<K> key) {
-            try (final KeyValueIterator<Windowed<K>, T> iter = store.findSessions(key.key(), key.window().end(), key.window().end())) {
+        public Agg get(final Windowed<K> key) {
+            try (final KeyValueIterator<Windowed<K>, Agg> iter = store.findSessions(key.key(), key.window().end(), key.window().end())) {
                 if (!iter.hasNext()) {
                     return null;
                 }
-                final T value = iter.next().value;
+                final Agg value = iter.next().value;
                 if (iter.hasNext()) {
                     throw new ProcessorStateException(String.format("Iterator for key [%s] on session store has more than one value", key));
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 4be1880..40702f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -32,20 +32,20 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
+public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final String storeName;
     private final Windows<W> windows;
-    private final Initializer<T> initializer;
-    private final Aggregator<? super K, ? super V, T> aggregator;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
 
     private boolean sendOldValues = false;
 
     KStreamWindowAggregate(final Windows<W> windows,
                            final String storeName,
-                           final Initializer<T> initializer,
-                           final Aggregator<? super K, ? super V, T> aggregator) {
+                           final Initializer<Agg> initializer,
+                           final Aggregator<? super K, ? super V, Agg> aggregator) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -64,8 +64,8 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
     private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
 
-        private WindowStore<K, T> windowStore;
-        private TupleForwarder<Windowed<K>, T> tupleForwarder;
+        private WindowStore<K, Agg> windowStore;
+        private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
 
@@ -77,7 +77,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
             metrics = (StreamsMetricsImpl) context.metrics();
 
-            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+            windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
         }
 
@@ -104,13 +104,13 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
             for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
                 final Long windowStart = entry.getKey();
                 if (windowStart > expiryTime) {
-                    T oldAgg = windowStore.fetch(key, windowStart);
+                    Agg oldAgg = windowStore.fetch(key, windowStart);
 
                     if (oldAgg == null) {
                         oldAgg = initializer.apply();
                     }
 
-                    final T newAgg = aggregator.apply(key, value, oldAgg);
+                    final Agg newAgg = aggregator.apply(key, value, oldAgg);
 
                     // update the store with the new value
                     windowStore.put(key, newAgg, windowStart);
@@ -127,34 +127,34 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
     }
 
     @Override
-    public KTableValueGetterSupplier<Windowed<K>, T> view() {
+    public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
 
-        return new KTableValueGetterSupplier<Windowed<K>, T>() {
+        return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
 
-            public KTableValueGetter<Windowed<K>, T> get() {
+            public KTableValueGetter<Windowed<K>, Agg> get() {
                 return new KStreamWindowAggregateValueGetter();
             }
 
             @Override
             public String[] storeNames() {
-                return new String[]{storeName};
+                return new String[] {storeName};
             }
         };
     }
 
-    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> {
+    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
 
-        private WindowStore<K, T> windowStore;
+        private WindowStore<K, Agg> windowStore;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
-            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+            windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
         }
 
         @SuppressWarnings("unchecked")
         @Override
-        public T get(final Windowed<K> windowedKey) {
+        public Agg get(final Windowed<K> windowedKey) {
             final K key = windowedKey.key();
             final W window = (W) windowedKey.window();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 2d349a2..bbec96c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -31,15 +31,13 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.apache.kafka.streams.kstream.internals.graph.StatelessProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
 
-import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 
@@ -130,34 +128,38 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         // only materialize if the state store is queryable
         final boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
 
-        KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this,
-                predicate,
-                filterNot,
-                shouldMaterialize ? materializedInternal.storeName() : null);
+        final KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(
+            this,
+            predicate,
+            filterNot,
+            shouldMaterialize ? materializedInternal.storeName() : null
+        );
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
-        StreamsGraphNode tableNode = new TableProcessorNode<>(name,
-                                                              processorParameters,
-                                                              materializedInternal,
-                                                              null);
+        final ProcessorParameters<K, V> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
+            new ProcessorParameters<>(processorSupplier, name)
+        );
 
-        addGraphNode(tableNode);
+        final StreamsGraphNode tableNode = new TableProcessorNode<>(
+            name,
+            processorParameters,
+            materializedInternal,
+            null
+        );
 
-        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
+        builder.addGraphNode(this.streamsGraphNode, tableNode);
 
-        if (shouldMaterialize) {
-            this.builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(), name);
-        }
 
-        return new KTableImpl<>(builder,
-                                name,
-                                processorSupplier,
-                                this.keySerde,
-                                this.valSerde,
-                                sourceNodes,
-                                shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
-                                shouldMaterialize,
-                                tableNode);
+        return new KTableImpl<>(
+            builder,
+            name,
+            processorSupplier,
+            this.keySerde,
+            this.valSerde,
+            sourceNodes,
+            shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
+            shouldMaterialize,
+            tableNode
+        );
     }
 
     @Override
@@ -202,31 +204,34 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         final boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
 
         final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(
-                this,
-                mapper,
-                shouldMaterialize ? materializedInternal.storeName() : null);
+            this,
+            mapper,
+            shouldMaterialize ? materializedInternal.storeName() : null
+        );
 
         // leaving in calls to ITB until building topology with graph
-        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
-        if (shouldMaterialize) {
-            this.builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(), name);
-        }
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
-        StreamsGraphNode tableNode = new TableProcessorNode<>(name,
-                                                              processorParameters,
-                                                              materializedInternal,
-                                                              null);
+        final ProcessorParameters<K, VR> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
+            new ProcessorParameters<>(processorSupplier, name)
+        );
+        final StreamsGraphNode tableNode = new TableProcessorNode<>(
+            name,
+            processorParameters,
+            materializedInternal,
+            null
+        );
 
-        addGraphNode(tableNode);
+        builder.addGraphNode(this.streamsGraphNode, tableNode);
 
-        return new KTableImpl<>(builder,
-                                name,
-                                processorSupplier,
-                                sourceNodes,
-                                shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
-                                shouldMaterialize,
-                                tableNode);
+        return new KTableImpl<>(
+            builder,
+            name,
+            processorSupplier,
+            sourceNodes,
+            shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
+            shouldMaterialize,
+            tableNode
+        );
     }
 
     @Override
@@ -296,25 +301,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             transformerSupplier,
             shouldMaterialize ? materialized.storeName() : null);
 
-        ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
-        StreamsGraphNode tableNode = new TableProcessorNode<>(name,
-                                                              processorParameters,
-                                                              materialized,
-                                                              stateStoreNames);
-
-        addGraphNode(tableNode);
-
-        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
+        final ProcessorParameters<K, VR> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
+            new ProcessorParameters<>(processorSupplier, name)
+        );
 
-        if (stateStoreNames.length > 0) {
-            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
-        }
+        final StreamsGraphNode tableNode = new TableProcessorNode<>(
+            name,
+            processorParameters,
+            materialized,
+            stateStoreNames
+        );
 
-        if (shouldMaterialize) {
-            builder.internalTopologyBuilder.addStateStore(
-                new KeyValueStoreMaterializer<>(materialized).materialize(),
-                name);
-        }
+        builder.addGraphNode(this.streamsGraphNode, tableNode);
 
         return new KTableImpl<>(
             builder,
@@ -328,28 +326,20 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public KStream<K, V> toStream() {
-        String name = builder.newProcessorName(TOSTREAM_NAME);
+        final String name = builder.newProcessorName(TOSTREAM_NAME);
 
-        ProcessorSupplier kStreamMapValues = new KStreamMapValues<>(new ValueMapperWithKey<K, Change<V>, V>() {
-            @Override
-            public V apply(final K key, final Change<V> change) {
-                return change.newValue;
-            }
-        });
-        ProcessorParameters processorParameters = new ProcessorParameters<K, V>(kStreamMapValues, name);
-
-        final StatelessProcessorNode<K, V> toStreamNode = new StatelessProcessorNode<>(name,
-                                                                                       processorParameters,
-                                                                                       false);
+        final ProcessorSupplier<K, Change<V>> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue);
+        final ProcessorParameters<K, V> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
+            new ProcessorParameters<>(kStreamMapValues, name)
+        );
 
-        addGraphNode(toStreamNode);
+        final ProcessorGraphNode<K, V> toStreamNode = new ProcessorGraphNode<>(
+            name,
+            processorParameters,
+            false
+        );
 
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(new ValueMapperWithKey<K, Change<V>, V>() {
-            @Override
-            public V apply(final K key, final Change<V> change) {
-                return change.newValue;
-            }
-        }), this.name);
+        builder.addGraphNode(this.streamsGraphNode, toStreamNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, false, toStreamNode);
     }
@@ -420,21 +410,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         final String internalQueryableName = materializedInternal == null ? null : materializedInternal.storeName();
         final String joinMergeName = builder.newProcessorName(MERGE_NAME);
 
-        KTable<K, VR> kTable = buildJoin((AbstractStream<K>) other,
-                                               joiner,
-                                               leftOuter,
-                                               rightOuter,
-                                               joinMergeName,
-                                               internalQueryableName,
-                                               materializedInternal);
-
-        if (materializedInternal != null) {
-            final StoreBuilder<KeyValueStore<K, VR>> storeBuilder
-                = new KeyValueStoreMaterializer<>(materializedInternal).materialize();
-            builder.internalTopologyBuilder.addStateStore(storeBuilder, joinMergeName);
-        }
 
-        return kTable;
+        return buildJoin(
+            (AbstractStream<K>) other,
+            joiner,
+            leftOuter,
+            rightOuter,
+            joinMergeName,
+            internalQueryableName,
+            materializedInternal
+        );
     }
 
     @SuppressWarnings("unchecked")
@@ -473,9 +458,24 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         }
 
         final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-            new KTableImpl<K, V, R>(builder, joinThisName, joinThis, sourceNodes, this.queryableStoreName, false, null),
-            new KTableImpl<K, V1, R>(builder, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes,
-                                     ((KTableImpl<K, ?, ?>) other).queryableStoreName, false, null),
+            new KTableImpl<K, V, R>(
+                builder,
+                joinThisName,
+                joinThis,
+                sourceNodes,
+                this.queryableStoreName,
+                false,
+                null
+            ),
+            new KTableImpl<K, V1, R>(
+                builder,
+                joinOtherName,
+                joinOther,
+                ((KTableImpl<K, ?, ?>) other).sourceNodes,
+                ((KTableImpl<K, ?, ?>) other).queryableStoreName,
+                false,
+                null
+            ),
             internalQueryableName
         );
 
@@ -487,37 +487,35 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         }
         kTableJoinNodeBuilder.withNodeName(joinMergeName);
 
-        ProcessorParameters joinThisProcessorParameters = new ProcessorParameters(joinThis, joinThisName);
-        ProcessorParameters joinOtherProcessorParameters = new ProcessorParameters(joinOther, joinOtherName);
-        ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName);
+        final ProcessorParameters joinThisProcessorParameters = new ProcessorParameters(joinThis, joinThisName);
+        final ProcessorParameters joinOtherProcessorParameters = new ProcessorParameters(joinOther, joinOtherName);
+        final ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName);
 
         kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
             .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
             .withJoinThisProcessorParameters(joinThisProcessorParameters)
             .withJoinThisStoreNames(valueGetterSupplier().storeNames())
-            .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames());
-
-        KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build();
-        addGraphNode(kTableKTableJoinNode);
+            .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
+            .withOtherJoinSideNodeName(((KTableImpl) other).name)
+            .withThisJoinSideNodeName(name);
 
-        builder.internalTopologyBuilder.addProcessor(joinThisName, joinThis, this.name);
-        builder.internalTopologyBuilder.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
-        builder.internalTopologyBuilder.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        builder.internalTopologyBuilder.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
-        builder.internalTopologyBuilder.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
+        final KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build();
+        builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
 
-        return new KTableImpl<>(builder,
-                                joinMergeName,
-                                joinMerge,
-                                allSourceNodes,
-                                internalQueryableName,
-                                internalQueryableName != null,
-                                kTableKTableJoinNode);
+        return new KTableImpl<>(
+            builder,
+            joinMergeName,
+            joinMerge,
+            allSourceNodes,
+            internalQueryableName,
+            internalQueryableName != null,
+            kTableKTableJoinNode
+        );
     }
 
     @Override
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
-        return this.groupBy(selector, Serialized.<K1, V1>with(null, null));
+        return this.groupBy(selector, Serialized.with(null, null));
     }
 
     @Override
@@ -525,34 +523,36 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                                   final Serialized<K1, V1> serialized) {
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(serialized, "serialized can't be null");
-        String selectName = builder.newProcessorName(SELECT_NAME);
+        final String selectName = builder.newProcessorName(SELECT_NAME);
 
-        KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
-        ProcessorParameters processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
+        final KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
+        final ProcessorParameters processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
 
         // select the aggregate key and values (old and new), it would require parent to send old values
-        builder.internalTopologyBuilder.addProcessor(selectName, selectSupplier, this.name);
-        final StatelessProcessorNode<K1, V1> graphNode = new StatelessProcessorNode<>(selectName,
-                                                                                      processorParameters,
-                                                                                      false,
-                                                                                      Collections.<String>emptyList());
+        final ProcessorGraphNode<K1, V1> groupByMapNode = new ProcessorGraphNode<>(
+            selectName,
+            processorParameters,
+            false
+        );
 
-        addGraphNode(graphNode);
+        builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
 
         this.enableSendingOldValues();
         final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized);
-        return new KGroupedTableImpl<>(builder,
-                                       selectName,
-                                       this.name,
-                                       serializedInternal.keySerde(),
-                                       serializedInternal.valueSerde(),
-                                       graphNode);
+        return new KGroupedTableImpl<>(
+            builder,
+            selectName,
+            this.name,
+            serializedInternal.keySerde(),
+            serializedInternal.valueSerde(),
+            groupByMapNode
+        );
     }
 
     @SuppressWarnings("unchecked")
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         if (processorSupplier instanceof KTableSource) {
-            KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+            final KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
             return new KTableSourceValueGetterSupplier<>(source.storeName);
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
@@ -565,7 +565,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     void enableSendingOldValues() {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
-                KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
+                final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
                 source.enableSendingOldValues();
             } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
                 ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
@@ -580,4 +580,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return sendOldValues;
     }
 
+    /**
+     * We conflate V with Change<V> in many places. It might be nice to fix that eventually.
+     * For now, I'm just explicitly lying about the parameterized type.
+     */
+    @SuppressWarnings("unchecked")
+    private <VR> ProcessorParameters<K, VR> unsafeCastProcessorParametersToCompletelyDifferentType(final ProcessorParameters<K, Change<V>> kObjectProcessorParameters) {
+        return (ProcessorParameters<K, VR>) kObjectProcessorParameters;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 5361e48..2bc7326 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -66,11 +66,11 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
         return topicConfig;
     }
 
-    boolean cachingEnabled() {
+    public boolean cachingEnabled() {
         return cachingEnabled;
     }
 
-    boolean isQueryable() {
+    public boolean isQueryable() {
         return queriable;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index dacab56..54611f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -45,12 +45,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
     private final Serde<K> keySerde;
     private final Serde<V> valSerde;
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
-    private final Merger<K, Long> countMerger = new Merger<K, Long>() {
-        @Override
-        public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) {
-            return aggOne + aggTwo;
-        }
-    };
+    private final Merger<K, Long> countMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
 
     SessionWindowedKStreamImpl(final SessionWindows windows,
                                final InternalStreamsBuilder builder,
@@ -86,7 +81,6 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         return doCount(materialized);
     }
 
-    @SuppressWarnings("unchecked")
     private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
         final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
         materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
@@ -97,19 +91,24 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
             materializedInternal.withValueSerde(Serdes.Long());
         }
 
-        return (KTable<Windowed<K>, Long>) aggregateBuilder.build(
-            new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, countMerger),
+        return aggregateBuilder.build(
+            new KStreamSessionWindowAggregate<>(
+                windows, materializedInternal.storeName(),
+                aggregateBuilder.countInitializer,
+                aggregateBuilder.countAggregator,
+                countMerger
+            ),
             AGGREGATE_NAME,
             materialize(materializedInternal),
-            materializedInternal.isQueryable());
+            materializedInternal.isQueryable()
+        );
     }
 
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
-        return reduce(reducer, Materialized.<K, V, SessionStore<Bytes, byte[]>>with(keySerde, valSerde));
+        return reduce(reducer, Materialized.with(keySerde, valSerde));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized) {
@@ -125,21 +124,27 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
             materializedInternal.withValueSerde(valSerde);
         }
 
-        return (KTable<Windowed<K>, V>) aggregateBuilder.build(
-                new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, reduceAggregator, mergerForAggregator(reduceAggregator)),
-                REDUCE_NAME,
-                materialize(materializedInternal),
-                materializedInternal.isQueryable());
+        return aggregateBuilder.build(
+            new KStreamSessionWindowAggregate<>(
+                windows,
+                materializedInternal.storeName(),
+                aggregateBuilder.reduceInitializer,
+                reduceAggregator,
+                mergerForAggregator(reduceAggregator)
+            ),
+            REDUCE_NAME,
+            materialize(materializedInternal),
+            materializedInternal.isQueryable()
+        );
     }
 
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                 final Aggregator<? super K, ? super V, T> aggregator,
                                                 final Merger<? super K, T> sessionMerger) {
-        return aggregate(initializer, aggregator, sessionMerger, Materialized.<K, T, SessionStore<Bytes, byte[]>>with(keySerde, null));
+        return aggregate(initializer, aggregator, sessionMerger, Materialized.with(keySerde, null));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                   final Aggregator<? super K, ? super V, VR> aggregator,
@@ -155,22 +160,33 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
-        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(
-                new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator, sessionMerger),
-                AGGREGATE_NAME,
-                materialize(materializedInternal),
-                materializedInternal.isQueryable());
+        return aggregateBuilder.build(
+            new KStreamSessionWindowAggregate<>(
+                windows,
+                materializedInternal.storeName(),
+                initializer,
+                aggregator,
+                sessionMerger
+            ),
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
+            materializedInternal.isQueryable()
+        );
     }
 
     private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materialized) {
         SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            supplier = Stores.persistentSessionStore(materialized.storeName(),
-                                                     windows.maintainMs());
+            supplier = Stores.persistentSessionStore(
+                materialized.storeName(),
+                windows.maintainMs()
+            );
         }
-        final StoreBuilder<SessionStore<K, VR>> builder = Stores.sessionStoreBuilder(supplier,
-                                                                                     materialized.keySerde(),
-                                                                                     materialized.valueSerde());
+        final StoreBuilder<SessionStore<K, VR>> builder = Stores.sessionStoreBuilder(
+            supplier,
+            materialized.keySerde(),
+            materialized.valueSerde()
+        );
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
@@ -185,23 +201,10 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
     }
 
     private Merger<K, V> mergerForAggregator(final Aggregator<K, V, V> aggregator) {
-        return new Merger<K, V>() {
-            @Override
-            public V apply(final K aggKey, final V aggOne, final V aggTwo) {
-                return aggregator.apply(aggKey, aggTwo, aggOne);
-            }
-        };
+        return (aggKey, aggOne, aggTwo) -> aggregator.apply(aggKey, aggTwo, aggOne);
     }
 
     private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) {
-        return new Aggregator<K, V, V>() {
-            @Override
-            public V apply(final K aggKey, final V value, final V aggregate) {
-                if (aggregate == null) {
-                    return value;
-                }
-                return reducer.apply(aggregate, value);
-            }
-        };
+        return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 7d6d174..0daaf0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -81,7 +81,6 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         return doCount(materialized);
     }
 
-    @SuppressWarnings("unchecked")
     private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
         final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
         materializedInternal.generateStoreNameIfNeeded(builder, AGGREGATE_NAME);
@@ -93,20 +92,26 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
             materializedInternal.withValueSerde(Serdes.Long());
         }
 
-        return (KTable<Windowed<K>, Long>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+        return aggregateBuilder.build(
+            new KStreamWindowAggregate<>(
+                windows,
+                materializedInternal.storeName(),
+                aggregateBuilder.countInitializer,
+                aggregateBuilder.countAggregator
+            ),
             AGGREGATE_NAME,
             materialize(materializedInternal),
-            materializedInternal.isQueryable());
+            materializedInternal.isQueryable()
+        );
     }
 
 
     @Override
     public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                   final Aggregator<? super K, ? super V, VR> aggregator) {
-        return aggregate(initializer, aggregator, Materialized.<K, VR, WindowStore<Bytes, byte[]>>with(keySerde, null));
+        return aggregate(initializer, aggregator, Materialized.with(keySerde, null));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                   final Aggregator<? super K, ? super V, VR> aggregator,
@@ -119,18 +124,23 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
-        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
-                                                                AGGREGATE_NAME,
-                                                                materialize(materializedInternal),
-                                                                materializedInternal.isQueryable());
+        return aggregateBuilder.build(
+            new KStreamWindowAggregate<>(
+                windows,
+                materializedInternal.storeName(),
+                initializer,
+                aggregator
+            ),
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
+            materializedInternal.isQueryable());
     }
 
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
-        return reduce(reducer, Materialized.<K, V, WindowStore<Bytes, byte[]>>with(keySerde, valSerde));
+        return reduce(reducer, Materialized.with(keySerde, valSerde));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(reducer, "reducer can't be null");
@@ -146,10 +156,12 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
             materializedInternal.withValueSerde(valSerde);
         }
 
-        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer),
-                                                               REDUCE_NAME,
-                                                               materialize(materializedInternal),
-                                                               materializedInternal.isQueryable());
+        return aggregateBuilder.build(
+            new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer),
+            REDUCE_NAME,
+            materialize(materializedInternal),
+            materializedInternal.isQueryable()
+        );
     }
 
     private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
@@ -163,9 +175,11 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
                 windows.segmentInterval()
             );
         }
-        final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
-                                                                                   materialized.keySerde(),
-                                                                                   materialized.valueSerde());
+        final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(
+            supplier,
+            materialized.keySerde(),
+            materialized.valueSerde()
+        );
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 04a9ab2..8e1476a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -26,7 +26,7 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
 
     private final WindowedSerializer<K> serializer;
 
-    WindowedStreamPartitioner(final WindowedSerializer<K> serializer) {
+    public WindowedStreamPartitioner(final WindowedSerializer<K> serializer) {
         this.serializer = serializer;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
index 35805d3..fd1fcc9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
@@ -75,4 +75,16 @@ abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends StreamsGraphNode {
     String otherJoinSideNodeName() {
         return otherJoinSideNodeName;
     }
+
+    @Override
+    public String toString() {
+        return "BaseJoinProcessorNode{" +
+               "joinThisProcessorParameters=" + joinThisProcessorParameters +
+               ", joinOtherProcessorParameters=" + joinOtherProcessorParameters +
+               ", joinMergeProcessorParameters=" + joinMergeProcessorParameters +
+               ", valueJoiner=" + valueJoiner +
+               ", thisJoinSideNodeName='" + thisJoinSideNodeName + '\'' +
+               ", otherJoinSideNodeName='" + otherJoinSideNodeName + '\'' +
+               "} " + super.toString();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
index 73a7f11..e7f8e56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -53,4 +53,15 @@ public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode {
 
     abstract Deserializer<V> getValueDeserializer();
 
+    @Override
+    public String toString() {
+        return "BaseRepartitionNode{" +
+               "keySerde=" + keySerde +
+               ", valueSerde=" + valueSerde +
+               ", sinkName='" + sinkName + '\'' +
+               ", sourceName='" + sourceName + '\'' +
+               ", repartitionTopic='" + repartitionTopic + '\'' +
+               ", processorParameters=" + processorParameters +
+               "} " + super.toString();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
index 0a70a43..97fb69d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -25,61 +25,80 @@ import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
-public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionNode {
-
-
-    GroupedTableOperationRepartitionNode(final String nodeName,
-                                         final Serde<K> keySerde,
-                                         final Serde<V> valueSerde,
-                                         final String sinkName,
-                                         final String sourceName,
-                                         final String repartitionTopic,
-                                         final ProcessorParameters processorParameters) {
-
-        super(nodeName,
-              sourceName,
-              processorParameters,
-              keySerde,
-              valueSerde,
-              sinkName,
-              repartitionTopic
+public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionNode<K, V> {
+
+
+    private GroupedTableOperationRepartitionNode(final String nodeName,
+                                                 final Serde<K> keySerde,
+                                                 final Serde<V> valueSerde,
+                                                 final String sinkName,
+                                                 final String sourceName,
+                                                 final String repartitionTopic,
+                                                 final ProcessorParameters processorParameters) {
+
+        super(
+            nodeName,
+            sourceName,
+            processorParameters,
+            keySerde,
+            valueSerde,
+            sinkName,
+            repartitionTopic
         );
     }
 
     @Override
-    Serializer getValueSerializer() {
-        final Serializer<? extends V> valueSerializer = valueSerde == null ? null : valueSerde.serializer();
-        return new ChangedSerializer<>(valueSerializer);
+    Serializer<V> getValueSerializer() {
+        final Serializer<V> valueSerializer = valueSerde == null ? null : valueSerde.serializer();
+        return unsafeCastChangedToValueSerializer(valueSerializer);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Serializer<V> unsafeCastChangedToValueSerializer(final Serializer<V> valueSerializer) {
+        return (Serializer<V>) new ChangedSerializer<>(valueSerializer);
     }
 
     @Override
-    Deserializer getValueDeserializer() {
+    Deserializer<V> getValueDeserializer() {
         final Deserializer<? extends V> valueDeserializer = valueSerde == null ? null : valueSerde.deserializer();
-        return new ChangedDeserializer<>(valueDeserializer);
+        return unsafeCastChangedToValueDeserializer(valueDeserializer);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Deserializer<V> unsafeCastChangedToValueDeserializer(final Deserializer<? extends V> valueDeserializer) {
+        return (Deserializer<V>) new ChangedDeserializer<>(valueDeserializer);
     }
 
+    @Override
+    public String toString() {
+        return "GroupedTableOperationRepartitionNode{} " + super.toString();
+    }
 
     @Override
-    public void writeToTopology(InternalTopologyBuilder topologyBuilder) {
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final Serializer<K> keySerializer = keySerde != null ? keySerde.serializer() : null;
         final Deserializer<K> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
 
 
         topologyBuilder.addInternalTopic(repartitionTopic);
 
-        topologyBuilder.addSink(sinkName,
-                                repartitionTopic,
-                                keySerializer,
-                                getValueSerializer(),
-                                null,
-                                parentNode().nodeName());
+        topologyBuilder.addSink(
+            sinkName,
+            repartitionTopic,
+            keySerializer,
+            getValueSerializer(),
+            null,
+            parentNodeNames()
+        );
 
-        topologyBuilder.addSource(null,
-                                  sourceName,
-                                  new FailOnInvalidTimestamp(),
-                                  keyDeserializer,
-                                  getValueDeserializer(),
-                                  repartitionTopic);
+        topologyBuilder.addSource(
+            null,
+            sourceName,
+            new FailOnInvalidTimestamp(),
+            keyDeserializer,
+            getValueDeserializer(),
+            repartitionTopic
+        );
 
     }
 
@@ -101,49 +120,50 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
         private GroupedTableOperationRepartitionNodeBuilder() {
         }
 
-        public GroupedTableOperationRepartitionNodeBuilder<K, V> withKeySerde(Serde<K> keySerde) {
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withKeySerde(final Serde<K> keySerde) {
             this.keySerde = keySerde;
             return this;
         }
 
-        public GroupedTableOperationRepartitionNodeBuilder<K, V> withValueSerde(Serde<V> valueSerde) {
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withValueSerde(final Serde<V> valueSerde) {
             this.valueSerde = valueSerde;
             return this;
         }
 
-        public GroupedTableOperationRepartitionNodeBuilder<K, V> withSinkName(String sinkName) {
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withSinkName(final String sinkName) {
             this.sinkName = sinkName;
             return this;
         }
 
-        public GroupedTableOperationRepartitionNodeBuilder<K, V> withNodeName(String nodeName) {
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withNodeName(final String nodeName) {
             this.nodeName = nodeName;
             return this;
         }
 
-        public GroupedTableOperationRepartitionNodeBuilder<K, V> withSourceName(String sourceName) {
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withSourceName(final String sourceName) {
             this.sourceName = sourceName;
             return this;
         }
 
-        public GroupedTableOperationRepartitionNodeBuilder<K, V> withRepartitionTopic(String repartitionTopic) {
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withRepartitionTopic(final String repartitionTopic) {
             this.repartitionTopic = repartitionTopic;
             return this;
         }
 
-        public GroupedTableOperationRepartitionNodeBuilder<K, V> withProcessorParameters(ProcessorParameters processorParameters) {
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) {
             this.processorParameters = processorParameters;
             return this;
         }
 
         public GroupedTableOperationRepartitionNode<K, V> build() {
-            return new GroupedTableOperationRepartitionNode(nodeName,
-                                                            keySerde,
-                                                            valueSerde,
-                                                            sinkName,
-                                                            sourceName,
-                                                            repartitionTopic,
-                                                            processorParameters
+            return new GroupedTableOperationRepartitionNode<>(
+                nodeName,
+                keySerde,
+                valueSerde,
+                sinkName,
+                sourceName,
+                repartitionTopic,
+                processorParameters
             );
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 5e3d8d9..ddebeac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -18,12 +18,16 @@
 package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Arrays;
 
 /**
- * Too much specific information to generalize so the
- * KTable-KTable join requires a specific node.
+ * Too much specific information to generalize so the KTable-KTable join requires a specific node.
  */
 public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
 
@@ -57,7 +61,42 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
 
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
+        final String thisProcessorName = thisProcessorParameters().processorName();
+        final String otherProcessorName = otherProcessorParameters().processorName();
+        final String mergeProcessorName = mergeProcessorParameters().processorName();
+
+        topologyBuilder.addProcessor(thisProcessorName,
+                                     thisProcessorParameters().processorSupplier(),
+                                     thisJoinSideNodeName());
+
+        topologyBuilder.addProcessor(otherProcessorName,
+                                     otherProcessorParameters().processorSupplier(),
+                                     otherJoinSideNodeName());
+
+        topologyBuilder.addProcessor(mergeProcessorName,
+                                     mergeProcessorParameters().processorSupplier(),
+                                     thisProcessorName,
+                                     otherProcessorName);
+
+        topologyBuilder.connectProcessorAndStateStores(thisProcessorName,
+                                                       joinOtherStoreNames);
+        topologyBuilder.connectProcessorAndStateStores(otherProcessorName,
+                                                       joinThisStoreNames);
+
+        if (materializedInternal != null) {
+            final StoreBuilder<KeyValueStore<K, VR>> storeBuilder
+                = new KeyValueStoreMaterializer<>(materializedInternal).materialize();
+            topologyBuilder.addStateStore(storeBuilder, mergeProcessorName);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "KTableKTableJoinNode{" +
+               "joinThisStoreNames=" + Arrays.toString(joinThisStoreNames) +
+               ", joinOtherStoreNames=" + Arrays.toString(joinOtherStoreNames) +
+               ", materializedInternal=" + materializedInternal +
+               "} " + super.toString();
     }
 
     public static <K, V, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
@@ -80,12 +119,12 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
         private KTableKTableJoinNodeBuilder() {
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinThisStoreNames(final String[] joinThisStoreNames) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisStoreNames(final String[] joinThisStoreNames) {
             this.joinThisStoreNames = joinThisStoreNames;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinThisProcessorParameters(final ProcessorParameters<K, V1> joinThisProcessorParameters) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisProcessorParameters(final ProcessorParameters<K, V1> joinThisProcessorParameters) {
             this.joinThisProcessorParameters = joinThisProcessorParameters;
             return this;
         }
@@ -95,32 +134,32 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
             this.joinOtherStoreNames = joinOtherStoreNames;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinOtherProcessorParameters(final ProcessorParameters<K, V2> joinOtherProcessorParameters) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(final ProcessorParameters<K, V2> joinOtherProcessorParameters) {
             this.joinOtherProcessorParameters = joinOtherProcessorParameters;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
             this.joinMergeProcessorParameters = joinMergeProcessorParameters;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withValueJoiner(final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withValueJoiner(final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
             this.valueJoiner = valueJoiner;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withThisJoinSide(final String thisJoinSide) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withThisJoinSideNodeName(final String thisJoinSide) {
             this.thisJoinSide = thisJoinSide;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withOtherJoinSide(final String otherJoinSide) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withOtherJoinSideNodeName(final String otherJoinSide) {
             this.otherJoinSide = otherJoinSide;
             return this;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
index 8b1ce6d..7198df0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
@@ -33,13 +34,15 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode {
                                final String sinkName,
                                final String repartitionTopic) {
 
-        super(nodeName,
-              sourceName,
-              processorParameters,
-              keySerde,
-              valueSerde,
-              sinkName,
-              repartitionTopic);
+        super(
+            nodeName,
+            sourceName,
+            processorParameters,
+            keySerde,
+            valueSerde,
+            sinkName,
+            repartitionTopic
+        );
 
     }
 
@@ -53,33 +56,41 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode {
         return  valueSerde != null ? valueSerde.deserializer() : null;
     }
 
+    @Override
+    public String toString() {
+        return "OptimizableRepartitionNode{ " + super.toString() + " }";
+    }
 
     @Override
-    public void writeToTopology(InternalTopologyBuilder topologyBuilder) {
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final Serializer<K> keySerializer = keySerde != null ? keySerde.serializer() : null;
         final Deserializer<K> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
 
-
-
         topologyBuilder.addInternalTopic(repartitionTopic);
 
-        topologyBuilder.addProcessor(processorParameters.processorName(),
-                                     processorParameters.processorSupplier(),
-                                     parentNode().nodeName());
-
-        topologyBuilder.addSink(sinkName,
-                                repartitionTopic,
-                                keySerializer,
-                                getValueSerializer(),
-                                null,
-                                processorParameters.processorName());
-
-        topologyBuilder.addSource(null,
-                                  sourceName,
-                                  new FailOnInvalidTimestamp(),
-                                  keyDeserializer,
-                                  getValueDeserializer(),
-                                  repartitionTopic);
+        topologyBuilder.addProcessor(
+            processorParameters.processorName(),
+            processorParameters.processorSupplier(),
+            parentNodeNames()
+        );
+
+        topologyBuilder.addSink(
+            sinkName,
+            repartitionTopic,
+            keySerializer,
+            getValueSerializer(),
+            null,
+            processorParameters.processorName()
+        );
+
+        topologyBuilder.addSource(
+            null,
+            sourceName,
+            new FailOnInvalidTimestamp(),
+            keyDeserializer,
+            getValueDeserializer(),
+            repartitionTopic
+        );
 
     }
 
@@ -139,13 +150,14 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode {
 
         public OptimizableRepartitionNode<K, V> build() {
 
-            return new OptimizableRepartitionNode<>(nodeName,
-                                                    sourceName,
-                                                    processorParameters,
-                                                    keySerde,
-                                                    valueSerde,
-                                                    sinkName,
-                                                    repartitionTopic
+            return new OptimizableRepartitionNode<>(
+                nodeName,
+                sourceName,
+                processorParameters,
+                keySerde,
+                valueSerde,
+                sinkName,
+                repartitionTopic
             );
 
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatelessProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
similarity index 51%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatelessProcessorNode.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
index b985f92..658f55e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatelessProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
@@ -19,57 +19,47 @@ package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Used to represent any type of stateless operation:
  *
  * map, mapValues, flatMap, flatMapValues, filter, filterNot, branch
- *
  */
-public class StatelessProcessorNode<K, V> extends StreamsGraphNode {
+public class ProcessorGraphNode<K, V> extends StreamsGraphNode {
 
     private final ProcessorParameters<K, V> processorParameters;
 
-    // some processors need to register multiple parent names with
-    // the InternalTopologyBuilder KStream#merge for example.
-    // There is only one parent graph node but the name of each KStream merged needs
-    // to get registered with InternalStreamsBuilder
-
-    private List<String> multipleParentNames = new ArrayList<>();
-
-
-    public StatelessProcessorNode(final String nodeName,
-                           final ProcessorParameters processorParameters,
-                           final boolean repartitionRequired) {
+    public ProcessorGraphNode(final String nodeName,
+                              final ProcessorParameters<K, V> processorParameters,
+                              final boolean repartitionRequired) {
 
-        super(nodeName,
-              repartitionRequired);
+        super(nodeName, repartitionRequired);
 
         this.processorParameters = processorParameters;
     }
 
-    public StatelessProcessorNode(final String nodeName,
-                           final ProcessorParameters processorParameters,
-                           final boolean repartitionRequired,
-                           final List<String> multipleParentNames) {
-
-        this(nodeName, processorParameters, repartitionRequired);
-
-        this.multipleParentNames = multipleParentNames;
+    public ProcessorGraphNode(final String nodeName,
+                              final ProcessorParameters<K, V> processorParameters) {
+        this(
+            nodeName,
+            processorParameters,
+            false
+        );
     }
 
-    ProcessorParameters<K, V> processorSupplier() {
+    public ProcessorParameters processorParameters() {
         return processorParameters;
     }
 
-    List<String> multipleParentNames() {
-        return new ArrayList<>(multipleParentNames);
+    @Override
+    public String toString() {
+        return "ProcessorNode{" +
+               "processorParameters=" + processorParameters +
+               "} " + super.toString();
     }
 
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
+
+        topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), parentNodeNames());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index bca7571..eb3d9f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -31,7 +31,9 @@ public class ProcessorParameters<K, V> {
     private final ProcessorSupplier<K, V> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier, final String processorName) {
+    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+                               final String processorName) {
+
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
@@ -43,4 +45,12 @@ public class ProcessorParameters<K, V> {
     public String processorName() {
         return processorName;
     }
+
+    @Override
+    public String toString() {
+        return "ProcessorParameters{" +
+               "processor class=" + processorSupplier.get().getClass() +
+               ", processor name='" + processorName + '\'' +
+               '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index 0204d51..c2b445e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -17,43 +17,56 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Arrays;
 
-public class StatefulProcessorNode<K, V> extends StatelessProcessorNode<K, V> {
+public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
 
     private final String[] storeNames;
-    private final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
-    private final String maybeRepartitionedSourceName;
+    private final StoreBuilder<? extends StateStore> storeBuilder;
 
 
     public StatefulProcessorNode(final String nodeName,
                                  final ProcessorParameters processorParameters,
                                  final String[] storeNames,
-                                 final String maybeRepartitionedSourceName,
-                                 final StoreBuilder<KeyValueStore<K, V>> materializedKTableStoreBuilder,
+                                 final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
                                  final boolean repartitionRequired) {
         super(nodeName,
-              processorParameters,
-              repartitionRequired);
+            processorParameters,
+            repartitionRequired);
 
         this.storeNames = storeNames;
         this.storeBuilder = materializedKTableStoreBuilder;
-        this.maybeRepartitionedSourceName = maybeRepartitionedSourceName;
     }
 
-
-    String[] storeNames() {
-        return Arrays.copyOf(storeNames, storeNames.length);
+    @Override
+    public String toString() {
+        return "StatefulProcessorNode{" +
+               "storeNames=" + Arrays.toString(storeNames) +
+               ", storeBuilder=" + storeBuilder +
+               "} " + super.toString();
     }
 
-
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
+
+        final String processorName = processorParameters().processorName();
+        final ProcessorSupplier processorSupplier = processorParameters().processorSupplier();
+
+        topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames());
+
+        if (storeNames != null && storeNames.length > 0) {
+            topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
+        }
+
+        if (storeBuilder != null) {
+            topologyBuilder.addStateStore(storeBuilder, processorName);
+        }
     }
 
     public static <K, V> StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder() {
@@ -65,9 +78,8 @@ public class StatefulProcessorNode<K, V> extends StatelessProcessorNode<K, V> {
         private ProcessorParameters processorSupplier;
         private String nodeName;
         private boolean repartitionRequired;
-        private String maybeRepartitionedSourceName;
         private String[] storeNames;
-        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+        private StoreBuilder<? extends StateStore> storeBuilder;
 
         private StatefulProcessorNodeBuilder() {
         }
@@ -92,23 +104,19 @@ public class StatefulProcessorNode<K, V> extends StatelessProcessorNode<K, V> {
             return this;
         }
 
-        public StatefulProcessorNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+        public StatefulProcessorNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<? extends StateStore> storeBuilder) {
             this.storeBuilder = storeBuilder;
             return this;
         }
 
-        public StatefulProcessorNodeBuilder<K, V> withMaybeRepartitionedSourceName(String maybeRepartitionedSourceName) {
-            this.maybeRepartitionedSourceName = maybeRepartitionedSourceName;
-            return this;
-        }
-
         public StatefulProcessorNode<K, V> build() {
-            return new StatefulProcessorNode<>(nodeName,
-                                               processorSupplier,
-                                               storeNames,
-                                               maybeRepartitionedSourceName,
-                                               storeBuilder,
-                                               repartitionRequired);
+            return new StatefulProcessorNode<>(
+                nodeName,
+                processorSupplier,
+                storeNames,
+                storeBuilder,
+                repartitionRequired
+            );
 
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
index 36dbefe..0f8ac2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
@@ -17,13 +17,13 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.ProducedInternal;
+import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
+import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 
 public class StreamSinkNode<K, V> extends StreamsGraphNode {
 
@@ -31,8 +31,8 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode {
     private final ProducedInternal<K, V> producedInternal;
 
     public StreamSinkNode(final String nodeName,
-                   final TopicNameExtractor<K, V> topicNameExtractor,
-                   final ProducedInternal<K, V> producedInternal) {
+                          final TopicNameExtractor<K, V> topicNameExtractor,
+                          final ProducedInternal<K, V> producedInternal) {
 
         super(nodeName,
               false);
@@ -41,37 +41,28 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode {
         this.producedInternal = producedInternal;
     }
 
-    String topic() {
-        return topicNameExtractor instanceof StaticTopicNameExtractor ? ((StaticTopicNameExtractor) topicNameExtractor).topicName : null;
-    }
-
-    TopicNameExtractor<K, V> topicNameExtractor() {
-        return topicNameExtractor;
-    }
-
-    Serde<K> keySerde() {
-        return producedInternal.keySerde();
-    }
-
-    Serializer<K> keySerializer() {
-        return producedInternal.keySerde() != null ? producedInternal.keySerde().serializer() : null;
-    }
-
-    Serde<V> valueSerde() {
-        return producedInternal.valueSerde();
-    }
 
-    Serializer<V> valueSerializer() {
-        return producedInternal.valueSerde() != null ? producedInternal.valueSerde().serializer() : null;
-    }
-
-    StreamPartitioner<? super K, ? super V> streamPartitioner() {
-        return producedInternal.streamPartitioner();
+    @Override
+    public String toString() {
+        return "StreamSinkNode{" +
+               "topicNameExtractor=" + topicNameExtractor +
+               ", producedInternal=" + producedInternal +
+               "} " + super.toString();
     }
 
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
+        final Serializer<K> keySerializer = producedInternal.keySerde() == null ? null : producedInternal.keySerde().serializer();
+        final Serializer<V> valSerializer = producedInternal.valueSerde() == null ? null : producedInternal.valueSerde().serializer();
+        final StreamPartitioner<? super K, ? super V> partitioner = producedInternal.streamPartitioner();
+        final String[] parentNames = parentNodeNames();
+
+        if (partitioner == null && keySerializer instanceof WindowedSerializer) {
+            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer);
+            topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, windowedPartitioner, parentNames);
+        } else {
+            topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, partitioner,  parentNames);
+        }
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index 6257ee8..29b6ecf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -17,9 +17,11 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.regex.Pattern;
 
@@ -31,8 +33,8 @@ public class StreamSourceNode<K, V> extends StreamsGraphNode {
 
 
     public StreamSourceNode(final String nodeName,
-                     final Collection<String> topicNames,
-                     final ConsumedInternal<K, V> consumedInternal) {
+                            final Collection<String> topicNames,
+                            final ConsumedInternal<K, V> consumedInternal) {
         super(nodeName,
               false);
 
@@ -52,20 +54,53 @@ public class StreamSourceNode<K, V> extends StreamsGraphNode {
     }
 
     public Collection<String> getTopicNames() {
-        return topicNames;
+        return new ArrayList<>(topicNames);
     }
 
-    public Pattern getTopicPattern() {
+    public Pattern topicPattern() {
         return topicPattern;
     }
 
-    public ConsumedInternal<K, V> getConsumedInternal() {
+    public ConsumedInternal<K, V> consumedInternal() {
         return consumedInternal;
     }
 
+    public Serde<K> keySerde() {
+        return consumedInternal.keySerde();
+    }
+
+    public Serde<V> valueSerde() {
+        return consumedInternal.valueSerde();
+    }
+
+    @Override
+    public String toString() {
+        return "StreamSourceNode{" +
+               "topicNames=" + topicNames +
+               ", topicPattern=" + topicPattern +
+               ", consumedInternal=" + consumedInternal +
+               "} " + super.toString();
+    }
+
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
+
+        if (topicPattern != null) {
+            topologyBuilder.addSource(consumedInternal.offsetResetPolicy(),
+                                      nodeName(),
+                                      consumedInternal.timestampExtractor(),
+                                      consumedInternal.keyDeserializer(),
+                                      consumedInternal.valueDeserializer(),
+                                      topicPattern);
+        } else {
+            topologyBuilder.addSource(consumedInternal.offsetResetPolicy(),
+                                      nodeName(),
+                                      consumedInternal.timestampExtractor(),
+                                      consumedInternal.keyDeserializer(),
+                                      consumedInternal.valueDeserializer(),
+                                      topicNames.toArray(new String[topicNames.size()]));
+
+        }
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index ace2164..46cf3e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -24,8 +24,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.WindowStore;
 
 /**
- * Too much information to generalize, so Stream-Stream joins are
- * represented by a specific node.
+ * Too much information to generalize, so Stream-Stream joins are represented by a specific node.
  */
 public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
 
@@ -45,48 +44,49 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
                          final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters,
                          final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder,
                          final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder,
-                         final Joined<K, V1, V2> joined,
-                         final String leftHandSideStreamName,
-                         final String otherStreamName) {
+                         final Joined<K, V1, V2> joined) {
 
         super(nodeName,
               valueJoiner,
               joinThisProcessorParameters,
               joinOtherProcessParameters,
               joinMergeProcessorParameters,
-              leftHandSideStreamName,
-              otherStreamName);
+              null,
+              null);
 
-        this.thisWindowedStreamProcessorParameters = thisWindowedStreamProcessorParameters;
-        this.otherWindowedStreamProcessorParameters = otherWindowedStreamProcessorParameters;
         this.thisWindowStoreBuilder = thisWindowStoreBuilder;
         this.otherWindowStoreBuilder = otherWindowStoreBuilder;
         this.joined = joined;
-    }
-
-    ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters() {
-        return thisWindowedStreamProcessorParameters;
-    }
-
-    ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters() {
-        return otherWindowedStreamProcessorParameters;
-    }
+        this.thisWindowedStreamProcessorParameters = thisWindowedStreamProcessorParameters;
+        this.otherWindowedStreamProcessorParameters =  otherWindowedStreamProcessorParameters;
 
-    StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder() {
-        return thisWindowStoreBuilder;
     }
 
-    StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder() {
-        return otherWindowStoreBuilder;
-    }
 
-    Joined<K, V1, V2> joined() {
-        return joined;
+    @Override
+    public String toString() {
+        return "StreamStreamJoinNode{" +
+               "thisWindowedStreamProcessorParameters=" + thisWindowedStreamProcessorParameters +
+               ", otherWindowedStreamProcessorParameters=" + otherWindowedStreamProcessorParameters +
+               ", thisWindowStoreBuilder=" + thisWindowStoreBuilder +
+               ", otherWindowStoreBuilder=" + otherWindowStoreBuilder +
+               ", joined=" + joined +
+               "} " + super.toString();
     }
 
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
+
+        final String thisProcessorName = thisProcessorParameters().processorName();
+        final String otherProcessorName = otherProcessorParameters().processorName();
+        final String thisWindowedStreamProcessorName = thisWindowedStreamProcessorParameters.processorName();
+        final String otherWindowedStreamProcessorName = otherWindowedStreamProcessorParameters.processorName();
+
+        topologyBuilder.addProcessor(thisProcessorName, thisProcessorParameters().processorSupplier(), thisWindowedStreamProcessorName);
+        topologyBuilder.addProcessor(otherProcessorName, otherProcessorParameters().processorSupplier(), otherWindowedStreamProcessorName);
+        topologyBuilder.addProcessor(mergeProcessorParameters().processorName(), mergeProcessorParameters().processorSupplier(), thisProcessorName, otherProcessorName);
+        topologyBuilder.addStateStore(thisWindowStoreBuilder, thisWindowedStreamProcessorName, otherProcessorName);
+        topologyBuilder.addStateStore(otherWindowStoreBuilder, otherWindowedStreamProcessorName, thisProcessorName);
     }
 
     public static <K, V, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
@@ -105,8 +105,6 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
         private StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder;
         private StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder;
         private Joined<K, V1, V2> joined;
-        private String leftHandSideStreamName;
-        private String otherStreamName;
 
 
         private StreamStreamJoinNodeBuilder() {
@@ -123,11 +121,6 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
             return this;
         }
 
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowedStreamProcessorParameters(final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters) {
-            this.thisWindowedStreamProcessorParameters = thisWindowedStreamProcessorParameters;
-            return this;
-        }
-
         public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withNodeName(final String nodeName) {
             this.nodeName = nodeName;
             return this;
@@ -138,23 +131,19 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
             return this;
         }
 
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowedStreamProcessorParameters(final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters) {
-            this.otherWindowedStreamProcessorParameters = otherWindowedStreamProcessorParameters;
-            return this;
-        }
-
         public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
             this.joinMergeProcessorParameters = joinMergeProcessorParameters;
             return this;
         }
 
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withLeftHandSideStreamName(final String leftHandSideStreamName) {
-            this.leftHandSideStreamName = leftHandSideStreamName;
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowedStreamProcessorParameters(final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters) {
+            this.thisWindowedStreamProcessorParameters = thisWindowedStreamProcessorParameters;
             return this;
         }
 
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherStreamName(final String otherStreamName) {
-            this.otherStreamName = otherStreamName;
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowedStreamProcessorParameters(
+            final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters) {
+            this.otherWindowedStreamProcessorParameters = otherWindowedStreamProcessorParameters;
             return this;
         }
 
@@ -184,9 +173,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
                                               otherWindowedStreamProcessorParameters,
                                               thisWindowStoreBuilder,
                                               otherWindowStoreBuilder,
-                                              joined,
-                                              leftHandSideStreamName,
-                                              otherStreamName);
+                                              joined);
 
 
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
index 64ae441..1389156 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
 import java.util.Arrays;
@@ -29,37 +30,42 @@ public class StreamTableJoinNode<K, V> extends StreamsGraphNode {
 
     private final String[] storeNames;
     private final ProcessorParameters<K, V> processorParameters;
-    private boolean isGlobalKTableJoin;
+    private final String otherJoinSideNodeName;
 
     public StreamTableJoinNode(final String nodeName,
-                        final ProcessorParameters<K, V> processorParameters,
-                        final String[] storeNames) {
+                               final ProcessorParameters<K, V> processorParameters,
+                               final String[] storeNames,
+                               final String otherJoinSideNodeName) {
         super(nodeName,
               false);
 
         // in the case of Stream-Table join the state stores associated with the KTable
         this.storeNames = storeNames;
         this.processorParameters = processorParameters;
+        this.otherJoinSideNodeName = otherJoinSideNodeName;
     }
 
-    String[] storeNames() {
-        return Arrays.copyOf(storeNames, storeNames.length);
+    @Override
+    public String toString() {
+        return "StreamTableJoinNode{" +
+               "storeNames=" + Arrays.toString(storeNames) +
+               ", processorParameters=" + processorParameters +
+               ", otherJoinSideNodeName='" + otherJoinSideNodeName + '\'' +
+               "} " + super.toString();
     }
 
-    ProcessorParameters<K, V> processorParameters() {
-        return processorParameters;
-    }
+    @Override
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        final String processorName = processorParameters.processorName();
+        final ProcessorSupplier processorSupplier = processorParameters.processorSupplier();
 
-    public void setGlobalKTableJoin(boolean globalKTableJoin) {
-        isGlobalKTableJoin = globalKTableJoin;
-    }
+        // Stream - Table join (Global or KTable)
+        topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames());
 
-    boolean isGlobalKTableJoin() {
-        return isGlobalKTableJoin;
-    }
+        // Steam - KTable join only
+        if (otherJoinSideNodeName != null) {
+            topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
+        }
 
-    @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
index 23b6c34..4604807 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
@@ -17,34 +17,53 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 
 public abstract class StreamsGraphNode {
 
-    private StreamsGraphNode parentNode;
     private final Collection<StreamsGraphNode> childNodes = new LinkedHashSet<>();
+    private final Collection<StreamsGraphNode> parentNodes = new LinkedHashSet<>();
     private final String nodeName;
     private boolean repartitionRequired;
     private boolean keyChangingOperation;
     private Integer id;
-    private InternalStreamsBuilder internalStreamsBuilder;
+    private boolean hasWrittenToTopology = false;
 
     public StreamsGraphNode(final String nodeName,
-                     final boolean repartitionRequired) {
+                            final boolean repartitionRequired) {
         this.nodeName = nodeName;
         this.repartitionRequired = repartitionRequired;
     }
 
-    public StreamsGraphNode parentNode() {
-        return parentNode;
+    public Collection<StreamsGraphNode> parentNodes() {
+        return parentNodes;
+    }
+
+    public String[] parentNodeNames() {
+        final String[] parentNames = new String[parentNodes.size()];
+        int index = 0;
+        for (final StreamsGraphNode parentNode : parentNodes) {
+            parentNames[index++] = parentNode.nodeName();
+        }
+        return parentNames;
     }
 
-    public void setParentNode(final StreamsGraphNode parentNode) {
-        this.parentNode = parentNode;
+    public boolean allParentsWrittenToTopology() {
+        for (final StreamsGraphNode parentNode : parentNodes) {
+            if (!parentNode.hasWrittenToTopology()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void addParentNode(final StreamsGraphNode parentNode) {
+        parentNodes.add(parentNode);
     }
 
     public Collection<StreamsGraphNode> children() {
@@ -53,7 +72,7 @@ public abstract class StreamsGraphNode {
 
     public void addChildNode(final StreamsGraphNode childNode) {
         this.childNodes.add(childNode);
-        childNode.setParentNode(this);
+        childNode.addParentNode(this);
     }
 
     public String nodeName() {
@@ -64,10 +83,6 @@ public abstract class StreamsGraphNode {
         return repartitionRequired;
     }
 
-    public void setRepartitionRequired(boolean repartitionRequired) {
-        this.repartitionRequired = repartitionRequired;
-    }
-
     public boolean isKeyChangingOperation() {
         return keyChangingOperation;
     }
@@ -84,14 +99,22 @@ public abstract class StreamsGraphNode {
         return this.id;
     }
 
-    public void setInternalStreamsBuilder(final InternalStreamsBuilder internalStreamsBuilder) {
-        this.internalStreamsBuilder = internalStreamsBuilder;
-    }
+    public abstract void writeToTopology(final InternalTopologyBuilder topologyBuilder);
 
-    public InternalStreamsBuilder internalStreamsBuilder() {
-        return internalStreamsBuilder;
+    public boolean hasWrittenToTopology() {
+        return hasWrittenToTopology;
     }
 
-    public abstract void writeToTopology(final InternalTopologyBuilder topologyBuilder);
+    public void setHasWrittenToTopology(boolean hasWrittenToTopology) {
+        this.hasWrittenToTopology = hasWrittenToTopology;
+    }
 
+    @Override
+    public String toString() {
+        String[] parentNames = parentNodeNames();
+        return "StreamsGraphNode{" +
+               "nodeName='" + nodeName + '\'' +
+               ", id=" + id +
+               " parentNodes=" + Arrays.toString(parentNames) + '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index 734b505..1428123 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -17,9 +17,14 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Arrays;
 
 public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraphNode {
 
@@ -39,9 +44,28 @@ public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraph
         this.storeNames = storeNames != null ? storeNames : new String[]{};
     }
 
+    @Override
+    public String toString() {
+        return "TableProcessorNode{" +
+               "materializedInternal=" + materializedInternal +
+               ", processorParameters=" + processorParameters +
+               ", storeNames=" + Arrays.toString(storeNames) +
+               "} " + super.toString();
+    }
 
     @Override
-    public void writeToTopology(InternalTopologyBuilder topologyBuilder) {
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        final boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
+        final String processorName = processorParameters.processorName();
+
+        topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());
+
+        if (storeNames.length > 0) {
+            topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
+        }
 
+        if (shouldMaterialize) {
+            topologyBuilder.addStateStore(new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize(), processorName);
+        }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index d194a95..41daf9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -28,9 +29,9 @@ import java.util.Collections;
  * Used to represent either a KTable source or a GlobalKTable source. A boolean flag is used to indicate if this represents a GlobalKTable a {@link
  * org.apache.kafka.streams.kstream.GlobalKTable}
  */
-public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
+public class TableSourceNode<K, V, S extends StateStore> extends StreamSourceNode<K, V> {
 
-    private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+    private StoreBuilder<S> storeBuilder;
     private final ProcessorParameters<K, V> processorParameters;
     private final String sourceName;
     private final boolean isGlobalKTable;
@@ -39,7 +40,7 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
                     final String sourceName,
                     final String topic,
                     final ConsumedInternal<K, V> consumedInternal,
-                    final StoreBuilder<KeyValueStore<K, V>> storeBuilder,
+                    final StoreBuilder<S> storeBuilder,
                     final ProcessorParameters<K, V> processorParameters,
                     final boolean isGlobalKTable) {
 
@@ -53,80 +54,102 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
         this.storeBuilder = storeBuilder;
     }
 
-    StoreBuilder<KeyValueStore<K, V>> storeBuilder() {
-        return storeBuilder;
-    }
-
-    ProcessorParameters<K, V> processorParameters() {
-        return processorParameters;
-    }
-
-    String sourceName() {
-        return sourceName;
+    public boolean isGlobalKTable() {
+        return isGlobalKTable;
     }
 
-    boolean isGlobalKTable() {
-        return isGlobalKTable;
+    @Override
+    public String toString() {
+        return "TableSourceNode{" +
+               "storeBuilder=" + storeBuilder +
+               ", processorParameters=" + processorParameters +
+               ", sourceName='" + sourceName + '\'' +
+               ", isGlobalKTable=" + isGlobalKTable +
+               "} " + super.toString();
     }
 
-    public static <K, V> TableSourceNodeBuilder<K, V> tableSourceNodeBuilder() {
+    public static <K, V, S extends StateStore> TableSourceNodeBuilder<K, V, S> tableSourceNodeBuilder() {
         return new TableSourceNodeBuilder<>();
     }
 
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
+        final String topicName = getTopicNames().iterator().next();
+
+        if (isGlobalKTable) {
+            topologyBuilder.addGlobalStore((StoreBuilder<KeyValueStore>) storeBuilder,
+                                           sourceName,
+                                           consumedInternal().timestampExtractor(),
+                                           consumedInternal().keyDeserializer(),
+                                           consumedInternal().valueDeserializer(),
+                                           topicName,
+                                           processorParameters.processorName(),
+                                           processorParameters.processorSupplier());
+        } else {
+            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
+                                      sourceName,
+                                      consumedInternal().timestampExtractor(),
+                                      consumedInternal().keyDeserializer(),
+                                      consumedInternal().valueDeserializer(),
+                                      topicName);
+
+            topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);
+
+            topologyBuilder.addStateStore(storeBuilder, nodeName());
+            topologyBuilder.markSourceStoreAndTopic(storeBuilder, topicName);
+        }
+
     }
 
-    public static final class TableSourceNodeBuilder<K, V> {
+    public static final class TableSourceNodeBuilder<K, V, S extends StateStore> {
 
         private String nodeName;
         private String sourceName;
         private String topic;
         private ConsumedInternal<K, V> consumedInternal;
-        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+        private StoreBuilder<S> storeBuilder;
         private ProcessorParameters<K, V> processorParameters;
         private boolean isGlobalKTable = false;
 
         private TableSourceNodeBuilder() {
         }
 
-        public TableSourceNodeBuilder<K, V> withSourceName(final String sourceName) {
+        public TableSourceNodeBuilder<K, V, S> withSourceName(final String sourceName) {
             this.sourceName = sourceName;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V> withTopic(final String topic) {
+        public TableSourceNodeBuilder<K, V, S> withTopic(final String topic) {
             this.topic = topic;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+        public TableSourceNodeBuilder<K, V, S> withStoreBuilder(final StoreBuilder<S> storeBuilder) {
             this.storeBuilder = storeBuilder;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) {
+        public TableSourceNodeBuilder<K, V, S> withConsumedInternal(final ConsumedInternal consumedInternal) {
             this.consumedInternal = consumedInternal;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
+        public TableSourceNodeBuilder<K, V, S> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
             this.processorParameters = processorParameters;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V> withNodeName(final String nodeName) {
+        public TableSourceNodeBuilder<K, V, S> withNodeName(final String nodeName) {
             this.nodeName = nodeName;
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V> isGlobalKTable(final boolean isGlobaKTable) {
+        public TableSourceNodeBuilder<K, V, S> isGlobalKTable(final boolean isGlobaKTable) {
             this.isGlobalKTable = isGlobaKTable;
             return this;
         }
 
-        public TableSourceNode<K, V> build() {
+        public TableSourceNode<K, V, S> build() {
             return new TableSourceNode<>(nodeName,
                                          sourceName,
                                          topic,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index c644f9b..0ecae5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -475,7 +475,7 @@ public class InternalTopologyBuilder {
                 throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
             }
             if (!nodeFactories.containsKey(predecessor)) {
-                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
+                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet for " + name);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
index b12e94e..ee645b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
@@ -28,4 +28,4 @@ import org.apache.kafka.common.utils.Bytes;
  */
 public interface KeyValueBytesStoreSupplier extends StoreSupplier<KeyValueStore<Bytes, byte[]>> {
 
-}
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 3b8c9bd..56e6a6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -59,6 +59,7 @@ public class StreamsBuilderTest {
     public void shouldAllowJoinUnmaterializedFilteredKTable() {
         final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.<Bytes, String>allGoodPredicate());
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
+        builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
@@ -72,6 +73,7 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic")
                 .filter(MockPredicate.<Bytes, String>allGoodPredicate(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
+        builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
@@ -84,6 +86,7 @@ public class StreamsBuilderTest {
     public void shouldAllowJoinUnmaterializedMapValuedKTable() {
         final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic").mapValues(MockMapper.<String>noOpValueMapper());
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
+        builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
@@ -97,6 +100,7 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic")
                 .mapValues(MockMapper.<String>noOpValueMapper(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
+        builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
@@ -110,6 +114,7 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> table1 = builder.table("table-topic1");
         final KTable<Bytes, String> table2 = builder.table("table-topic2");
         builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
+        builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
@@ -122,7 +127,8 @@ public class StreamsBuilderTest {
     public void shouldAllowJoinMaterializedJoinedKTable() {
         final KTable<Bytes, String> table1 = builder.table("table-topic1");
         final KTable<Bytes, String> table2 = builder.table("table-topic2");
-        builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store")), MockValueJoiner.TOSTRING_JOINER);
+        builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER);
+        builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
@@ -135,6 +141,7 @@ public class StreamsBuilderTest {
     public void shouldAllowJoinMaterializedSourceKTable() {
         final KTable<Bytes, String> table = builder.table("table-topic");
         builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
+        builder.build();
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build();
 
@@ -312,10 +319,12 @@ public class StreamsBuilderTest {
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() {
         builder.stream(Collections.<String>emptyList());
+        builder.build();
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowExceptionWhenTopicNamesAreNull() {
         builder.stream(Arrays.<String>asList(null, null));
+        builder.build();
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index a24931e..44b42cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.integration;
 
 
-import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -26,7 +25,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -35,6 +33,7 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.test.IntegrationTest;
@@ -57,6 +56,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import kafka.utils.MockTime;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -252,6 +253,7 @@ public class FineGrainedAutoResetIntegrationTest {
 
         try {
             builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.LATEST));
+            builder.build();
             fail("Should have thrown TopologyException");
         } catch (final TopologyException expected) {
             // do nothing
@@ -265,6 +267,7 @@ public class FineGrainedAutoResetIntegrationTest {
         builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
         try {
             builder.stream(Arrays.asList(TOPIC_A_1, TOPIC_Z_1), Consumed.with(Topology.AutoOffsetReset.LATEST));
+            builder.build();
             fail("Should have thrown TopologyException");
         } catch (final TopologyException expected) {
             // do nothing
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index f30d9d8..b5aebf0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -19,13 +19,15 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -102,8 +104,11 @@ public class AbstractStreamTest {
 
         KStream<K, V> randomFilter() {
             String name = builder.newProcessorName("RANDOM-FILTER-");
-            builder.internalTopologyBuilder.addProcessor(name, new ExtendedKStreamDummy(), this.name);
-            return new KStreamImpl<>(builder, name, sourceNodes, false, null);
+            ProcessorGraphNode processorNode = new ProcessorGraphNode(name,
+                                                                      new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
+                                                                      false);
+            builder.addGraphNode(this.streamsGraphNode, processorNode);
+            return new KStreamImpl<>(builder, name, sourceNodes, false, processorNode);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index ef3fcd6..269c2f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -119,6 +119,7 @@ public class InternalStreamsBuilderTest {
 
         final KStream<String, String> merged = processedSource1.merge(processedSource2).merge(source3);
         merged.groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-table"));
+        builder.buildAndOptimizeTopology();
         final Map<String, List<String>> actual = builder.internalTopologyBuilder.stateStoreNameToSourceTopics();
         assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
     }
@@ -130,6 +131,7 @@ public class InternalStreamsBuilderTest {
         materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
         final KTable table1 = builder.table("topic2", consumed, materializedInternal);
 
+        builder.buildAndOptimizeTopology();
         final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
 
         assertEquals(1, topology.stateStores().size());
@@ -171,6 +173,7 @@ public class InternalStreamsBuilderTest {
                             consumed,
             materializedInternal);
 
+        builder.buildAndOptimizeTopology();
         final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
 
@@ -202,6 +205,8 @@ public class InternalStreamsBuilderTest {
             materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
             builder.globalTable("table2", consumed, materializedInternal);
         }
+
+        builder.buildAndOptimizeTopology();
         doBuildGlobalTopologyWithAllGlobalTables();
     }
 
@@ -255,10 +260,11 @@ public class InternalStreamsBuilderTest {
             new MaterializedInternal<>(Materialized.as("table-store"));
         materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
         final KTable<String, String> table = builder.table("table-topic", consumed, materializedInternal);
-        assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
+
 
         final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
         mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count"));
+        builder.buildAndOptimizeTopology();
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("count"));
     }
@@ -268,6 +274,7 @@ public class InternalStreamsBuilderTest {
         final String topicName = "topic-1";
         final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST));
         builder.stream(Collections.singleton(topicName), consumed);
+        builder.buildAndOptimizeTopology();
 
         assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
@@ -279,7 +286,7 @@ public class InternalStreamsBuilderTest {
 
         final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST));
         builder.stream(Collections.singleton(topicName), consumed);
-
+        builder.buildAndOptimizeTopology();
         assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
     }
@@ -288,7 +295,7 @@ public class InternalStreamsBuilderTest {
     public void shouldAddTableToEarliestAutoOffsetResetList() {
         final String topicName = "topic-1";
         builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.EARLIEST)), materialized);
-
+        builder.buildAndOptimizeTopology();
         assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
     }
@@ -297,7 +304,7 @@ public class InternalStreamsBuilderTest {
     public void shouldAddTableToLatestAutoOffsetResetList() {
         final String topicName = "topic-1";
         builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.LATEST)), materialized);
-
+        builder.buildAndOptimizeTopology();
         assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
     }
@@ -330,6 +337,7 @@ public class InternalStreamsBuilderTest {
         final String topicTwo = "topic-500000";
 
         builder.stream(topicPattern, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)));
+        builder.buildAndOptimizeTopology();
 
         assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicTwo).matches());
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicTwo).matches());
@@ -341,6 +349,7 @@ public class InternalStreamsBuilderTest {
         final String topicTwo = "topic-1000000";
 
         builder.stream(topicPattern, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)));
+        builder.buildAndOptimizeTopology();
 
         assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicTwo).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicTwo).matches());
@@ -349,6 +358,7 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldHaveNullTimestampExtractorWhenNoneSupplied() {
         builder.stream(Collections.singleton("topic"), consumed);
+        builder.buildAndOptimizeTopology();
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
@@ -357,6 +367,7 @@ public class InternalStreamsBuilderTest {
     public void shouldUseProvidedTimestampExtractor() {
         final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
         builder.stream(Collections.singleton("topic"), consumed);
+        builder.buildAndOptimizeTopology();
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
@@ -364,6 +375,7 @@ public class InternalStreamsBuilderTest {
     @Test
     public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() {
         builder.table("topic", consumed, materialized);
+        builder.buildAndOptimizeTopology();
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
@@ -372,6 +384,7 @@ public class InternalStreamsBuilderTest {
     public void ktableShouldUseProvidedTimestampExtractor() {
         final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor()));
         builder.table("topic", consumed, materialized);
+        builder.buildAndOptimizeTopology();
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
new file mode 100644
index 0000000..5012704
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamsGraphTest {
+    
+    private String expectedJoinedTopology = "Topologies:\n"
+                                            + "   Sub-topology: 0\n"
+                                            + "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+                                            + "      --> KSTREAM-WINDOWED-0000000002\n"
+                                            + "    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n"
+                                            + "      --> KSTREAM-WINDOWED-0000000003\n"
+                                            + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n"
+                                            + "      --> KSTREAM-JOINTHIS-0000000004\n"
+                                            + "      <-- KSTREAM-SOURCE-0000000000\n"
+                                            + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n"
+                                            + "      --> KSTREAM-JOINOTHER-0000000005\n"
+                                            + "      <-- KSTREAM-SOURCE-0000000001\n"
+                                            + "    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n"
+                                            + "      --> KSTREAM-MERGE-0000000006\n"
+                                            + "      <-- KSTREAM-WINDOWED-0000000003\n"
+                                            + "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n"
+                                            + "      --> KSTREAM-MERGE-0000000006\n"
+                                            + "      <-- KSTREAM-WINDOWED-0000000002\n"
+                                            + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
+                                            + "      --> none\n"
+                                            + "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n";
+
+    private String expectedJoinedFilteredTopology = "Topologies:\n"
+                                                    + "   Sub-topology: 0\n"
+                                                    + "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+                                                    + "      --> KSTREAM-WINDOWED-0000000002\n"
+                                                    + "    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n"
+                                                    + "      --> KSTREAM-WINDOWED-0000000003\n"
+                                                    + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n"
+                                                    + "      --> KSTREAM-JOINTHIS-0000000004\n"
+                                                    + "      <-- KSTREAM-SOURCE-0000000000\n"
+                                                    + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n"
+                                                    + "      --> KSTREAM-JOINOTHER-0000000005\n"
+                                                    + "      <-- KSTREAM-SOURCE-0000000001\n"
+                                                    + "    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n"
+                                                    + "      --> KSTREAM-MERGE-0000000006\n"
+                                                    + "      <-- KSTREAM-WINDOWED-0000000003\n"
+                                                    + "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n"
+                                                    + "      --> KSTREAM-MERGE-0000000006\n"
+                                                    + "      <-- KSTREAM-WINDOWED-0000000002\n"
+                                                    + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
+                                                    + "      --> KSTREAM-FILTER-0000000007\n"
+                                                    + "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n"
+                                                    + "    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n"
+                                                    + "      --> none\n"
+                                                    + "      <-- KSTREAM-MERGE-0000000006\n\n";
+
+    private String expectedFullTopology = "Topologies:\n"
+                                          + "   Sub-topology: 0\n"
+                                          + "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+                                          + "      --> KSTREAM-WINDOWED-0000000002\n"
+                                          + "    Source: KSTREAM-SOURCE-0000000001 (topics: [other-topic])\n"
+                                          + "      --> KSTREAM-WINDOWED-0000000003\n"
+                                          + "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n"
+                                          + "      --> KSTREAM-JOINTHIS-0000000004\n"
+                                          + "      <-- KSTREAM-SOURCE-0000000000\n"
+                                          + "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n"
+                                          + "      --> KSTREAM-JOINOTHER-0000000005\n"
+                                          + "      <-- KSTREAM-SOURCE-0000000001\n"
+                                          + "    Processor: KSTREAM-JOINOTHER-0000000005 (stores: [KSTREAM-JOINTHIS-0000000004-store])\n"
+                                          + "      --> KSTREAM-MERGE-0000000006\n"
+                                          + "      <-- KSTREAM-WINDOWED-0000000003\n"
+                                          + "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [KSTREAM-JOINOTHER-0000000005-store])\n"
+                                          + "      --> KSTREAM-MERGE-0000000006\n"
+                                          + "      <-- KSTREAM-WINDOWED-0000000002\n"
+                                          + "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n"
+                                          + "      --> KSTREAM-FILTER-0000000007\n"
+                                          + "      <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n"
+                                          + "    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n"
+                                          + "      --> KSTREAM-MAPVALUES-0000000008\n"
+                                          + "      <-- KSTREAM-MERGE-0000000006\n"
+                                          + "    Processor: KSTREAM-MAPVALUES-0000000008 (stores: [])\n"
+                                          + "      --> KSTREAM-SINK-0000000009\n"
+                                          + "      <-- KSTREAM-FILTER-0000000007\n"
+                                          + "    Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n"
+                                          + "      <-- KSTREAM-MAPVALUES-0000000008\n\n";
+
+    // Test builds topology in succesive manner but only graph node not yet processed written to topology
+
+    @Test
+    public void shouldBeAbleToBuildTopologyIncrementally() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> stream = builder.stream("topic");
+        final KStream<String, String> streamII = builder.stream("other-topic");
+        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+
+
+        final KStream<String, String> joinedStream = stream.join(streamII, valueJoiner, JoinWindows.of(5000));
+
+        // build step one
+        assertEquals(expectedJoinedTopology, builder.build().describe().toString());
+
+        KStream<String, String> filteredJoinStream = joinedStream.filter((k, v) -> v.equals("foo"));
+        // build step two
+        assertEquals(expectedJoinedFilteredTopology, builder.build().describe().toString());
+
+        filteredJoinStream.mapValues(v -> v + "some value").to("output-topic");
+        // build step three
+        assertEquals(expectedFullTopology, builder.build().describe().toString());
+
+
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8f74cd2..b27e16f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -228,12 +228,14 @@ public class StandbyTaskTest {
 
         final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder().setApplicationId(applicationId);
 
-        new InternalStreamsBuilder(internalTopologyBuilder)
-            .stream(Collections.singleton("topic"), new ConsumedInternal<>())
+        final InternalStreamsBuilder builder = new InternalStreamsBuilder(internalTopologyBuilder);
+        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>())
             .groupByKey()
             .windowedBy(TimeWindows.of(60_000).until(120_000))
             .count(Materialized.as(storeName));
 
+        builder.buildAndOptimizeTopology();
+
         final StandbyTask task = new StandbyTask(
             taskId,
             partitions,
@@ -323,11 +325,13 @@ public class StandbyTaskTest {
 
         final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder().setApplicationId(applicationId);
 
-        new InternalStreamsBuilder(internalTopologyBuilder)
-            .stream(Collections.singleton("topic"), new ConsumedInternal<>())
+        final InternalStreamsBuilder builder = new InternalStreamsBuilder(internalTopologyBuilder);
+        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>())
             .groupByKey()
             .count(Materialized.as(storeName));
 
+        builder.buildAndOptimizeTopology();
+
         consumer.assign(partitions);
 
         final StandbyTask task = new StandbyTask(
@@ -479,6 +483,7 @@ public class StandbyTaskTest {
 
     private void initializeStandbyStores(final InternalStreamsBuilder builder) throws IOException {
         final StreamsConfig config = createConfig(baseDir);
+        builder.buildAndOptimizeTopology();
         final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
         final ProcessorTopology topology = internalTopologyBuilder.setApplicationId(applicationId).build(0);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 93d4e94..6c2eb4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -33,7 +34,6 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -394,6 +394,7 @@ public class StreamThreadTest {
     @Test
     public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
+        internalStreamsBuilder.buildAndOptimizeTopology();
 
         final StreamThread thread = createStreamThread(clientId, config, false);
 
@@ -785,6 +786,7 @@ public class StreamThreadTest {
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
 
+        internalStreamsBuilder.buildAndOptimizeTopology();
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions(
@@ -839,6 +841,7 @@ public class StreamThreadTest {
         materialized.generateStoreNameIfNeeded(internalStreamsBuilder, "");
         internalStreamsBuilder.table(topic2, new ConsumedInternal(), materialized);
 
+        internalStreamsBuilder.buildAndOptimizeTopology();
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions(changelogName1,
@@ -927,6 +930,7 @@ public class StreamThreadTest {
         };
 
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
+        internalStreamsBuilder.buildAndOptimizeTopology();
 
         final StreamThread thread = createStreamThread(clientId, config, false);
 
@@ -986,6 +990,7 @@ public class StreamThreadTest {
     public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() {
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
+        internalStreamsBuilder.buildAndOptimizeTopology();
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -1036,6 +1041,7 @@ public class StreamThreadTest {
     public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
         internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
             .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count"));
+        internalStreamsBuilder.buildAndOptimizeTopology();
 
         final StreamThread thread = createStreamThread("clientId", config, false);
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 2577bb8..9693184 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -731,9 +731,6 @@ public class StreamsPartitionAssignorTest {
     public void shouldGenerateTasksForAllCreatedPartitions() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
-        internalTopologyBuilder.setApplicationId(applicationId);
-
         // KStream with 3 partitions
         final KStream<Object, Object> stream1 = builder
             .stream("topic1")
@@ -771,6 +768,8 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
+        internalTopologyBuilder.setApplicationId(applicationId);
 
         mockTaskManager(
             Collections.<TaskId>emptySet(),
@@ -910,8 +909,7 @@ public class StreamsPartitionAssignorTest {
     public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
-        internalTopologyBuilder.setApplicationId(applicationId);
+
 
         final KStream<Object, Object> stream1 = builder
 
@@ -969,6 +967,9 @@ public class StreamsPartitionAssignorTest {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
+        internalTopologyBuilder.setApplicationId(applicationId);
+
         mockTaskManager(
             Collections.<TaskId>emptySet(),
             Collections.<TaskId>emptySet(),
@@ -1020,10 +1021,10 @@ public class StreamsPartitionAssignorTest {
     public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
         final StreamsBuilder builder = new StreamsBuilder();
 
+        builder.stream("topic1").groupByKey().count();
         final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
-        builder.stream("topic1").groupByKey().count();
 
         final UUID uuid = UUID.randomUUID();
         mockTaskManager(


Mime
View raw message