kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6761: Construct logical Streams Graph in DSL Parsing (#4983)
Date Mon, 18 Jun 2018 17:58:32 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1354371  KAFKA-6761: Construct logical Streams Graph in DSL Parsing (#4983)
1354371 is described below

commit 1354371d4f4dd208458c776e1f27715ec5f77f47
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Mon Jun 18 13:58:26 2018 -0400

    KAFKA-6761: Construct logical Streams Graph in DSL Parsing (#4983)
    
    This version is a WIP and intentionally leaves out some additional required changes to keep the reviewing effort more manageable. This version of the process includes
    
    1. Cleaning up the graph objects to reduce the number of parameters and make the naming conventions more clear.
    2. Intercepting all calls to the InternalToplogyBuilder and capturing all details required for possible optimizations and building the final topology.
    
    This PR does not include writing out the current physical plan, so no tests included. The next PR will include additional changes to building the graph and writing the topology out without optimizations, using the current streams tests.
    
    Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/kstream/internals/AbstractStream.java  |  22 +-
 .../kstream/internals/BaseJoinProcessorNode.java   |  99 ------
 .../internals/GroupedStreamAggregateBuilder.java   |  58 +++-
 .../kstream/internals/InternalStreamsBuilder.java  |  98 ++++--
 .../kstream/internals/KGroupedStreamImpl.java      |  15 +-
 .../kstream/internals/KGroupedTableImpl.java       |  79 ++++-
 .../streams/kstream/internals/KStreamImpl.java     | 357 +++++++++++++++++----
 .../streams/kstream/internals/KTableImpl.java      | 172 +++++++---
 .../kstream/internals/ProducedInternal.java        |   8 +-
 .../streams/kstream/internals/RepartitionNode.java | 165 ----------
 .../internals/SessionWindowedKStreamImpl.java      |   6 +-
 .../kstream/internals/StatefulProcessorNode.java   | 133 --------
 .../kstream/internals/StatefulRepartitionNode.java | 169 ----------
 .../kstream/internals/StatefulSourceNode.java      | 197 ------------
 .../kstream/internals/StreamsGraphNode.java        | 106 ------
 .../kstream/internals/StreamsTopologyGraph.java    | 137 --------
 .../kstream/internals/TimeWindowedKStreamImpl.java |   8 +-
 .../internals/graph/BaseJoinProcessorNode.java     |  78 +++++
 .../internals/graph/BaseRepartitionNode.java       |  56 ++++
 .../GroupedTableOperationRepartitionNode.java      | 150 +++++++++
 .../{ => graph}/KTableKTableJoinNode.java          |  70 ++--
 .../graph/OptimizableRepartitionNode.java          | 153 +++++++++
 .../internals/{ => graph}/ProcessorParameters.java |  10 +-
 .../internals/graph/StatefulProcessorNode.java     | 115 +++++++
 .../{ => graph}/StatelessProcessorNode.java        |  30 +-
 .../internals/{ => graph}/StreamSinkNode.java      |  27 +-
 .../internals/{ => graph}/StreamSourceNode.java    |  53 +--
 .../{ => graph}/StreamStreamJoinNode.java          |  89 ++---
 .../internals/{ => graph}/StreamTableJoinNode.java |  32 +-
 .../kstream/internals/graph/StreamsGraphNode.java  |  97 ++++++
 .../internals/graph/TableProcessorNode.java        |  47 +++
 .../kstream/internals/graph/TableSourceNode.java   | 141 ++++++++
 .../streams/processor/StateStoreSupplier.java      |  59 ----
 .../kstream/internals/AbstractStreamTest.java      |   4 +-
 34 files changed, 1625 insertions(+), 1415 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index aa2a727..460665c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -17,12 +17,13 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
@@ -35,6 +36,7 @@ public abstract class AbstractStream<K> {
     protected final InternalStreamsBuilder builder;
     protected final String name;
     protected final Set<String> sourceNodes;
+    protected final StreamsGraphNode parentGraphNode;
 
     // This copy-constructor will allow to extend KStream
     // and KTable APIs with new methods without impacting the public interface.
@@ -42,9 +44,13 @@ public abstract class AbstractStream<K> {
         this.builder = stream.builder;
         this.name = stream.name;
         this.sourceNodes = stream.sourceNodes;
+        this.parentGraphNode = stream.parentGraphNode;
     }
 
-    AbstractStream(final InternalStreamsBuilder builder, String name, final Set<String> sourceNodes) {
+    AbstractStream(final InternalStreamsBuilder builder,
+                   final String name,
+                   final Set<String> sourceNodes,
+                   final StreamsGraphNode parentGraphNode) {
         if (sourceNodes == null || sourceNodes.isEmpty()) {
             throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
         }
@@ -52,6 +58,14 @@ public abstract class AbstractStream<K> {
         this.builder = builder;
         this.name = name;
         this.sourceNodes = sourceNodes;
+        this.parentGraphNode = parentGraphNode;
+    }
+
+    protected void addGraphNode(final StreamsGraphNode newNode) {
+        //TODO remove this once actually building the topology with Graph
+        if (parentGraphNode != null) {
+            parentGraphNode.addChildNode(newNode);
+        }
     }
 
     // This method allows to expose the InternalTopologyBuilder instance
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
deleted file mode 100644
index 899ee71..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-/**
- * Utility base class containing the common fields between
- * a Stream-Stream join and a Table-Table join
- */
-abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends StreamsGraphNode {
-
-    private final ProcessorSupplier<K, V1> joinThisProcessSupplier;
-    private final ProcessorSupplier<K, V2> joinOtherProcessSupplier;
-    private final ProcessorSupplier<K, VR> joinMergeProcessor;
-    private final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
-    private final String joinThisProcessorName;
-    private final String joinOtherProcessorName;
-    private final String joinMergeProcessorName;
-    private final String thisJoinSide;
-    private final String otherJoinSide;
-
-
-    BaseJoinProcessorNode(final String parentProcessorNodeName,
-                          final String processorNodeName,
-                          final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
-                          final ProcessorParameters<K, V1> joinThisProcessorDetails,
-                          final ProcessorParameters<K, V2> joinOtherProcessDetails,
-                          final ProcessorParameters<K, VR> joinMergeProcessorDetails,
-                          final String thisJoinSide,
-                          final String otherJoinSide) {
-
-        super(parentProcessorNodeName,
-              processorNodeName,
-              false);
-
-        this.valueJoiner = valueJoiner;
-        this.joinThisProcessSupplier = joinThisProcessorDetails.processorSupplier();
-        this.joinOtherProcessSupplier = joinOtherProcessDetails.processorSupplier();
-        this.joinMergeProcessor = joinMergeProcessorDetails.processorSupplier();
-        this.joinThisProcessorName = joinThisProcessorDetails.processorName();
-        this.joinOtherProcessorName = joinOtherProcessDetails.processorName();
-        this.joinMergeProcessorName = joinMergeProcessorDetails.processorName();
-        this.thisJoinSide = thisJoinSide;
-        this.otherJoinSide = otherJoinSide;
-    }
-
-    ProcessorSupplier<K, V1> joinThisProcessorSupplier() {
-        return joinThisProcessSupplier;
-    }
-
-    ProcessorSupplier<K, V2> joinOtherProcessorSupplier() {
-        return joinOtherProcessSupplier;
-    }
-
-    ProcessorSupplier<K, VR> joinMergeProcessor() {
-        return joinMergeProcessor;
-    }
-
-    ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner() {
-        return valueJoiner;
-    }
-
-    String joinThisProcessorName() {
-        return joinThisProcessorName;
-    }
-
-    String joinOtherProcessorName() {
-        return joinOtherProcessorName;
-    }
-
-    String joinMergeProcessorName() {
-        return joinMergeProcessorName;
-    }
-
-    String thisJoinSide() {
-        return thisJoinSide;
-    }
-
-    String otherJoinSide() {
-        return otherJoinSide;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 715c291..247c882 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -20,6 +20,10 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Collections;
@@ -32,6 +36,7 @@ class GroupedStreamAggregateBuilder<K, V> {
     private final boolean repartitionRequired;
     private final Set<String> sourceNodes;
     private final String name;
+    private final StreamsGraphNode streamsGraphNode;
 
     final Initializer<Long> countInitializer = new Initializer<Long>() {
         @Override
@@ -59,7 +64,8 @@ class GroupedStreamAggregateBuilder<K, V> {
                                   final Serde<V> valueSerde,
                                   final boolean repartitionRequired,
                                   final Set<String> sourceNodes,
-                                  final String name) {
+                                  final String name,
+                                  final StreamsGraphNode streamsGraphNode) {
 
         this.builder = builder;
         this.keySerde = keySerde;
@@ -67,6 +73,7 @@ class GroupedStreamAggregateBuilder<K, V> {
         this.repartitionRequired = repartitionRequired;
         this.sourceNodes = sourceNodes;
         this.name = name;
+        this.streamsGraphNode = streamsGraphNode;
     }
 
     <T> KTable<K, T> build(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
@@ -74,26 +81,55 @@ class GroupedStreamAggregateBuilder<K, V> {
                            final StoreBuilder storeBuilder,
                            final boolean isQueryable) {
         final String aggFunctionName = builder.newProcessorName(functionName);
-        final String sourceName = repartitionIfRequired(storeBuilder.name());
+
+        OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+
+        final String sourceName = repartitionIfRequired(storeBuilder.name(), repartitionNodeBuilder);
         builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
         builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
 
-        return new KTableImpl<>(
-                builder,
-                aggFunctionName,
-                aggregateSupplier,
-                sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName),
-                storeBuilder.name(),
-                isQueryable);
+
+        StreamsGraphNode parentNode = streamsGraphNode;
+
+        if (!sourceName.equals(this.name)) {
+            StreamsGraphNode repartitionNode = repartitionNodeBuilder.build();
+            streamsGraphNode.addChildNode(repartitionNode);
+            parentNode = repartitionNode;
+        }
+
+        StatefulProcessorNode.StatefulProcessorNodeBuilder<K, T> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder();
+
+        ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName);
+        statefulProcessorNodeBuilder
+            .withProcessorParameters(processorParameters)
+            .withNodeName(aggFunctionName)
+            .withRepartitionRequired(repartitionRequired)
+            .withStoreBuilder(storeBuilder);
+
+        StatefulProcessorNode<K, T> statefulProcessorNode = statefulProcessorNodeBuilder.build();
+
+        parentNode.addChildNode(statefulProcessorNode);
+
+        return new KTableImpl<>(builder,
+                                aggFunctionName,
+                                aggregateSupplier,
+                                sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName),
+                                storeBuilder.name(),
+                                isQueryable,
+                                statefulProcessorNode);
     }
 
     /**
      * @return the new sourceName if repartitioned. Otherwise the name of this stream
      */
-    private String repartitionIfRequired(final String queryableStoreName) {
+    private String repartitionIfRequired(final String queryableStoreName,
+                                         final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
         if (!repartitionRequired) {
             return this.name;
         }
-        return KStreamImpl.createRepartitionedSource(builder, keySerde, valueSerde, queryableStoreName, name);
+        // if repartition required the operation
+        // captured needs to be set in the graph
+        return KStreamImpl.createRepartitionedSource(builder, keySerde, valueSerde, queryableStoreName, name, optimizableRepartitionNodeBuilder);
+
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index c7bf2fa..c42a93e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -20,6 +20,10 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -34,9 +38,17 @@ import java.util.regex.Pattern;
 public class InternalStreamsBuilder implements InternalNameProvider {
 
     final InternalTopologyBuilder internalTopologyBuilder;
-
     private final AtomicInteger index = new AtomicInteger(0);
 
+    private static final String TOPOLOGY_ROOT = "root";
+
+    protected final StreamsGraphNode root = new StreamsGraphNode(TOPOLOGY_ROOT, false) {
+        @Override
+        public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+            // no-op for root node
+        }
+    };
+
     public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) {
         this.internalTopologyBuilder = internalTopologyBuilder;
     }
@@ -45,6 +57,12 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                        final ConsumedInternal<K, V> consumed) {
         final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
 
+        StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name,
+                                                                         topics,
+                                                                         consumed);
+
+        root.addChildNode(streamSourceNode);
+
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           name,
                                           consumed.timestampExtractor(),
@@ -52,12 +70,18 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                           consumed.valueDeserializer(),
                                           topics.toArray(new String[topics.size()]));
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+        return new KStreamImpl<>(this, name, Collections.singleton(name), false, streamSourceNode);
     }
 
-    public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) {
+    public <K, V> KStream<K, V> stream(final Pattern topicPattern,
+                                       final ConsumedInternal<K, V> consumed) {
         final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
 
+        StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name,
+                                                                                topicPattern,
+                                                                                consumed);
+        root.addChildNode(streamPatternSourceNode);
+
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           name,
                                           consumed.timestampExtractor(),
@@ -65,38 +89,37 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                           consumed.valueDeserializer(),
                                           topicPattern);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+        return new KStreamImpl<>(this,
+                                 name,
+                                 Collections.singleton(name),
+                                 false,
+                                 streamPatternSourceNode);
     }
 
     @SuppressWarnings("unchecked")
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
                                      final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+
         final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
 
         final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
         final String name = newProcessorName(KTableImpl.SOURCE_NAME);
-        final KTable<K, V> kTable = createKTable(consumed,
-                                                 topic,
-                                                 storeBuilder.name(),
-                                                 materialized.isQueryable(),
-                                                 source,
-                                                 name);
+        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeBuilder.name());
+        final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
 
-        internalTopologyBuilder.addStateStore(storeBuilder, name);
-        internalTopologyBuilder.markSourceStoreAndTopic(storeBuilder, topic);
+        TableSourceNode.TableSourceNodeBuilder<K, V> tableSourceNodeBuilder = TableSourceNode.tableSourceNodeBuilder();
 
-        return kTable;
-    }
 
+        TableSourceNode<K, V> tableSourceNode = tableSourceNodeBuilder.withNodeName(name)
+                                                                               .withSourceName(source)
+                                                                               .withStoreBuilder(storeBuilder)
+                                                                               .withConsumedInternal(consumed)
+                                                                               .withProcessorParameters(processorParameters)
+                                                                               .withTopic(topic)
+                                                                               .build();
 
-    private <K, V> KTable<K, V> createKTable(final ConsumedInternal<K, V> consumed,
-                                             final String topic,
-                                             final String storeName,
-                                             final boolean isQueryable,
-                                             final String source,
-                                             final String name) {
-        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName);
+        root.addChildNode(tableSourceNode);
 
         internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
                                           source,
@@ -106,8 +129,18 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                           topic);
         internalTopologyBuilder.addProcessor(name, processorSupplier, source);
 
-        return new KTableImpl<>(this, name, processorSupplier,
-                                consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable);
+        internalTopologyBuilder.addStateStore(storeBuilder, name);
+        internalTopologyBuilder.markSourceStoreAndTopic(storeBuilder, topic);
+
+        return new KTableImpl<>(this,
+                                name,
+                                processorSupplier,
+                                consumed.keySerde(),
+                                consumed.valueSerde(),
+                                Collections.singleton(source),
+                                storeBuilder.name(),
+                                materialized.isQueryable(),
+                                tableSourceNode);
     }
 
     @SuppressWarnings("unchecked")
@@ -123,6 +156,17 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
         final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
 
+        final ProcessorParameters processorParameters = new ProcessorParameters(tableSource, processorName);
+
+        TableSourceNode<K, V> tableSourceNode = TableSourceNode.tableSourceNodeBuilder().withStoreBuilder(storeBuilder)
+                                                                                                    .withSourceName(sourceName)
+                                                                                                    .withConsumedInternal(consumed)
+                                                                                                    .withTopic(topic)
+                                                                                                    .withProcessorParameters(processorParameters)
+                                                                                                    .isGlobalKTable(true)
+                                                                                                    .build();
+
+        root.addChildNode(tableSourceNode);
 
         internalTopologyBuilder.addGlobalStore(storeBuilder,
                                                sourceName,
@@ -132,6 +176,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                                topic,
                                                processorName,
                                                tableSource);
+
         return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()), materialized.isQueryable());
     }
 
@@ -166,7 +211,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                                processorName,
                                                stateUpdateSupplier);
     }
-    
+
     public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
                                             final String topic,
                                             final ConsumedInternal consumed,
@@ -182,4 +227,9 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                        processorName,
                        stateUpdateSupplier);
     }
+
+
+    public StreamsGraphNode root() {
+        return root;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 74f930e..8fa1752 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
@@ -51,14 +52,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                        final Set<String> sourceNodes,
                        final Serde<K> keySerde,
                        final Serde<V> valSerde,
-                       final boolean repartitionRequired) {
-        super(builder, name, sourceNodes);
+                       final boolean repartitionRequired,
+                       final StreamsGraphNode streamsGraphNode) {
+        super(builder, name, sourceNodes, streamsGraphNode);
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder,
                                                                     keySerde,
                                                                     valSerde,
                                                                     repartitionRequired,
                                                                     sourceNodes,
-                                                                    name);
+                                                                    name,
+                                                                    streamsGraphNode);
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.repartitionRequired = repartitionRequired;
@@ -161,7 +164,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                              name,
                                              keySerde,
                                              valSerde,
-                                             repartitionRequired);
+                                             repartitionRequired,
+                                             parentGraphNode);
     }
 
     @Override
@@ -172,7 +176,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                 name,
                                                 keySerde,
                                                 valSerde,
-                                                aggregateBuilder);
+                                                aggregateBuilder,
+                                                parentGraphNode);
     }
 
     private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 49f258b..be98cca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -27,6 +27,10 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRepartitionNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -73,17 +77,19 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                       final String name,
                       final String sourceName,
                       final Serde<K> keySerde,
-                      final Serde<V> valSerde) {
-        super(builder, name, Collections.singleton(sourceName));
+                      final Serde<V> valSerde,
+                      final StreamsGraphNode streamsGraphNode) {
+        super(builder, name, Collections.singleton(sourceName), streamsGraphNode);
         this.keySerde = keySerde;
         this.valSerde = valSerde;
     }
 
-    private void buildAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
-                                final String topic,
-                                final String funcName,
-                                final String sourceName,
-                                final String sinkName) {
+    private <T> void buildAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
+                                    final String topic,
+                                    final String funcName,
+                                    final String sourceName,
+                                    final String sinkName) {
+
         final Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
         final Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
         final Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
@@ -109,16 +115,67 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
         final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newProcessorName(functionName);
+        final String topic = materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+
 
         buildAggregate(aggregateSupplier,
-                       materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX,
+                       topic,
                        funcName,
-                       sourceName, sinkName);
+                       sourceName,
+                       sinkName
+        );
+
         builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized)
-                                                              .materialize(), funcName);
+                                                          .materialize(), funcName);
+
+
+        StreamsGraphNode repartitionNode = createRepartitionNode(sinkName,
+                                                                 sourceName,
+                                                                 topic);
+        addGraphNode(repartitionNode);
+
+        StatefulProcessorNode statefulProcessorNode = createStatefulProcessorNode(materialized,
+                                                                                  funcName,
+                                                                                  aggregateSupplier);
+
+        repartitionNode.addChildNode(statefulProcessorNode);
+
 
         // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), materialized.isQueryable());
+        return new KTableImpl<>(builder,
+                                funcName,
+                                aggregateSupplier,
+                                Collections.singleton(sourceName),
+                                materialized.storeName(),
+                                materialized.isQueryable(),
+                                statefulProcessorNode);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> StatefulProcessorNode createStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
+                                                                  final String functionName,
+                                                                  final ProcessorSupplier aggregateSupplier) {
+
+        ProcessorParameters aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
+
+        return StatefulProcessorNode.statefulProcessorNodeBuilder()
+            .withNodeName(functionName)
+            .withProcessorParameters(aggregateFunctionProcessorParams)
+            .withStoreBuilder(new KeyValueStoreMaterializer(materialized).materialize()).build();
+    }
+
+    @SuppressWarnings("unchecked")
+    private GroupedTableOperationRepartitionNode createRepartitionNode(final String sinkName,
+                                                                       final String sourceName,
+                                                                       final String topic) {
+
+        return GroupedTableOperationRepartitionNode.groupedTableOperationNodeBuilder()
+            .withRepartitionTopic(topic)
+            .withSinkName(sinkName)
+            .withSourceName(sourceName)
+            .withKeySerde(keySerde)
+            .withValueSerde(valSerde)
+            .withNodeName(sourceName).build();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index e7dabbf..bc56a3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -38,6 +38,14 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
+import org.apache.kafka.streams.kstream.internals.graph.StatelessProcessorNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -48,6 +56,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.lang.reflect.Array;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Objects;
@@ -110,8 +119,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public KStreamImpl(final InternalStreamsBuilder builder,
                        final String name,
                        final Set<String> sourceNodes,
-                       final boolean repartitionRequired) {
-        super(builder, name, sourceNodes);
+                       final boolean repartitionRequired,
+                       final StreamsGraphNode streamsGraphNode) {
+        super(builder, name, sourceNodes, streamsGraphNode);
         this.repartitionRequired = repartitionRequired;
     }
 
@@ -120,9 +130,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(predicate, "predicate can't be null");
         String name = builder.newProcessorName(FILTER_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name);
+        StatelessProcessorNode<K, V> filterProcessorNode = new StatelessProcessorNode<>(name,
+                                                                                        processorParameters,
+                                                                                        repartitionRequired);
+        addGraphNode(filterProcessorNode);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterProcessorNode);
     }
 
     @Override
@@ -130,32 +145,44 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(predicate, "predicate can't be null");
         String name = builder.newProcessorName(FILTER_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name);
+
+        StatelessProcessorNode<K, V> filterNotProcessorNode = new StatelessProcessorNode<>(name,
+                                                                                           processorParameters,
+                                                                                           repartitionRequired);
+
+        addGraphNode(filterNotProcessorNode);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterNotProcessorNode);
     }
 
     @Override
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        return new KStreamImpl<>(builder, internalSelectKey(mapper), sourceNodes, true);
+
+        StatelessProcessorNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper);
+        selectKeyProcessorNode.keyChangingOperation(true);
+        addGraphNode(selectKeyProcessorNode);
+        return new KStreamImpl<>(builder, selectKeyProcessorNode.nodeName(), sourceNodes, true, selectKeyProcessorNode);
     }
 
-    private <K1> String internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
+    private <K1> StatelessProcessorNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         String name = builder.newProcessorName(KEY_SELECT_NAME);
-        builder.internalTopologyBuilder.addProcessor(
-            name,
-            new KStreamMap<>(
-                new KeyValueMapper<K, V, KeyValue<K1, V>>() {
-                    @Override
-                    public KeyValue<K1, V> apply(K key, V value) {
-                        return new KeyValue<>(mapper.apply(key, value), value);
-                    }
-                }
-            ),
-            this.name
-        );
-        return name;
+
+
+        KStreamMap kStreamMap = new KStreamMap<>(
+            (KeyValueMapper<K, V, KeyValue<K1, V>>) (key, value) -> new KeyValue<>(mapper.apply(key, value), value));
+
+
+        ProcessorParameters<K1, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
+
+        builder.internalTopologyBuilder.addProcessor(name, kStreamMap, this.name);
+
+        return  new StatelessProcessorNode<>(name,
+                                             processorParameters,
+                                             repartitionRequired);
+
     }
 
     @Override
@@ -163,9 +190,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = builder.newProcessorName(MAP_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamMap<>(mapper), name);
+
+        StatelessProcessorNode<K1, V1> mapProcessorNode = new StatelessProcessorNode<>(name,
+                                                                                       processorParameters,
+                                                                                       true);
+        mapProcessorNode.keyChangingOperation(true);
+        addGraphNode(mapProcessorNode);
+
         builder.internalTopologyBuilder.addProcessor(name, new KStreamMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, true);
+        return new KStreamImpl<>(builder, name, sourceNodes, true, mapProcessorNode);
     }
 
 
@@ -179,9 +214,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = builder.newProcessorName(MAPVALUES_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
+
+        StatelessProcessorNode<K, V> mapValuesProcessorNode = new StatelessProcessorNode<>(name,
+                                                                                           processorParameters,
+                                                                                           repartitionRequired);
+        addGraphNode(mapValuesProcessorNode);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, mapValuesProcessorNode);
     }
 
     @Override
@@ -189,17 +230,33 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(printed, "printed can't be null");
         final PrintedInternal<K, V> printedInternal = new PrintedInternal<>(printed);
         final String name = builder.newProcessorName(PRINTING_NAME);
+
+        ProcessorParameters processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name);
+
+        StatelessProcessorNode<K, V> printNode = new StatelessProcessorNode<>(name,
+                                                                              processorParameters,
+                                                                              false);
+        addGraphNode(printNode);
         builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name);
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+    public <K1, V1> KStream<K1, V1> flatMap(
+        final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = builder.newProcessorName(FLATMAP_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name);
+
+        StatelessProcessorNode<K1, V1> flatMapNode = new StatelessProcessorNode<>(name,
+                                                                                  processorParameters,
+                                                                                  true);
+        flatMapNode.keyChangingOperation(true);
+
+        addGraphNode(flatMapNode);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, true);
+        return new KStreamImpl<>(builder, name, sourceNodes, true, flatMapNode);
     }
 
     @Override
@@ -212,9 +269,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = builder.newProcessorName(FLATMAPVALUES_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
+
+        StatelessProcessorNode<K, VR> flatMapValuesNode = new StatelessProcessorNode<>(name,
+                                                                                       processorParameters,
+                                                                                       repartitionRequired);
+        addGraphNode(flatMapValuesNode);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, flatMapValuesNode);
     }
 
     @Override
@@ -236,21 +299,35 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone(), childNames), this.name);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName);
+
+        StatelessProcessorNode<K, V> branchNode = new StatelessProcessorNode<>(branchName,
+                                                                               processorParameters,
+                                                                               false);
+        addGraphNode(branchNode);
+
         KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
+
         for (int i = 0; i < predicates.length; i++) {
+            ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]);
+
+            StatelessProcessorNode<K, V> branchChildNode = new StatelessProcessorNode<>(childNames[i],
+                                                                                        innerProcessorParameters,
+                                                                                        repartitionRequired);
+            branchNode.addChildNode(branchChildNode);
             builder.internalTopologyBuilder.addProcessor(childNames[i], new KStreamPassThrough<K, V>(), branchName);
-            branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired);
+            branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired, branchChildNode);
         }
 
         return branchChildren;
     }
 
-    @Override 
+    @Override
     public KStream<K, V> merge(final KStream<K, V> stream) {
         Objects.requireNonNull(stream);
         return merge(builder, stream);
     }
-    
+
     private KStream<K, V> merge(final InternalStreamsBuilder builder,
                                 final KStream<K, V> stream) {
         KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
@@ -262,9 +339,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         allSourceNodes.addAll(sourceNodes);
         allSourceNodes.addAll(streamImpl.sourceNodes);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamPassThrough<>(), name);
+
+        StatelessProcessorNode<K, V> mergeNode = new StatelessProcessorNode<>(name,
+                                                                              processorParameters,
+                                                                              requireRepartitioning,
+                                                                              Arrays.asList(parentNames));
+
+        addGraphNode(mergeNode);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamPassThrough<>(), parentNames);
 
-        return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning);
+        return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning, mergeNode);
     }
 
     @Override
@@ -273,9 +358,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         to(topic, producedInternal);
         return builder.stream(Collections.singleton(topic),
                               new ConsumedInternal<>(producedInternal.keySerde(),
-                                            producedInternal.valueSerde(),
-                                            new FailOnInvalidTimestamp(),
-                                            null));
+                                                     producedInternal.valueSerde(),
+                                                     new FailOnInvalidTimestamp(),
+                                                     null));
     }
 
     @Override
@@ -283,6 +368,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(action, "action can't be null");
         String name = builder.newProcessorName(FOREACH_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamPeek<>(action, false), name);
+
+        StatelessProcessorNode<K, V> foreachNode = new StatelessProcessorNode<>(name,
+                                                                                processorParameters,
+                                                                                repartitionRequired);
+        addGraphNode(foreachNode);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, false), this.name);
     }
 
@@ -291,9 +382,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(action, "action can't be null");
         final String name = builder.newProcessorName(PEEK_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamPeek<>(action, true), name);
+
+        StatelessProcessorNode<K, V> peekNode = new StatelessProcessorNode<>(name,
+                                                                             processorParameters,
+                                                                             repartitionRequired);
+        addGraphNode(peekNode);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, true), this.name);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired, peekNode);
     }
 
     @Override
@@ -338,6 +435,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         } else {
             builder.internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valSerializer, partitioner, this.name);
         }
+
+        StreamSinkNode<K, V> sinkNode = new StreamSinkNode<>(
+            name,
+            topicExtractor,
+            produced);
+        addGraphNode(sinkNode);
     }
 
     @Override
@@ -346,12 +449,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
         String name = builder.newProcessorName(TRANSFORM_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name);
+
+        StatefulProcessorNode<K1, V1> transformNode = new StatefulProcessorNode<>(name,
+                                                                                  processorParameters,
+                                                                                  stateStoreNames,
+                                                                                  null,
+                                                                                  null,
+                                                                                  true);
+        transformNode.keyChangingOperation(true);
+        addGraphNode(transformNode);
+
         builder.internalTopologyBuilder.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
         if (stateStoreNames != null && stateStoreNames.length > 0) {
             builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
         }
 
-        return new KStreamImpl<>(builder, name, sourceNodes, true);
+        return new KStreamImpl<>(builder, name, sourceNodes, true, null);
     }
 
     @Override
@@ -373,12 +487,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     private <VR> KStream<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier,
                                                   final String... stateStoreNames) {
         final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+
         builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerWithKeySupplier), this.name);
         if (stateStoreNames != null && stateStoreNames.length > 0) {
             builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
         }
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
+
+
+
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name);
+
+        StatefulProcessorNode<K, VR> transformNode = new StatefulProcessorNode<>(name,
+                                                                                 processorParameters,
+                                                                                 stateStoreNames,
+                                                                                 null,
+                                                                                 null,
+                                                                                 repartitionRequired);
+        addGraphNode(transformNode);
+
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, transformNode);
     }
 
     @Override
@@ -386,6 +514,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                         final String... stateStoreNames) {
         final String name = builder.newProcessorName(PROCESSOR_NAME);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
+
+        StatefulProcessorNode<K, V> transformNode = new StatefulProcessorNode<>(name,
+                                                                                processorParameters,
+                                                                                stateStoreNames,
+                                                                                null,
+                                                                                null,
+                                                                                false);
+        addGraphNode(transformNode);
+
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
         if (stateStoreNames != null && stateStoreNames.length > 0) {
             builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
@@ -405,7 +543,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                         final JoinWindows windows,
                                         final Joined<K, V, VO> joined) {
         return doJoin(otherStream, joiner, windows, joined,
-                             new KStreamImplJoin(false, false));
+                      new KStreamImplJoin(false, false));
     }
 
     @Override
@@ -447,31 +585,44 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         joinThis.ensureJoinableWith(joinOther);
 
         return join.join(joinThis,
-            joinOther,
-            joiner,
-            windows,
-            joined);
+                         joinOther,
+                         joiner,
+                         windows,
+                         joined);
     }
 
     /**
      * Repartition a stream. This is required on join operations occurring after
      * an operation that changes the key, i.e, selectKey, map(..), flatMap(..).
-     * @param keySerde      Serdes for serializing the keys
-     * @param valSerde      Serdes for serilaizing the values
+     *
+     * @param keySerde Serdes for serializing the keys
+     * @param valSerde Serdes for serilaizing the values
      * @return a new {@link KStreamImpl}
      */
     private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde,
                                                  final Serde<V> valSerde) {
-        String repartitionedSourceName = createRepartitionedSource(builder, keySerde, valSerde, null, name);
+        OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+        String repartitionedSourceName = createRepartitionedSource(builder,
+                                                                   keySerde,
+                                                                   valSerde,
+                                                                   null,
+                                                                   name,
+                                                                   repartitionNodeBuilder);
+
+        OptimizableRepartitionNode<K, V> repartitionNode = repartitionNodeBuilder.build();
+        addGraphNode(repartitionNode);
+
         return new KStreamImpl<>(builder, repartitionedSourceName, Collections
-            .singleton(repartitionedSourceName), false);
+            .singleton(repartitionedSourceName), false, repartitionNode);
     }
 
     static <K1, V1> String createRepartitionedSource(final InternalStreamsBuilder builder,
                                                      final Serde<K1> keySerde,
                                                      final Serde<V1> valSerde,
                                                      final String topicNamePrefix,
-                                                     final String name) {
+                                                     final String name,
+                                                     final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder) {
+
         Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
         Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
         Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
@@ -480,21 +631,30 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
         String sinkName = builder.newProcessorName(SINK_NAME);
-        String filterName = builder.newProcessorName(FILTER_NAME);
+        String nullKeyFilterProcessorName = builder.newProcessorName(FILTER_NAME);
         String sourceName = builder.newProcessorName(SOURCE_NAME);
 
+        Predicate<K1, V1> nullKeyPredicate = (k, v) -> k != null;
+
+        ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFilter<>(nullKeyPredicate, false), nullKeyFilterProcessorName);
+
+        repartitionNodeBuilder.withKeySerde(keySerde)
+            .withValueSerde(valSerde)
+            .withSourceName(sourceName)
+            .withRepartitionTopic(repartitionTopic)
+            .withSinkName(sinkName)
+            .withProcessorParameters(processorParameters)
+            // reusing the source name for the graph node name
+            // adding explicit variable as it simplifies logic
+            .withNodeName(sourceName);
+
         builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
-        builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
-            @Override
-            public boolean test(final K1 key, final V1 value) {
-                return key != null;
-            }
-        }, false), name);
+        builder.internalTopologyBuilder.addProcessor(nullKeyFilterProcessorName, new KStreamFilter<>(nullKeyPredicate, false), name);
 
         builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
-            null, filterName);
+                                                null, nullKeyFilterProcessorName);
         builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
-            keyDeserializer, valDeserializer, repartitionTopic);
+                                                  keyDeserializer, valDeserializer, repartitionTopic);
 
         return sourceName;
     }
@@ -565,7 +725,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
         final String name = builder.newProcessorName(LEFTJOIN_NAME);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamGlobalKTableJoin<>(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name);
-        return new KStreamImpl<>(builder, name, sourceNodes, false);
+
+        ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(valueGetterSupplier,
+                                                                                  joiner,
+                                                                                  keyMapper,
+
+                                                                                  leftJoin);
+        ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
+
+        StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(name,
+                                                                                  processorParameters,
+                                                                                  new String[]{});
+        streamTableJoinNode.setGlobalKTableJoin(true);
+        addGraphNode(streamTableJoinNode);
+
+        return new KStreamImpl<>(builder, name, sourceNodes, false, streamTableJoinNode);
     }
 
     @SuppressWarnings("unchecked")
@@ -581,7 +755,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
         builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl) other).valueGetterSupplier().storeNames());
 
-        return new KStreamImpl<>(builder, name, allSourceNodes, false);
+        ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(),
+                                                                            joiner,
+                                                                            leftJoin);
+
+        ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
+        StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(name,
+                                                                                  processorParameters,
+                                                                                  ((KTableImpl) other).valueGetterSupplier().storeNames());
+
+        addGraphNode(streamTableJoinNode);
+
+        return new KStreamImpl<>(builder, name, allSourceNodes, false, streamTableJoinNode);
     }
 
     @Override
@@ -615,13 +800,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(serialized, "serialized can't be null");
         final SerializedInternal<KR, V> serializedInternal = new SerializedInternal<>(serialized);
-        String selectName = internalSelectKey(selector);
+        final StatelessProcessorNode<K, V> graphNode = internalSelectKey(selector);
+        graphNode.keyChangingOperation(true);
+
+        addGraphNode(graphNode);
         return new KGroupedStreamImpl<>(builder,
-                                        selectName,
+                                        graphNode.nodeName(),
                                         sourceNodes,
                                         serializedInternal.keySerde(),
                                         serializedInternal.valueSerde(),
-                                        true);
+                                        true,
+                                        graphNode);
     }
 
     @Override
@@ -632,12 +821,22 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) {
         final SerializedInternal<K, V> serializedInternal = new SerializedInternal<>(serialized);
+
+        final StatelessProcessorNode<K, V> graphNode = new StatelessProcessorNode<>(
+            this.name,
+            null,
+            repartitionRequired,
+            Collections.<String>emptyList());
+
+        addGraphNode(graphNode);
+
         return new KGroupedStreamImpl<>(builder,
                                         this.name,
                                         sourceNodes,
                                         serializedInternal.keySerde(),
                                         serializedInternal.valueSerde(),
-                                        this.repartitionRequired);
+                                        this.repartitionRequired,
+                                        graphNode);
 
     }
 
@@ -682,7 +881,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             final StoreBuilder<WindowStore<K1, V2>> otherWindow =
                 createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(), joinOtherName + "-store");
 
-
             KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(),
                                                                                    windows.beforeMs + windows.afterMs + 1,
                                                                                    windows.maintainMs());
@@ -691,18 +889,39 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                                     windows.maintainMs());
 
             final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
-                windows.beforeMs,
-                windows.afterMs,
-                joiner,
-                leftOuter);
+                                                                                                        windows.beforeMs,
+                                                                                                        windows.afterMs,
+                                                                                                        joiner,
+                                                                                                        leftOuter);
             final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
-                windows.afterMs,
-                windows.beforeMs,
-                reverseJoiner(joiner),
-                rightOuter);
+                                                                                                         windows.afterMs,
+                                                                                                         windows.beforeMs,
+                                                                                                         reverseJoiner(joiner),
+                                                                                                         rightOuter);
 
             KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
 
+            StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
+
+            ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamName);
+            ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamName);
+            ProcessorParameters joinThisProcessorParams = new ProcessorParameters(joinThis, joinThisName);
+            ProcessorParameters joinOtherProcessorParams = new ProcessorParameters(joinOther, joinOtherName);
+            ProcessorParameters joinMergeProcessorParams = new ProcessorParameters(joinMerge, joinMergeName);
+
+            joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams)
+                .withJoinThisProcessorParameters(joinThisProcessorParams)
+                .withJoinOtherProcessorParameters(joinOtherProcessorParams)
+                .withThisWindowedStreamProcessorParameters(thisWindowStreamProcessorParams)
+                .withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams)
+                .withThisWindowStoreBuilder(thisWindow)
+                .withOtherWindowStoreBuilder(otherWindow)
+                .withLeftHandSideStreamName(((AbstractStream) lhs).name)
+                .withOtherStreamName(((AbstractStream) other).name)
+                .withValueJoiner(joiner)
+                .withNodeName(joinMergeName);
+
+            addGraphNode(joinBuilder.build());
 
             builder.internalTopologyBuilder.addProcessor(thisWindowStreamName, thisWindowedStream, ((AbstractStream) lhs).name);
             builder.internalTopologyBuilder.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
@@ -714,7 +933,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
             Set<String> allSourceNodes = new HashSet<>(((AbstractStream<K>) lhs).sourceNodes);
             allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
-            return new KStreamImpl<>(builder, joinMergeName, allSourceNodes, false);
+            return new KStreamImpl<>(builder, joinMergeName, allSourceNodes, false, null);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 21c1505..2d349a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -30,10 +30,16 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StatelessProcessorNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 
@@ -80,8 +86,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                       final ProcessorSupplier<?, ?> processorSupplier,
                       final Set<String> sourceNodes,
                       final String queryableStoreName,
-                      final boolean isQueryable) {
-        super(builder, name, sourceNodes);
+                      final boolean isQueryable,
+                      final StreamsGraphNode streamsGraphNode) {
+        super(builder, name, sourceNodes, streamsGraphNode);
         this.processorSupplier = processorSupplier;
         this.queryableStoreName = queryableStoreName;
         this.keySerde = null;
@@ -96,8 +103,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                       final Serde<V> valSerde,
                       final Set<String> sourceNodes,
                       final String queryableStoreName,
-                      final boolean isQueryable) {
-        super(builder, name, sourceNodes);
+                      final boolean isQueryable,
+                      final StreamsGraphNode streamsGraphNode) {
+        super(builder, name, sourceNodes, streamsGraphNode);
         this.processorSupplier = processorSupplier;
         this.queryableStoreName = queryableStoreName;
         this.keySerde = keySerde;
@@ -115,32 +123,41 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
-                                  final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized,
+                                  final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal,
                                   final boolean filterNot) {
         final String name = builder.newProcessorName(FILTER_NAME);
 
         // only materialize if the state store is queryable
-        final boolean shouldMaterialize = materialized != null && materialized.isQueryable();
+        final boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
 
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this,
                 predicate,
                 filterNot,
-                shouldMaterialize ? materialized.storeName() : null);
+                shouldMaterialize ? materializedInternal.storeName() : null);
+
+        ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
+        StreamsGraphNode tableNode = new TableProcessorNode<>(name,
+                                                              processorParameters,
+                                                              materializedInternal,
+                                                              null);
+
+        addGraphNode(tableNode);
 
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
 
         if (shouldMaterialize) {
-            this.builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), name);
+            this.builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(), name);
         }
 
         return new KTableImpl<>(builder,
-                name,
-                processorSupplier,
-                this.keySerde,
-                this.valSerde,
-                sourceNodes,
-                shouldMaterialize ? materialized.storeName() : this.queryableStoreName,
-                shouldMaterialize);
+                                name,
+                                processorSupplier,
+                                this.keySerde,
+                                this.valSerde,
+                                sourceNodes,
+                                shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
+                                shouldMaterialize,
+                                tableNode);
     }
 
     @Override
@@ -178,24 +195,38 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
-                                           final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+                                           final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
         final String name = builder.newProcessorName(MAPVALUES_NAME);
 
         // only materialize if the state store is queryable
-        final boolean shouldMaterialize = materialized != null && materialized.isQueryable();
+        final boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();
 
         final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(
                 this,
                 mapper,
-                shouldMaterialize ? materialized.storeName() : null);
+                shouldMaterialize ? materializedInternal.storeName() : null);
 
+        // leaving in calls to ITB until building topology with graph
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
-
         if (shouldMaterialize) {
-            this.builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), name);
+            this.builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(), name);
         }
 
-        return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, shouldMaterialize ? materialized.storeName() : this.queryableStoreName, shouldMaterialize);
+        ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
+        StreamsGraphNode tableNode = new TableProcessorNode<>(name,
+                                                              processorParameters,
+                                                              materializedInternal,
+                                                              null);
+
+        addGraphNode(tableNode);
+
+        return new KTableImpl<>(builder,
+                                name,
+                                processorSupplier,
+                                sourceNodes,
+                                shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
+                                shouldMaterialize,
+                                tableNode);
     }
 
     @Override
@@ -265,6 +296,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             transformerSupplier,
             shouldMaterialize ? materialized.storeName() : null);
 
+        ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name);
+        StreamsGraphNode tableNode = new TableProcessorNode<>(name,
+                                                              processorParameters,
+                                                              materialized,
+                                                              stateStoreNames);
+
+        addGraphNode(tableNode);
+
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
 
         if (stateStoreNames.length > 0) {
@@ -283,13 +322,28 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             processorSupplier,
             sourceNodes,
             shouldMaterialize ? materialized.storeName() : this.queryableStoreName,
-            shouldMaterialize);
+            shouldMaterialize,
+            tableNode);
     }
 
     @Override
     public KStream<K, V> toStream() {
         String name = builder.newProcessorName(TOSTREAM_NAME);
 
+        ProcessorSupplier kStreamMapValues = new KStreamMapValues<>(new ValueMapperWithKey<K, Change<V>, V>() {
+            @Override
+            public V apply(final K key, final Change<V> change) {
+                return change.newValue;
+            }
+        });
+        ProcessorParameters processorParameters = new ProcessorParameters<K, V>(kStreamMapValues, name);
+
+        final StatelessProcessorNode<K, V> toStreamNode = new StatelessProcessorNode<>(name,
+                                                                                       processorParameters,
+                                                                                       false);
+
+        addGraphNode(toStreamNode);
+
         builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(new ValueMapperWithKey<K, Change<V>, V>() {
             @Override
             public V apply(final K key, final Change<V> change) {
@@ -297,7 +351,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             }
         }), this.name);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, false);
+        return new KStreamImpl<>(builder, name, sourceNodes, false, toStreamNode);
     }
 
     @Override
@@ -358,27 +412,29 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @SuppressWarnings("unchecked")
     private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other,
                                           final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
-                                          final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
+                                          final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
                                           final boolean leftOuter,
                                           final boolean rightOuter) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
-        final String internalQueryableName = materialized == null ? null : materialized.storeName();
+        final String internalQueryableName = materializedInternal == null ? null : materializedInternal.storeName();
         final String joinMergeName = builder.newProcessorName(MERGE_NAME);
-        final KTable<K, VR> result = buildJoin((AbstractStream<K>) other,
+
+        KTable<K, VR> kTable = buildJoin((AbstractStream<K>) other,
                                                joiner,
                                                leftOuter,
                                                rightOuter,
                                                joinMergeName,
-                                               internalQueryableName);
+                                               internalQueryableName,
+                                               materializedInternal);
 
-        // only materialize if specified in Materialized
-        if (materialized != null) {
+        if (materializedInternal != null) {
             final StoreBuilder<KeyValueStore<K, VR>> storeBuilder
-                    = new KeyValueStoreMaterializer<>(materialized).materialize();
+                = new KeyValueStoreMaterializer<>(materializedInternal).materialize();
             builder.internalTopologyBuilder.addStateStore(storeBuilder, joinMergeName);
         }
-        return result;
+
+        return kTable;
     }
 
     @SuppressWarnings("unchecked")
@@ -387,7 +443,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                            final boolean leftOuter,
                                            final boolean rightOuter,
                                            final String joinMergeName,
-                                           final String internalQueryableName) {
+                                           final String internalQueryableName,
+                                           final MaterializedInternal materializedInternal) {
         final Set<String> allSourceNodes = ensureJoinableWith(other);
 
         if (leftOuter) {
@@ -416,18 +473,46 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         }
 
         final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(builder, joinThisName, joinThis, sourceNodes, this.queryableStoreName, false),
-                new KTableImpl<K, V1, R>(builder, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes,
-                        ((KTableImpl<K, ?, ?>) other).queryableStoreName, false),
-                internalQueryableName
+            new KTableImpl<K, V, R>(builder, joinThisName, joinThis, sourceNodes, this.queryableStoreName, false, null),
+            new KTableImpl<K, V1, R>(builder, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes,
+                                     ((KTableImpl<K, ?, ?>) other).queryableStoreName, false, null),
+            internalQueryableName
         );
 
+        final KTableKTableJoinNode.KTableKTableJoinNodeBuilder kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
+
+        // only materialize if specified in Materialized
+        if (materializedInternal != null) {
+            kTableJoinNodeBuilder.withMaterializedInternal(materializedInternal);
+        }
+        kTableJoinNodeBuilder.withNodeName(joinMergeName);
+
+        ProcessorParameters joinThisProcessorParameters = new ProcessorParameters(joinThis, joinThisName);
+        ProcessorParameters joinOtherProcessorParameters = new ProcessorParameters(joinOther, joinOtherName);
+        ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName);
+
+        kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
+            .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
+            .withJoinThisProcessorParameters(joinThisProcessorParameters)
+            .withJoinThisStoreNames(valueGetterSupplier().storeNames())
+            .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames());
+
+        KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build();
+        addGraphNode(kTableKTableJoinNode);
+
         builder.internalTopologyBuilder.addProcessor(joinThisName, joinThis, this.name);
         builder.internalTopologyBuilder.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         builder.internalTopologyBuilder.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
         builder.internalTopologyBuilder.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
         builder.internalTopologyBuilder.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
-        return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null);
+
+        return new KTableImpl<>(builder,
+                                joinMergeName,
+                                joinMerge,
+                                allSourceNodes,
+                                internalQueryableName,
+                                internalQueryableName != null,
+                                kTableKTableJoinNode);
     }
 
     @Override
@@ -443,16 +528,25 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         String selectName = builder.newProcessorName(SELECT_NAME);
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
+        ProcessorParameters processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
 
         // select the aggregate key and values (old and new), it would require parent to send old values
         builder.internalTopologyBuilder.addProcessor(selectName, selectSupplier, this.name);
+        final StatelessProcessorNode<K1, V1> graphNode = new StatelessProcessorNode<>(selectName,
+                                                                                      processorParameters,
+                                                                                      false,
+                                                                                      Collections.<String>emptyList());
+
+        addGraphNode(graphNode);
+
         this.enableSendingOldValues();
-        final SerializedInternal<K1, V1> serializedInternal  = new SerializedInternal<>(serialized);
+        final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized);
         return new KGroupedTableImpl<>(builder,
                                        selectName,
                                        this.name,
                                        serializedInternal.keySerde(),
-                                       serializedInternal.valueSerde());
+                                       serializedInternal.valueSerde(),
+                                       graphNode);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
index f7f68fe..3197244 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
@@ -20,20 +20,20 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
-class ProducedInternal<K, V> extends Produced<K, V> {
+public class ProducedInternal<K, V> extends Produced<K, V> {
     ProducedInternal(final Produced<K, V> produced) {
         super(produced);
     }
 
-    Serde<K> keySerde() {
+    public Serde<K> keySerde() {
         return keySerde;
     }
 
-    Serde<V> valueSerde() {
+    public Serde<V> valueSerde() {
         return valueSerde;
     }
 
-    StreamPartitioner<? super K, ? super V> streamPartitioner() {
+    public StreamPartitioner<? super K, ? super V> streamPartitioner() {
         return partitioner;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionNode.java
deleted file mode 100644
index d8aaee9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionNode.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-
-class RepartitionNode<K, V> extends StatelessProcessorNode<K, V> {
-
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-    private final String sinkName;
-    private final String sourceName;
-    private final String repartitionTopic;
-    private final String processorName;
-
-
-    RepartitionNode(final String parentProcessorNodeName,
-                    final String processorNodeName,
-                    final String sourceName,
-                    final ProcessorSupplier<K, V> processorSupplier,
-                    final Serde<K> keySerde,
-                    final Serde<V> valueSerde,
-                    final String sinkName,
-                    final String repartitionTopic,
-                    final String processorName) {
-
-        super(parentProcessorNodeName,
-              processorNodeName,
-              processorSupplier,
-              false);
-
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-        this.sinkName = sinkName;
-        this.sourceName = sourceName;
-        this.repartitionTopic = repartitionTopic;
-        this.processorName = processorName;
-    }
-
-    Serde<K> keySerde() {
-        return keySerde;
-    }
-
-    Serde<V> valueSerde() {
-        return valueSerde;
-    }
-
-    String sinkName() {
-        return sinkName;
-    }
-
-    String sourceName() {
-        return sourceName;
-    }
-
-    String repartitionTopic() {
-        return repartitionTopic;
-    }
-
-    String processorName() {
-        return processorName;
-    }
-
-    @Override
-    void writeToTopology(InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
-    }
-
-    static <K, V> RepartitionNodeBuilder<K, V> repartitionNodeBuilder() {
-        return new RepartitionNodeBuilder<>();
-    }
-
-
-    static final class RepartitionNodeBuilder<K, V> {
-
-        private String processorNodeName;
-        private ProcessorSupplier<K, V> processorSupplier;
-        private Serde<K> keySerde;
-        private Serde<V> valueSerde;
-        private String sinkName;
-        private String sourceName;
-        private String repartitionTopic;
-        private String processorName;
-        private String parentProcessorNodeName;
-
-        private RepartitionNodeBuilder() {
-        }
-
-        RepartitionNodeBuilder<K, V> withProcessorSupplier(final ProcessorSupplier<K, V> processorSupplier) {
-            this.processorSupplier = processorSupplier;
-            return this;
-        }
-
-        RepartitionNodeBuilder<K, V> withKeySerde(final Serde<K> keySerde) {
-            this.keySerde = keySerde;
-            return this;
-        }
-
-        RepartitionNodeBuilder<K, V> withValueSerde(final Serde<V> valueSerde) {
-            this.valueSerde = valueSerde;
-            return this;
-        }
-
-        RepartitionNodeBuilder<K, V> withSinkName(final String sinkName) {
-            this.sinkName = sinkName;
-            return this;
-        }
-
-        RepartitionNodeBuilder<K, V> withSourceName(final String sourceName) {
-            this.sourceName = sourceName;
-            return this;
-        }
-
-        RepartitionNodeBuilder<K, V> withRepartitionTopic(final String repartitionTopic) {
-            this.repartitionTopic = repartitionTopic;
-            return this;
-        }
-
-        RepartitionNodeBuilder<K, V> withProcessorName(final String processorName) {
-            this.processorName = processorName;
-            return this;
-        }
-
-        RepartitionNodeBuilder<K, V> withParentProcessorNodeName(final String parentProcessorNodeName) {
-            this.parentProcessorNodeName = parentProcessorNodeName;
-            return this;
-        }
-
-        RepartitionNodeBuilder<K, V> withProcessorNodeName(final String processorNodeName) {
-            this.processorNodeName = processorNodeName;
-            return this;
-        }
-
-        RepartitionNode<K, V> build() {
-
-            return new RepartitionNode<>(parentProcessorNodeName,
-                                         processorNodeName,
-                                         sourceName,
-                                         processorSupplier,
-                                         keySerde,
-                                         valueSerde,
-                                         sinkName,
-                                         repartitionTopic,
-                                         processorName);
-
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index b3cbacd..dacab56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -57,8 +58,9 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
                                final String name,
                                final Serde<K> keySerde,
                                final Serde<V> valSerde,
-                               final GroupedStreamAggregateBuilder<K, V> aggregateBuilder) {
-        super(builder, name, sourceNodes);
+                               final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                               final StreamsGraphNode streamsGraphNode) {
+        super(builder, name, sourceNodes, streamsGraphNode);
         Objects.requireNonNull(windows, "windows can't be null");
         this.windows = windows;
         this.keySerde = keySerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulProcessorNode.java
deleted file mode 100644
index 57d30fe..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulProcessorNode.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-
-import java.util.Arrays;
-
-class StatefulProcessorNode<K, V> extends StatelessProcessorNode<K, V> {
-
-    private final String[] storeNames;
-    private final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier;
-    private final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
-
-
-    StatefulProcessorNode(final String parentNodeName,
-                          final String processorNodeName,
-                          final ProcessorSupplier<K, V> processorSupplier,
-                          final String[] storeNames,
-                          final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
-                          final StoreBuilder<KeyValueStore<K, V>> storeBuilder,
-                          final boolean repartitionRequired) {
-        super(parentNodeName,
-              processorNodeName,
-              processorSupplier,
-              repartitionRequired);
-
-        this.storeNames = storeNames;
-        this.storeSupplier = storeSupplier;
-        this.storeBuilder = storeBuilder;
-    }
-
-
-    String[] storeNames() {
-        return Arrays.copyOf(storeNames, storeNames.length);
-    }
-
-    org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier() {
-        return storeSupplier;
-    }
-
-    StoreBuilder<KeyValueStore<K, V>> storeBuilder() {
-        return storeBuilder;
-    }
-
-    @Override
-    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
-    }
-
-    static <K, V> StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder() {
-        return new StatefulProcessorNodeBuilder<>();
-    }
-
-    static final class StatefulProcessorNodeBuilder<K, V> {
-
-        private ProcessorSupplier processorSupplier;
-        private String processorNodeName;
-        private String parentProcessorNodeName;
-        private boolean repartitionRequired;
-        private String[] storeNames;
-        private org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier;
-        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
-
-        private StatefulProcessorNodeBuilder() {
-        }
-
-        StatefulProcessorNodeBuilder<K, V> withProcessorSupplier(final ProcessorSupplier processorSupplier) {
-            this.processorSupplier = processorSupplier;
-            return this;
-        }
-
-        StatefulProcessorNodeBuilder<K, V> withProcessorNodeName(final String processorNodeName) {
-            this.processorNodeName = processorNodeName;
-            return this;
-        }
-
-        StatefulProcessorNodeBuilder<K, V> withParentProcessorNodeName(final String parentProcessorNodeName) {
-            this.parentProcessorNodeName = parentProcessorNodeName;
-            return this;
-        }
-
-        StatefulProcessorNodeBuilder<K, V> withStoreNames(final String[] storeNames) {
-            this.storeNames = storeNames;
-            return this;
-        }
-
-        StatefulProcessorNodeBuilder<K, V> withRepartitionRequired(final boolean repartitionRequired) {
-            this.repartitionRequired = repartitionRequired;
-            return this;
-        }
-
-        StatefulProcessorNodeBuilder<K, V> withStoreSupplier(final StateStoreSupplier<KeyValueStore> storeSupplier) {
-            this.storeSupplier = storeSupplier;
-            return this;
-        }
-
-        StatefulProcessorNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
-            this.storeBuilder = storeBuilder;
-            return this;
-        }
-
-        StatefulProcessorNode<K, V> build() {
-            return new StatefulProcessorNode<>(parentProcessorNodeName,
-                                               processorNodeName,
-                                               processorSupplier,
-                                               storeNames,
-                                               storeSupplier,
-                                               storeBuilder,
-                                               repartitionRequired);
-
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulRepartitionNode.java
deleted file mode 100644
index e7d5140..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulRepartitionNode.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-class StatefulRepartitionNode<K, V, T> extends RepartitionNode<K, V> {
-
-    private final ProcessorSupplier<K, Change<V>> statefulProcessorSupplier;
-    private final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized;
-
-    StatefulRepartitionNode(final String parentProcessorNodeName,
-                            final String processorNodeName,
-                            final String sourceName,
-                            final Serde<K> keySerde,
-                            final Serde<V> valueSerde,
-                            final String sinkName,
-                            final String repartitionTopic,
-                            final String processorName,
-                            final ProcessorSupplier<K, Change<V>> statefulProcessorSupplier,
-                            final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
-        super(parentProcessorNodeName,
-              processorNodeName,
-              sourceName,
-              null,
-              keySerde,
-              valueSerde,
-              sinkName,
-              repartitionTopic,
-              processorName);
-
-        this.statefulProcessorSupplier = statefulProcessorSupplier;
-        this.materialized = materialized;
-    }
-
-    ProcessorSupplier<K, Change<V>> statefulProcessorSupplier() {
-        return statefulProcessorSupplier;
-    }
-
-    MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized() {
-        return materialized;
-    }
-
-    ChangedSerializer<? extends V> changedValueSerializer() {
-        final Serializer<? extends V> valueSerializer = valueSerde() == null ? null : valueSerde().serializer();
-        return new ChangedSerializer<>(valueSerializer);
-
-    }
-
-    ChangedDeserializer<? extends V> changedValueDeserializer() {
-        final Deserializer<? extends V> valueDeserializer = valueSerde() == null ? null : valueSerde().deserializer();
-        return new ChangedDeserializer<>(valueDeserializer);
-    }
-
-    static <K, V, T> StatefulRepartitionNodeBuilder<K, V, T> statefulRepartitionNodeBuilder() {
-        return new StatefulRepartitionNodeBuilder<>();
-    }
-
-    @Override
-    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
-    }
-
-
-    static final class StatefulRepartitionNodeBuilder<K, V, T> {
-
-        private String parentProcessorNodeName;
-        private String processorNodeName;
-        private Serde<K> keySerde;
-        private Serde<V> valueSerde;
-        private String sinkName;
-        private String sourceName;
-        private String repartitionTopic;
-        private String processorName;
-        private ProcessorSupplier<K, Change<V>> statefulProcessorSupplier;
-        private MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized;
-
-        private StatefulRepartitionNodeBuilder() {
-        }
-
-        StatefulRepartitionNodeBuilder<K, V, T> withKeySerde(final Serde<K> keySerde) {
-            this.keySerde = keySerde;
-            return this;
-        }
-
-
-        StatefulRepartitionNodeBuilder<K, V, T> withValueSerde(final Serde<V> valueSerde) {
-            this.valueSerde = valueSerde;
-            return this;
-        }
-
-        StatefulRepartitionNodeBuilder<K, V, T> withParentProcessorNodeName(final String parentProcessorNodeName) {
-            this.parentProcessorNodeName = parentProcessorNodeName;
-            return this;
-        }
-
-        StatefulRepartitionNodeBuilder<K, V, T> withSinkName(final String sinkName) {
-            this.sinkName = sinkName;
-            return this;
-        }
-
-        StatefulRepartitionNodeBuilder<K, V, T> withSourceName(final String sourceName) {
-            this.sourceName = sourceName;
-            return this;
-        }
-
-        StatefulRepartitionNodeBuilder<K, V, T> withRepartitionTopic(final String repartitionTopic) {
-            this.repartitionTopic = repartitionTopic;
-            return this;
-        }
-
-        StatefulRepartitionNodeBuilder<K, V, T> withProcessorNodeName(final String processorNodeName) {
-            this.processorName = processorNodeName;
-            return this;
-        }
-
-        StatefulRepartitionNodeBuilder<K, V, T> withStatefulProcessorSupplier(final ProcessorSupplier<K, Change<V>> statefulProcessorSupplier) {
-            this.statefulProcessorSupplier = statefulProcessorSupplier;
-            return this;
-        }
-
-        StatefulRepartitionNodeBuilder<K, V, T> withMaterialized(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
-            this.materialized = materialized;
-            return this;
-        }
-
-        StatefulRepartitionNodeBuilder<K, V, T> withNodeName(final String nodeName) {
-            this.processorNodeName = nodeName;
-            return this;
-        }
-
-        public StatefulRepartitionNode<K, V, T> build() {
-
-            return new StatefulRepartitionNode<>(parentProcessorNodeName,
-                                                 processorNodeName,
-                                                 sourceName,
-                                                 keySerde,
-                                                 valueSerde,
-                                                 sinkName,
-                                                 repartitionTopic,
-                                                 processorName,
-                                                 statefulProcessorSupplier,
-                                                 materialized);
-
-
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulSourceNode.java
deleted file mode 100644
index b2fdc81..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulSourceNode.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-
-import java.util.Collections;
-
-/**
- * Used to represent either a KTable source or a GlobalKTable source.
- * The presence of a {@link KTableSource} indicates this source node supplies
- * a {@link org.apache.kafka.streams.kstream.GlobalKTable}
- */
-class StatefulSourceNode<K, V> extends StreamSourceNode<K, V> {
-
-    private org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier;
-    private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
-    private final ProcessorSupplier<K, V> processorSupplier;
-    private final String sourceName;
-    private final String processorName;
-    private final KTableSource<K, V> kTableSource;
-
-    StatefulSourceNode(final String predecessorNodeName,
-                       final String nodeName,
-                       final String sourceName,
-                       final String processorName,
-                       final String topic,
-                       final ConsumedInternal<K, V> consumedInternal,
-                       final ProcessorSupplier<K, V> processorSupplier,
-                       final KTableSource<K, V> kTableSource) {
-
-        super(predecessorNodeName,
-              nodeName,
-              Collections.singletonList(topic),
-              consumedInternal);
-
-        this.processorSupplier = processorSupplier;
-        this.sourceName = sourceName;
-        this.processorName = processorName;
-        this.kTableSource = kTableSource;
-    }
-
-    StateStoreSupplier<KeyValueStore> storeSupplier() {
-        return storeSupplier;
-    }
-
-    void setStoreSupplier(StateStoreSupplier<KeyValueStore> storeSupplier) {
-        this.storeSupplier = storeSupplier;
-    }
-
-    StoreBuilder<KeyValueStore<K, V>> storeBuilder() {
-        return storeBuilder;
-    }
-
-    void setStoreBuilder(StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
-        this.storeBuilder = storeBuilder;
-    }
-
-    ProcessorSupplier<K, V> processorSupplier() {
-        return processorSupplier;
-    }
-
-    String sourceName() {
-        return sourceName;
-    }
-
-    KTableSource<K, V> kTableSource() {
-        return kTableSource;
-    }
-
-    String processorName() {
-        return processorName;
-    }
-
-    boolean isGlobalKTable() {
-        return kTableSource != null;
-    }
-
-    static <K, V> StatefulSourceNodeBuilder<K, V> statefulSourceNodeBuilder() {
-        return new StatefulSourceNodeBuilder<>();
-    }
-
-    @Override
-    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
-        //TODO will implement in follow-up pr
-    }
-
-    static final class StatefulSourceNodeBuilder<K, V> {
-
-        private String predecessorNodeName;
-        private String nodeName;
-        private String sourceName;
-        private String processorName;
-        private String topic;
-        private ConsumedInternal<K, V> consumedInternal;
-        private StateStoreSupplier<KeyValueStore> storeSupplier;
-        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
-        private ProcessorSupplier<K, V> processorSupplier;
-        private KTableSource<K, V> kTableSource;
-
-        private StatefulSourceNodeBuilder() {
-        }
-
-
-        StatefulSourceNodeBuilder<K, V> withPredecessorNodeName(final String predecessorNodeName) {
-            this.predecessorNodeName = predecessorNodeName;
-            return this;
-        }
-
-        StatefulSourceNodeBuilder<K, V> withSourceName(final String sourceName) {
-            this.sourceName = sourceName;
-            return this;
-        }
-
-        StatefulSourceNodeBuilder<K, V> withProcessorName(final String processorName) {
-            this.processorName = processorName;
-            return this;
-        }
-
-        StatefulSourceNodeBuilder<K, V> withTopic(final String topic) {
-            this.topic = topic;
-            return this;
-        }
-
-        StatefulSourceNodeBuilder<K, V> withStoreSupplier(final StateStoreSupplier<KeyValueStore> storeSupplier) {
-            this.storeSupplier = storeSupplier;
-            return this;
-        }
-
-
-        StatefulSourceNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
-            this.storeBuilder = storeBuilder;
-            return this;
-        }
-
-        StatefulSourceNodeBuilder<K, V> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) {
-            this.consumedInternal = consumedInternal;
-            return this;
-        }
-
-        StatefulSourceNodeBuilder<K, V> withProcessorSupplier(final ProcessorSupplier<K, V> processorSupplier) {
-            this.processorSupplier = processorSupplier;
-            return this;
-        }
-
-        StatefulSourceNodeBuilder<K, V> withKTableSource(final KTableSource<K, V> kTableSource) {
-            this.kTableSource = kTableSource;
-            return this;
-        }
-
-        StatefulSourceNodeBuilder<K, V> withNodeName(final String nodeName) {
-            this.nodeName = nodeName;
-            return this;
-        }
-
-        StatefulSourceNode<K, V> build() {
-            StatefulSourceNode<K, V>
-                statefulSourceNode =
-                new StatefulSourceNode<>(predecessorNodeName,
-                                         nodeName,
-                                         sourceName,
-                                         processorName,
-                                         topic,
-                                         consumedInternal,
-                                         processorSupplier,
-                                         kTableSource);
-
-            statefulSourceNode.setRepartitionRequired(false);
-            if (storeSupplier != null) {
-                statefulSourceNode.setStoreSupplier(storeSupplier);
-            } else if (storeBuilder != null) {
-                statefulSourceNode.setStoreBuilder(storeBuilder);
-            }
-
-            return statefulSourceNode;
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsGraphNode.java
deleted file mode 100644
index 4597513..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsGraphNode.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-
-import java.util.Collection;
-import java.util.LinkedHashSet;
-
-abstract class StreamsGraphNode {
-
-    private StreamsGraphNode parentNode;
-    private final Collection<StreamsGraphNode> childNodes = new LinkedHashSet<>();
-    private final String processorNodeName;
-    private String parentProcessorNodeName;
-    private boolean repartitionRequired;
-    private boolean triggersRepartitioning;
-    private Integer id;
-    private StreamsTopologyGraph streamsTopologyGraph;
-
-    StreamsGraphNode(final String parentProcessorNodeName,
-                     final String processorNodeName,
-                     final boolean repartitionRequired) {
-        this.parentProcessorNodeName = parentProcessorNodeName;
-        this.processorNodeName = processorNodeName;
-        this.repartitionRequired = repartitionRequired;
-    }
-
-    StreamsGraphNode parentNode() {
-        return parentNode;
-    }
-
-    String parentProcessorNodeName() {
-        return parentProcessorNodeName;
-    }
-
-    void setParentProcessorNodeName(final String parentProcessorNodeName) {
-        this.parentProcessorNodeName = parentProcessorNodeName;
-    }
-
-    void setParentNode(final StreamsGraphNode parentNode) {
-        this.parentNode = parentNode;
-    }
-
-    Collection<StreamsGraphNode> children() {
-        return new LinkedHashSet<>(childNodes);
-    }
-
-    void addChildNode(final StreamsGraphNode node) {
-        this.childNodes.add(node);
-    }
-
-    String processorNodeName() {
-        return processorNodeName;
-    }
-
-    boolean repartitionRequired() {
-        return repartitionRequired;
-    }
-
-    void setRepartitionRequired(boolean repartitionRequired) {
-        this.repartitionRequired = repartitionRequired;
-    }
-
-    public boolean triggersRepartitioning() {
-        return triggersRepartitioning;
-    }
-
-    public void setTriggersRepartitioning(final boolean triggersRepartitioning) {
-        this.triggersRepartitioning = triggersRepartitioning;
-    }
-
-    void setId(final int id) {
-        this.id = id;
-    }
-
-    Integer id() {
-        return this.id;
-    }
-
-    public void setStreamsTopologyGraph(final StreamsTopologyGraph streamsTopologyGraph) {
-        this.streamsTopologyGraph = streamsTopologyGraph;
-    }
-
-    StreamsTopologyGraph streamsTopologyGraph() {
-        return streamsTopologyGraph;
-    }
-
-    abstract void writeToTopology(final InternalTopologyBuilder topologyBuilder);
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsTopologyGraph.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsTopologyGraph.java
deleted file mode 100644
index 9d2b90b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsTopologyGraph.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-class StreamsTopologyGraph {
-
-    private static final Logger LOG = LoggerFactory.getLogger(StreamsTopologyGraph.class);
-    public static final String TOPOLOGY_ROOT = "root";
-
-    protected final StreamsGraphNode root = new StreamsGraphNode(null, TOPOLOGY_ROOT, false) {
-        @Override
-        void writeToTopology(InternalTopologyBuilder topologyBuilder) {
-            // no-op for root node
-        }
-    };
-
-    private final AtomicInteger nodeIdCounter = new AtomicInteger(0);
-    private final Map<StreamsGraphNode, Set<StreamsGraphNode>> repartitioningNodeToRepartitioned = new HashMap<>();
-    private final Map<StreamsGraphNode, StreamSinkNode> stateStoreNodeToSinkNodes = new HashMap<>();
-    private final Map<String, StreamsGraphNode> nameToGraphNode = new HashMap<>();
-
-    private StreamsGraphNode previousNode;
-
-    StreamsTopologyGraph() {
-        nameToGraphNode.put(TOPOLOGY_ROOT, root);
-    }
-
-
-    public void addNode(final StreamsGraphNode node) {
-        node.setId(nodeIdCounter.getAndIncrement());
-
-        if (node.parentProcessorNodeName() == null && !node.processorNodeName().equals(TOPOLOGY_ROOT)) {
-            LOG.warn("Updating node {} with predecessor name {}", node, previousNode.processorNodeName());
-            node.setParentProcessorNodeName(previousNode.processorNodeName());
-        }
-
-        LOG.debug("Adding node {}", node);
-
-        final StreamsGraphNode predecessorNode =  nameToGraphNode.get(node.parentProcessorNodeName());
-
-        if (predecessorNode == null) {
-            throw new IllegalStateException(
-                "Nodes should not have a null predecessor.  Name: " + node.processorNodeName() + " Type: "
-                + node.getClass().getSimpleName() + " predecessor name " + node.parentProcessorNodeName());
-        }
-
-        node.setParentNode(predecessorNode);
-        predecessorNode.addChildNode(node);
-
-        if (node.triggersRepartitioning()) {
-            repartitioningNodeToRepartitioned.put(node, new HashSet<StreamsGraphNode>());
-        } else if (node.repartitionRequired()) {
-            StreamsGraphNode currentNode = node;
-            while (currentNode != null) {
-                final StreamsGraphNode parentNode = currentNode.parentNode();
-                if (parentNode.triggersRepartitioning()) {
-                    repartitioningNodeToRepartitioned.get(parentNode).add(node);
-                    break;
-                }
-                currentNode = parentNode.parentNode();
-            }
-        }
-
-        if (!nameToGraphNode.containsKey(node.processorNodeName())) {
-            nameToGraphNode.put(node.processorNodeName(), node);
-        }
-
-        previousNode = node;
-    }
-
-    public StreamsGraphNode getRoot() {
-        return root;
-    }
-
-    /**
-     * Used for hints when a node in the topology triggers a repartition and the repartition flag
-     * is propagated down through the descendant nodes of the topology.  This can be used to help make an
-     * optimization where the triggering node does an eager "through" operation and the child nodes can ignore
-     * the need to repartition.
-     *
-     * @return Map&lt;StreamGraphNode, Set&lt;StreamGraphNode&gt;&gt;
-     */
-    public Map<StreamsGraphNode, Set<StreamsGraphNode>> getRepartitioningNodeToRepartitioned() {
-        Map<StreamsGraphNode, Set<StreamsGraphNode>> copy = new HashMap<>(repartitioningNodeToRepartitioned);
-        return Collections.unmodifiableMap(copy);
-    }
-
-    /**
-     * Used for hints when an Aggregation operation is directly output to a Sink topic.
-     * This map can be used to help optimize this case and use the Sink topic as the changelog topic
-     * for the state store of the aggregation.
-     *
-     * @return Map&lt;StreamGraphNode, StreamSinkNode&gt;
-     */
-    public Map<StreamsGraphNode, StreamSinkNode> getStateStoreNodeToSinkNodes() {
-        Map<StreamsGraphNode, StreamSinkNode> copy = new HashMap<>(stateStoreNodeToSinkNodes);
-        return Collections.unmodifiableMap(copy);
-    }
-
-    /**
-     * Used for tracking the Streams generated names back to the original StreamGraphNode
-     * to enable the predecessor - descendant relationship
-     *
-     * @return Map&lt;String, SteamsGraphNode&gt;
-     */
-    public Map<String, StreamsGraphNode> getNameToGraphNode() {
-        Map<String, StreamsGraphNode> copy = new HashMap<>(nameToGraphNode);
-        return Collections.unmodifiableMap(copy);
-    }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 4f5301b..e545f48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@@ -52,13 +53,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
                             final String name,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde,
-                            final boolean repartitionRequired) {
-        super(builder, name, sourceNodes);
+                            final boolean repartitionRequired,
+                            final StreamsGraphNode streamsGraphNode) {
+        super(builder, name, sourceNodes, streamsGraphNode);
         Objects.requireNonNull(windows, "windows can't be null");
         this.valSerde = valSerde;
         this.keySerde = keySerde;
         this.windows = windows;
-        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
+        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name, streamsGraphNode);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
new file mode 100644
index 0000000..35805d3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+
+/**
+ * Utility base class containing the common fields between
+ * a Stream-Stream join and a Table-Table join
+ */
+abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends StreamsGraphNode {
+
+    private final ProcessorParameters<K, V1> joinThisProcessorParameters;
+    private final ProcessorParameters<K, V2> joinOtherProcessorParameters;
+    private final ProcessorParameters<K, VR> joinMergeProcessorParameters;
+    private final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
+    private final String thisJoinSideNodeName;
+    private final String otherJoinSideNodeName;
+
+
+    BaseJoinProcessorNode(final String nodeName,
+                          final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
+                          final ProcessorParameters<K, V1> joinThisProcessorParameters,
+                          final ProcessorParameters<K, V2> joinOtherProcessorParameters,
+                          final ProcessorParameters<K, VR> joinMergeProcessorParameters,
+                          final String thisJoinSideNodeName,
+                          final String otherJoinSideNodeName) {
+
+        super(nodeName,
+              false);
+
+        this.valueJoiner = valueJoiner;
+        this.joinThisProcessorParameters = joinThisProcessorParameters;
+        this.joinOtherProcessorParameters = joinOtherProcessorParameters;
+        this.joinMergeProcessorParameters = joinMergeProcessorParameters;
+        this.thisJoinSideNodeName = thisJoinSideNodeName;
+        this.otherJoinSideNodeName = otherJoinSideNodeName;
+    }
+
+    ProcessorParameters<K, V1> thisProcessorParameters() {
+        return joinThisProcessorParameters;
+    }
+
+    ProcessorParameters<K, V2> otherProcessorParameters() {
+        return joinOtherProcessorParameters;
+    }
+
+    ProcessorParameters<K, VR> mergeProcessorParameters() {
+        return joinMergeProcessorParameters;
+    }
+
+    ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner() {
+        return valueJoiner;
+    }
+
+    String thisJoinSideNodeName() {
+        return thisJoinSideNodeName;
+    }
+
+    String otherJoinSideNodeName() {
+        return otherJoinSideNodeName;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
new file mode 100644
index 0000000..73a7f11
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode {
+
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valueSerde;
+    protected final String sinkName;
+    protected final String sourceName;
+    protected final String repartitionTopic;
+    protected final ProcessorParameters processorParameters;
+
+
+    BaseRepartitionNode(final String nodeName,
+                        final String sourceName,
+                        final ProcessorParameters processorParameters,
+                        final Serde<K> keySerde,
+                        final Serde<V> valueSerde,
+                        final String sinkName,
+                        final String repartitionTopic) {
+
+        super(nodeName, false);
+
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        this.sinkName = sinkName;
+        this.sourceName = sourceName;
+        this.repartitionTopic = repartitionTopic;
+        this.processorParameters = processorParameters;
+    }
+
+    abstract Serializer<V> getValueSerializer();
+
+    abstract Deserializer<V> getValueDeserializer();
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
new file mode 100644
index 0000000..0a70a43
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
+import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionNode {
+
+
+    GroupedTableOperationRepartitionNode(final String nodeName,
+                                         final Serde<K> keySerde,
+                                         final Serde<V> valueSerde,
+                                         final String sinkName,
+                                         final String sourceName,
+                                         final String repartitionTopic,
+                                         final ProcessorParameters processorParameters) {
+
+        super(nodeName,
+              sourceName,
+              processorParameters,
+              keySerde,
+              valueSerde,
+              sinkName,
+              repartitionTopic
+        );
+    }
+
+    @Override
+    Serializer getValueSerializer() {
+        final Serializer<? extends V> valueSerializer = valueSerde == null ? null : valueSerde.serializer();
+        return new ChangedSerializer<>(valueSerializer);
+    }
+
+    @Override
+    Deserializer getValueDeserializer() {
+        final Deserializer<? extends V> valueDeserializer = valueSerde == null ? null : valueSerde.deserializer();
+        return new ChangedDeserializer<>(valueDeserializer);
+    }
+
+
+    @Override
+    public void writeToTopology(InternalTopologyBuilder topologyBuilder) {
+        final Serializer<K> keySerializer = keySerde != null ? keySerde.serializer() : null;
+        final Deserializer<K> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
+
+
+        topologyBuilder.addInternalTopic(repartitionTopic);
+
+        topologyBuilder.addSink(sinkName,
+                                repartitionTopic,
+                                keySerializer,
+                                getValueSerializer(),
+                                null,
+                                parentNode().nodeName());
+
+        topologyBuilder.addSource(null,
+                                  sourceName,
+                                  new FailOnInvalidTimestamp(),
+                                  keyDeserializer,
+                                  getValueDeserializer(),
+                                  repartitionTopic);
+
+    }
+
+    public static GroupedTableOperationRepartitionNodeBuilder groupedTableOperationNodeBuilder() {
+        return new GroupedTableOperationRepartitionNodeBuilder();
+    }
+
+
+    public static final class GroupedTableOperationRepartitionNodeBuilder<K, V> {
+
+        private Serde<K> keySerde;
+        private Serde<V> valueSerde;
+        private String sinkName;
+        private String nodeName;
+        private String sourceName;
+        private String repartitionTopic;
+        private ProcessorParameters processorParameters;
+
+        private GroupedTableOperationRepartitionNodeBuilder() {
+        }
+
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withKeySerde(Serde<K> keySerde) {
+            this.keySerde = keySerde;
+            return this;
+        }
+
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withValueSerde(Serde<V> valueSerde) {
+            this.valueSerde = valueSerde;
+            return this;
+        }
+
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withSinkName(String sinkName) {
+            this.sinkName = sinkName;
+            return this;
+        }
+
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withNodeName(String nodeName) {
+            this.nodeName = nodeName;
+            return this;
+        }
+
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withSourceName(String sourceName) {
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withRepartitionTopic(String repartitionTopic) {
+            this.repartitionTopic = repartitionTopic;
+            return this;
+        }
+
+        public GroupedTableOperationRepartitionNodeBuilder<K, V> withProcessorParameters(ProcessorParameters processorParameters) {
+            this.processorParameters = processorParameters;
+            return this;
+        }
+
+        public GroupedTableOperationRepartitionNode<K, V> build() {
+            return new GroupedTableOperationRepartitionNode(nodeName,
+                                                            keySerde,
+                                                            valueSerde,
+                                                            sinkName,
+                                                            sourceName,
+                                                            repartitionTopic,
+                                                            processorParameters
+            );
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
similarity index 62%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index f76aa0d..5e3d8d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -15,35 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
-import java.util.Arrays;
-
 /**
  * Too much specific information to generalize so the
  * KTable-KTable join requires a specific node.
  */
-class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
+public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
 
     private final String[] joinThisStoreNames;
     private final String[] joinOtherStoreNames;
+    private final MaterializedInternal materializedInternal;
 
-    KTableKTableJoinNode(final String parentProcessorNodeName,
-                         final String processorNodeName,
+    KTableKTableJoinNode(final String nodeName,
                          final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
                          final ProcessorParameters<K, V1> joinThisProcessorParameters,
                          final ProcessorParameters<K, V2> joinOtherProcessorParameters,
                          final ProcessorParameters<K, VR> joinMergeProcessorParameters,
+                         final MaterializedInternal materializedInternal,
                          final String thisJoinSide,
                          final String otherJoinSide,
                          final String[] joinThisStoreNames,
                          final String[] joinOtherStoreNames) {
 
-        super(parentProcessorNodeName,
-              processorNodeName,
+        super(nodeName,
               valueJoiner,
               joinThisProcessorParameters,
               joinOtherProcessorParameters,
@@ -53,32 +52,25 @@ class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V
 
         this.joinThisStoreNames = joinThisStoreNames;
         this.joinOtherStoreNames = joinOtherStoreNames;
-    }
-
-    String[] joinThisStoreNames() {
-        return Arrays.copyOf(joinThisStoreNames, joinThisStoreNames.length);
-    }
-
-    String[] joinOtherStoreNames() {
-        return Arrays.copyOf(joinOtherStoreNames, joinOtherStoreNames.length);
+        this.materializedInternal = materializedInternal;
     }
 
     @Override
-    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         //TODO will implement in follow-up pr
     }
 
-    static <K, V, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
+    public static <K, V, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
         return new KTableKTableJoinNodeBuilder<>();
     }
 
-    static final class KTableKTableJoinNodeBuilder<K, V1, V2, VR> {
+    public static final class KTableKTableJoinNodeBuilder<K, V1, V2, VR> {
 
-        private String processorNodeName;
-        private String parentProcessorNodeName;
+        private String nodeName;
         private String[] joinThisStoreNames;
         private ProcessorParameters<K, V1> joinThisProcessorParameters;
         private String[] joinOtherStoreNames;
+        private MaterializedInternal materializedInternal;
         private ProcessorParameters<K, V2> joinOtherProcessorParameters;
         private ProcessorParameters<K, VR> joinMergeProcessorParameters;
         private ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
@@ -88,64 +80,64 @@ class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V
         private KTableKTableJoinNodeBuilder() {
         }
 
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinThisStoreNames(final String[] joinThisStoreNames) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinThisStoreNames(final String[] joinThisStoreNames) {
             this.joinThisStoreNames = joinThisStoreNames;
             return this;
         }
 
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinThisProcessorParameters(final ProcessorParameters<K, V1> joinThisProcessorParameters) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinThisProcessorParameters(final ProcessorParameters<K, V1> joinThisProcessorParameters) {
             this.joinThisProcessorParameters = joinThisProcessorParameters;
             return this;
         }
 
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withProcessorNodeName(String processorNodeName) {
-            this.processorNodeName = processorNodeName;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withNodeName(String nodeName) {
+            this.nodeName = nodeName;
             return this;
         }
 
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
             this.joinOtherStoreNames = joinOtherStoreNames;
             return this;
         }
 
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withParentProcessorNodeName(final String parentProcessorNodeName) {
-            this.parentProcessorNodeName = parentProcessorNodeName;
-            return this;
-        }
-
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinOtherProcessorParameters(final ProcessorParameters<K, V2> joinOtherProcessorParameters) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinOtherProcessorParameters(final ProcessorParameters<K, V2> joinOtherProcessorParameters) {
             this.joinOtherProcessorParameters = joinOtherProcessorParameters;
             return this;
         }
 
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
             this.joinMergeProcessorParameters = joinMergeProcessorParameters;
             return this;
         }
 
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withValueJoiner(final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withValueJoiner(final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
             this.valueJoiner = valueJoiner;
             return this;
         }
 
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withThisJoinSide(final String thisJoinSide) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withThisJoinSide(final String thisJoinSide) {
             this.thisJoinSide = thisJoinSide;
             return this;
         }
 
-        KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withOtherJoinSide(final String otherJoinSide) {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR>  withOtherJoinSide(final String otherJoinSide) {
             this.otherJoinSide = otherJoinSide;
             return this;
         }
 
-        KTableKTableJoinNode<K, V1, V2, VR> build() {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withMaterializedInternal(final MaterializedInternal materializedInternal) {
+            this.materializedInternal = materializedInternal;
+            return this;
+        }
+
+        public KTableKTableJoinNode<K, V1, V2, VR> build() {
 
-            return new KTableKTableJoinNode<>(parentProcessorNodeName,
-                                              processorNodeName,
+            return new KTableKTableJoinNode<>(nodeName,
                                               valueJoiner,
                                               joinThisProcessorParameters,
                                               joinOtherProcessorParameters,
                                               joinMergeProcessorParameters,
+                                              materializedInternal,
                                               thisJoinSide,
                                               otherJoinSide,
                                               joinThisStoreNames,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
new file mode 100644
index 0000000..8b1ce6d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode {
+
+    OptimizableRepartitionNode(final String nodeName,
+                               final String sourceName,
+                               final ProcessorParameters processorParameters,
+                               final Serde<K> keySerde,
+                               final Serde<V> valueSerde,
+                               final String sinkName,
+                               final String repartitionTopic) {
+
+        super(nodeName,
+              sourceName,
+              processorParameters,
+              keySerde,
+              valueSerde,
+              sinkName,
+              repartitionTopic);
+
+    }
+
+    @Override
+    Serializer<V> getValueSerializer() {
+        return valueSerde != null ? valueSerde.serializer() : null;
+    }
+
+    @Override
+    Deserializer<V> getValueDeserializer() {
+        return  valueSerde != null ? valueSerde.deserializer() : null;
+    }
+
+
+    @Override
+    public void writeToTopology(InternalTopologyBuilder topologyBuilder) {
+        final Serializer<K> keySerializer = keySerde != null ? keySerde.serializer() : null;
+        final Deserializer<K> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
+
+
+
+        topologyBuilder.addInternalTopic(repartitionTopic);
+
+        topologyBuilder.addProcessor(processorParameters.processorName(),
+                                     processorParameters.processorSupplier(),
+                                     parentNode().nodeName());
+
+        topologyBuilder.addSink(sinkName,
+                                repartitionTopic,
+                                keySerializer,
+                                getValueSerializer(),
+                                null,
+                                processorParameters.processorName());
+
+        topologyBuilder.addSource(null,
+                                  sourceName,
+                                  new FailOnInvalidTimestamp(),
+                                  keyDeserializer,
+                                  getValueDeserializer(),
+                                  repartitionTopic);
+
+    }
+
+    public static <K, V> OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder() {
+        return new OptimizableRepartitionNodeBuilder<>();
+    }
+
+
+    public static final class OptimizableRepartitionNodeBuilder<K, V> {
+
+        private String nodeName;
+        private ProcessorParameters processorParameters;
+        private Serde<K> keySerde;
+        private Serde<V> valueSerde;
+        private String sinkName;
+        private String sourceName;
+        private String repartitionTopic;
+
+        private OptimizableRepartitionNodeBuilder() {
+        }
+
+        public OptimizableRepartitionNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) {
+            this.processorParameters = processorParameters;
+            return this;
+        }
+
+        public OptimizableRepartitionNodeBuilder<K, V> withKeySerde(final Serde<K> keySerde) {
+            this.keySerde = keySerde;
+            return this;
+        }
+
+        public OptimizableRepartitionNodeBuilder<K, V> withValueSerde(final Serde<V> valueSerde) {
+            this.valueSerde = valueSerde;
+            return this;
+        }
+
+        public OptimizableRepartitionNodeBuilder<K, V> withSinkName(final String sinkName) {
+            this.sinkName = sinkName;
+            return this;
+        }
+
+        public OptimizableRepartitionNodeBuilder<K, V> withSourceName(final String sourceName) {
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        public OptimizableRepartitionNodeBuilder<K, V> withRepartitionTopic(final String repartitionTopic) {
+            this.repartitionTopic = repartitionTopic;
+            return this;
+        }
+
+
+        public OptimizableRepartitionNodeBuilder<K, V> withNodeName(final String nodeName) {
+            this.nodeName = nodeName;
+            return this;
+        }
+
+        public OptimizableRepartitionNode<K, V> build() {
+
+            return new OptimizableRepartitionNode<>(nodeName,
+                                                    sourceName,
+                                                    processorParameters,
+                                                    keySerde,
+                                                    valueSerde,
+                                                    sinkName,
+                                                    repartitionTopic
+            );
+
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
similarity index 82%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProcessorParameters.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index cab1589..bca7571 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProcessorParameters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
@@ -26,21 +26,21 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
  * Used by the Join nodes as there are several parameters, this abstraction helps
  * keep the number of arguments more reasonable.
  */
-class ProcessorParameters<K, V> {
+public class ProcessorParameters<K, V> {
 
     private final ProcessorSupplier<K, V> processorSupplier;
     private final String processorName;
 
-    ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier, final String processorName) {
+    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier, final String processorName) {
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<K, V> processorSupplier() {
         return processorSupplier;
     }
 
-    String processorName() {
+    public String processorName() {
         return processorName;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
new file mode 100644
index 0000000..0204d51
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Arrays;
+
+public class StatefulProcessorNode<K, V> extends StatelessProcessorNode<K, V> {
+
+    private final String[] storeNames;
+    private final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+    private final String maybeRepartitionedSourceName;
+
+
+    public StatefulProcessorNode(final String nodeName,
+                                 final ProcessorParameters processorParameters,
+                                 final String[] storeNames,
+                                 final String maybeRepartitionedSourceName,
+                                 final StoreBuilder<KeyValueStore<K, V>> materializedKTableStoreBuilder,
+                                 final boolean repartitionRequired) {
+        super(nodeName,
+              processorParameters,
+              repartitionRequired);
+
+        this.storeNames = storeNames;
+        this.storeBuilder = materializedKTableStoreBuilder;
+        this.maybeRepartitionedSourceName = maybeRepartitionedSourceName;
+    }
+
+
+    String[] storeNames() {
+        return Arrays.copyOf(storeNames, storeNames.length);
+    }
+
+
+    @Override
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    public static <K, V> StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder() {
+        return new StatefulProcessorNodeBuilder<>();
+    }
+
+    public static final class StatefulProcessorNodeBuilder<K, V> {
+
+        private ProcessorParameters processorSupplier;
+        private String nodeName;
+        private boolean repartitionRequired;
+        private String maybeRepartitionedSourceName;
+        private String[] storeNames;
+        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+
+        private StatefulProcessorNodeBuilder() {
+        }
+
+        public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) {
+            this.processorSupplier = processorParameters;
+            return this;
+        }
+
+        public StatefulProcessorNodeBuilder<K, V> withNodeName(final String nodeName) {
+            this.nodeName = nodeName;
+            return this;
+        }
+
+        public StatefulProcessorNodeBuilder<K, V> withStoreNames(final String[] storeNames) {
+            this.storeNames = storeNames;
+            return this;
+        }
+
+        public StatefulProcessorNodeBuilder<K, V> withRepartitionRequired(final boolean repartitionRequired) {
+            this.repartitionRequired = repartitionRequired;
+            return this;
+        }
+
+        public StatefulProcessorNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+            this.storeBuilder = storeBuilder;
+            return this;
+        }
+
+        public StatefulProcessorNodeBuilder<K, V> withMaybeRepartitionedSourceName(String maybeRepartitionedSourceName) {
+            this.maybeRepartitionedSourceName = maybeRepartitionedSourceName;
+            return this;
+        }
+
+        public StatefulProcessorNode<K, V> build() {
+            return new StatefulProcessorNode<>(nodeName,
+                                               processorSupplier,
+                                               storeNames,
+                                               maybeRepartitionedSourceName,
+                                               storeBuilder,
+                                               repartitionRequired);
+
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatelessProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatelessProcessorNode.java
similarity index 65%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatelessProcessorNode.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatelessProcessorNode.java
index eecb068..b985f92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatelessProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatelessProcessorNode.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
 import java.util.ArrayList;
@@ -29,9 +28,9 @@ import java.util.List;
  * map, mapValues, flatMap, flatMapValues, filter, filterNot, branch
  *
  */
-class StatelessProcessorNode<K, V> extends StreamsGraphNode {
+public class StatelessProcessorNode<K, V> extends StreamsGraphNode {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    private final ProcessorParameters<K, V> processorParameters;
 
     // some processors need to register multiple parent names with
     // the InternalTopologyBuilder KStream#merge for example.
@@ -41,31 +40,28 @@ class StatelessProcessorNode<K, V> extends StreamsGraphNode {
     private List<String> multipleParentNames = new ArrayList<>();
 
 
-    StatelessProcessorNode(final String parentProcessorNodeName,
-                           final String processorNodeName,
-                           final ProcessorSupplier<K, V> processorSupplier,
+    public StatelessProcessorNode(final String nodeName,
+                           final ProcessorParameters processorParameters,
                            final boolean repartitionRequired) {
 
-        super(parentProcessorNodeName,
-              processorNodeName,
+        super(nodeName,
               repartitionRequired);
 
-        this.processorSupplier = processorSupplier;
+        this.processorParameters = processorParameters;
     }
 
-    StatelessProcessorNode(final String parentProcessorNodeName,
-                           final String processorNodeName,
+    public StatelessProcessorNode(final String nodeName,
+                           final ProcessorParameters processorParameters,
                            final boolean repartitionRequired,
-                           final ProcessorSupplier<K, V> processorSupplier,
                            final List<String> multipleParentNames) {
 
-        this(parentProcessorNodeName, processorNodeName, processorSupplier, repartitionRequired);
+        this(nodeName, processorParameters, repartitionRequired);
 
         this.multipleParentNames = multipleParentNames;
     }
 
-    ProcessorSupplier<K, V> processorSupplier() {
-        return processorSupplier;
+    ProcessorParameters<K, V> processorSupplier() {
+        return processorParameters;
     }
 
     List<String> multipleParentNames() {
@@ -73,7 +69,7 @@ class StatelessProcessorNode<K, V> extends StreamsGraphNode {
     }
 
     @Override
-    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         //TODO will implement in follow-up pr
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
similarity index 68%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSinkNode.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
index 0a65d1b..36dbefe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
@@ -15,33 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.ProducedInternal;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 
-class StreamSinkNode<K, V> extends StreamsGraphNode {
+public class StreamSinkNode<K, V> extends StreamsGraphNode {
 
-    private final String topic;
+    private final TopicNameExtractor<K, V> topicNameExtractor;
     private final ProducedInternal<K, V> producedInternal;
 
-    StreamSinkNode(final String parentProcessorNodeName,
-                   final String processorNodeName,
-                   final String topic,
+    public StreamSinkNode(final String nodeName,
+                   final TopicNameExtractor<K, V> topicNameExtractor,
                    final ProducedInternal<K, V> producedInternal) {
 
-        super(parentProcessorNodeName,
-              processorNodeName,
+        super(nodeName,
               false);
 
-        this.topic = topic;
+        this.topicNameExtractor = topicNameExtractor;
         this.producedInternal = producedInternal;
     }
 
     String topic() {
-        return topic;
+        return topicNameExtractor instanceof StaticTopicNameExtractor ? ((StaticTopicNameExtractor) topicNameExtractor).topicName : null;
+    }
+
+    TopicNameExtractor<K, V> topicNameExtractor() {
+        return topicNameExtractor;
     }
 
     Serde<K> keySerde() {
@@ -65,7 +70,7 @@ class StreamSinkNode<K, V> extends StreamsGraphNode {
     }
 
     @Override
-    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         //TODO will implement in follow-up pr
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
similarity index 51%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSourceNode.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index cb86840..6257ee8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -15,77 +15,56 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.regex.Pattern;
 
-class StreamSourceNode<K, V> extends StreamsGraphNode {
+public class StreamSourceNode<K, V> extends StreamsGraphNode {
 
-    private Collection<String> topics;
+    private Collection<String> topicNames;
     private Pattern topicPattern;
     private final ConsumedInternal<K, V> consumedInternal;
 
 
-    StreamSourceNode(final String parentProcessorNodeName,
-                     final String processorNodeName,
-                     final Collection<String> topics,
+    public StreamSourceNode(final String nodeName,
+                     final Collection<String> topicNames,
                      final ConsumedInternal<K, V> consumedInternal) {
-        super(parentProcessorNodeName,
-              processorNodeName,
+        super(nodeName,
               false);
 
-        this.topics = topics;
+        this.topicNames = topicNames;
         this.consumedInternal = consumedInternal;
     }
 
-    StreamSourceNode(final String parentProcessorNodeName,
-                     final String processorNodeName,
+    public StreamSourceNode(final String nodeName,
                      final Pattern topicPattern,
                      final ConsumedInternal<K, V> consumedInternal) {
 
-        super(parentProcessorNodeName,
-              processorNodeName,
+        super(nodeName,
               false);
 
         this.topicPattern = topicPattern;
         this.consumedInternal = consumedInternal;
     }
 
-    List<String> getTopics() {
-        return new ArrayList<>(topics);
+    public Collection<String> getTopicNames() {
+        return topicNames;
     }
 
-    Pattern getTopicPattern() {
+    public Pattern getTopicPattern() {
         return topicPattern;
     }
 
-    Serde<K> keySerde() {
-        return consumedInternal.keySerde();
-    }
-
-    Deserializer<K> keyDeserializer() {
-        return consumedInternal.keySerde() != null ? consumedInternal.keySerde().deserializer() : null;
-    }
-
-    TimestampExtractor timestampExtractor() {
-        return consumedInternal.timestampExtractor();
-    }
-
-    Topology.AutoOffsetReset autoOffsetReset() {
-        return consumedInternal.offsetResetPolicy();
+    public ConsumedInternal<K, V> getConsumedInternal() {
+        return consumedInternal;
     }
 
     @Override
-    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         //TODO will implement in follow-up pr
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
similarity index 60%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinNode.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index 90734eb..ace2164 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.WindowStore;
@@ -28,20 +27,16 @@ import org.apache.kafka.streams.state.WindowStore;
  * Too much information to generalize, so Stream-Stream joins are
  * represented by a specific node.
  */
-class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
+public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
 
-    private final ProcessorSupplier<K, V1> thisWindowedStreamProcessorSupplier;
-    private final ProcessorSupplier<K, V2> otherWindowedStreamProcessorSupplier;
+    private final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters;
+    private final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters;
     private final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder;
     private final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder;
     private final Joined<K, V1, V2> joined;
 
-    private final String thisWindowedStreamName;
-    private final String otherWindowedStreamName;
 
-
-    StreamStreamJoinNode(final String parentProcessorNodeName,
-                         final String processorNodeName,
+    StreamStreamJoinNode(final String nodeName,
                          final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
                          final ProcessorParameters<K, V1> joinThisProcessorParameters,
                          final ProcessorParameters<K, V2> joinOtherProcessParameters,
@@ -54,8 +49,7 @@ class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V
                          final String leftHandSideStreamName,
                          final String otherStreamName) {
 
-        super(parentProcessorNodeName,
-              processorNodeName,
+        super(nodeName,
               valueJoiner,
               joinThisProcessorParameters,
               joinOtherProcessParameters,
@@ -63,29 +57,19 @@ class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V
               leftHandSideStreamName,
               otherStreamName);
 
-        this.thisWindowedStreamProcessorSupplier = thisWindowedStreamProcessorParameters.processorSupplier();
-        this.otherWindowedStreamProcessorSupplier = otherWindowedStreamProcessorParameters.processorSupplier();
-        this.thisWindowedStreamName = thisWindowedStreamProcessorParameters.processorName();
-        this.otherWindowedStreamName = otherWindowedStreamProcessorParameters.processorName();
+        this.thisWindowedStreamProcessorParameters = thisWindowedStreamProcessorParameters;
+        this.otherWindowedStreamProcessorParameters = otherWindowedStreamProcessorParameters;
         this.thisWindowStoreBuilder = thisWindowStoreBuilder;
         this.otherWindowStoreBuilder = otherWindowStoreBuilder;
         this.joined = joined;
     }
 
-    ProcessorSupplier<K, V1> thisWindowedStreamProcessorSupplier() {
-        return thisWindowedStreamProcessorSupplier;
-    }
-
-    ProcessorSupplier<K, V2> otherWindowedStreamProcessorSupplier() {
-        return otherWindowedStreamProcessorSupplier;
-    }
-
-    String thisWindowedStreamName() {
-        return thisWindowedStreamName;
+    ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters() {
+        return thisWindowedStreamProcessorParameters;
     }
 
-    String otherWindowedStreamName() {
-        return otherWindowedStreamName;
+    ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters() {
+        return otherWindowedStreamProcessorParameters;
     }
 
     StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder() {
@@ -96,19 +80,22 @@ class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V
         return otherWindowStoreBuilder;
     }
 
+    Joined<K, V1, V2> joined() {
+        return joined;
+    }
+
     @Override
-    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         //TODO will implement in follow-up pr
     }
 
-    static <K, V, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
+    public static <K, V, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
         return new StreamStreamJoinNodeBuilder<>();
     }
 
-    static final class StreamStreamJoinNodeBuilder<K, V1, V2, VR> {
+    public static final class StreamStreamJoinNodeBuilder<K, V1, V2, VR> {
 
-        private String processorNodeName;
-        private String parentProcessorNodeName;
+        private String nodeName;
         private ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
         private ProcessorParameters<K, V1> joinThisProcessorParameters;
         private ProcessorParameters<K, V2> joinOtherProcessorParameters;
@@ -126,75 +113,69 @@ class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V
         }
 
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withValueJoiner(final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withValueJoiner(final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
             this.valueJoiner = valueJoiner;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinThisProcessorParameters(final ProcessorParameters<K, V1> joinThisProcessorParameters) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinThisProcessorParameters(final ProcessorParameters<K, V1> joinThisProcessorParameters) {
             this.joinThisProcessorParameters = joinThisProcessorParameters;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowedStreamProcessorParameters(final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowedStreamProcessorParameters(final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters) {
             this.thisWindowedStreamProcessorParameters = thisWindowedStreamProcessorParameters;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withProcessorNodeName(final String name) {
-            this.processorNodeName = name;
-            return this;
-        }
-
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withParentProcessorNodeName(final String predecessorNodeName) {
-            this.parentProcessorNodeName = predecessorNodeName;
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withNodeName(final String nodeName) {
+            this.nodeName = nodeName;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(final ProcessorParameters<K, V2> joinOtherProcessParameters) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(final ProcessorParameters<K, V2> joinOtherProcessParameters) {
             this.joinOtherProcessorParameters = joinOtherProcessParameters;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowedStreamProcessorParameters(final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowedStreamProcessorParameters(final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters) {
             this.otherWindowedStreamProcessorParameters = otherWindowedStreamProcessorParameters;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
             this.joinMergeProcessorParameters = joinMergeProcessorParameters;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withLeftHandSideStreamName(final String leftHandSideStreamName) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withLeftHandSideStreamName(final String leftHandSideStreamName) {
             this.leftHandSideStreamName = leftHandSideStreamName;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherStreamName(final String otherStreamName) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherStreamName(final String otherStreamName) {
             this.otherStreamName = otherStreamName;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowStoreBuilder(final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowStoreBuilder(final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder) {
             this.thisWindowStoreBuilder = thisWindowStoreBuilder;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowStoreBuilder(final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowStoreBuilder(final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder) {
             this.otherWindowStoreBuilder = otherWindowStoreBuilder;
             return this;
         }
 
-        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoined(final Joined<K, V1, V2> joined) {
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoined(final Joined<K, V1, V2> joined) {
             this.joined = joined;
             return this;
         }
 
-        StreamStreamJoinNode<K, V1, V2, VR> build() {
+        public StreamStreamJoinNode<K, V1, V2, VR> build() {
 
-            return new StreamStreamJoinNode<>(parentProcessorNodeName,
-                                              processorNodeName,
+            return new StreamStreamJoinNode<>(nodeName,
                                               valueJoiner,
                                               joinThisProcessorParameters,
                                               joinOtherProcessorParameters,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
similarity index 63%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamTableJoinNode.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
index 18f2055..64ae441 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
 import java.util.Arrays;
@@ -26,34 +25,41 @@ import java.util.Arrays;
  * Represents a join between a KStream and a KTable or GlobalKTable
  */
 
-class StreamTableJoinNode<K1, K2, V1, V2, VR> extends StreamsGraphNode {
+public class StreamTableJoinNode<K, V> extends StreamsGraphNode {
 
     private final String[] storeNames;
-    private final ProcessorSupplier<K1, V1> processorSupplier;
+    private final ProcessorParameters<K, V> processorParameters;
+    private boolean isGlobalKTableJoin;
 
-    StreamTableJoinNode(final String parentProcessorNodeName,
-                        final String processorNodeName,
-                        final ProcessorSupplier<K1, V1> processorSupplier,
+    public StreamTableJoinNode(final String nodeName,
+                        final ProcessorParameters<K, V> processorParameters,
                         final String[] storeNames) {
-        super(parentProcessorNodeName,
-              processorNodeName,
+        super(nodeName,
               false);
 
         // in the case of Stream-Table join the state stores associated with the KTable
         this.storeNames = storeNames;
-        this.processorSupplier = processorSupplier;
+        this.processorParameters = processorParameters;
     }
 
     String[] storeNames() {
         return Arrays.copyOf(storeNames, storeNames.length);
     }
 
-    ProcessorSupplier<K1, V1> processorSupplier() {
-        return processorSupplier;
+    ProcessorParameters<K, V> processorParameters() {
+        return processorParameters;
+    }
+
+    public void setGlobalKTableJoin(boolean globalKTableJoin) {
+        isGlobalKTableJoin = globalKTableJoin;
+    }
+
+    boolean isGlobalKTableJoin() {
+        return isGlobalKTableJoin;
     }
 
     @Override
-    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         //TODO will implement in follow-up pr
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
new file mode 100644
index 0000000..23b6c34
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.Collection;
+import java.util.LinkedHashSet;
+
+public abstract class StreamsGraphNode {
+
+    private StreamsGraphNode parentNode;
+    private final Collection<StreamsGraphNode> childNodes = new LinkedHashSet<>();
+    private final String nodeName;
+    private boolean repartitionRequired;
+    private boolean keyChangingOperation;
+    private Integer id;
+    private InternalStreamsBuilder internalStreamsBuilder;
+
+    public StreamsGraphNode(final String nodeName,
+                     final boolean repartitionRequired) {
+        this.nodeName = nodeName;
+        this.repartitionRequired = repartitionRequired;
+    }
+
+    public StreamsGraphNode parentNode() {
+        return parentNode;
+    }
+
+    public void setParentNode(final StreamsGraphNode parentNode) {
+        this.parentNode = parentNode;
+    }
+
+    public Collection<StreamsGraphNode> children() {
+        return new LinkedHashSet<>(childNodes);
+    }
+
+    public void addChildNode(final StreamsGraphNode childNode) {
+        this.childNodes.add(childNode);
+        childNode.setParentNode(this);
+    }
+
+    public String nodeName() {
+        return nodeName;
+    }
+
+    public boolean repartitionRequired() {
+        return repartitionRequired;
+    }
+
+    public void setRepartitionRequired(boolean repartitionRequired) {
+        this.repartitionRequired = repartitionRequired;
+    }
+
+    public boolean isKeyChangingOperation() {
+        return keyChangingOperation;
+    }
+
+    public void keyChangingOperation(final boolean keyChangingOperation) {
+        this.keyChangingOperation = keyChangingOperation;
+    }
+
+    public void setId(final int id) {
+        this.id = id;
+    }
+
+    public Integer id() {
+        return this.id;
+    }
+
+    public void setInternalStreamsBuilder(final InternalStreamsBuilder internalStreamsBuilder) {
+        this.internalStreamsBuilder = internalStreamsBuilder;
+    }
+
+    public InternalStreamsBuilder internalStreamsBuilder() {
+        return internalStreamsBuilder;
+    }
+
+    public abstract void writeToTopology(final InternalTopologyBuilder topologyBuilder);
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
new file mode 100644
index 0000000..734b505
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraphNode {
+
+    private final MaterializedInternal<K, V, S> materializedInternal;
+    private final ProcessorParameters<K, V> processorParameters;
+    private final String[] storeNames;
+
+    public TableProcessorNode(final String nodeName,
+                              final ProcessorParameters<K, V> processorParameters,
+                              final MaterializedInternal<K, V, S> materializedInternal,
+                              final String[] storeNames) {
+
+        super(nodeName,
+              false);
+        this.processorParameters = processorParameters;
+        this.materializedInternal = materializedInternal;
+        this.storeNames = storeNames != null ? storeNames : new String[]{};
+    }
+
+
+    @Override
+    public void writeToTopology(InternalTopologyBuilder topologyBuilder) {
+
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
new file mode 100644
index 0000000..d194a95
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Collections;
+
+/**
+ * Used to represent either a KTable source or a GlobalKTable source. A boolean flag is used to indicate if this represents a GlobalKTable a {@link
+ * org.apache.kafka.streams.kstream.GlobalKTable}
+ */
+public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
+
+    private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+    private final ProcessorParameters<K, V> processorParameters;
+    private final String sourceName;
+    private final boolean isGlobalKTable;
+
+    TableSourceNode(final String nodeName,
+                    final String sourceName,
+                    final String topic,
+                    final ConsumedInternal<K, V> consumedInternal,
+                    final StoreBuilder<KeyValueStore<K, V>> storeBuilder,
+                    final ProcessorParameters<K, V> processorParameters,
+                    final boolean isGlobalKTable) {
+
+        super(nodeName,
+              Collections.singletonList(topic),
+              consumedInternal);
+
+        this.processorParameters = processorParameters;
+        this.sourceName = sourceName;
+        this.isGlobalKTable = isGlobalKTable;
+        this.storeBuilder = storeBuilder;
+    }
+
+    StoreBuilder<KeyValueStore<K, V>> storeBuilder() {
+        return storeBuilder;
+    }
+
+    ProcessorParameters<K, V> processorParameters() {
+        return processorParameters;
+    }
+
+    String sourceName() {
+        return sourceName;
+    }
+
+    boolean isGlobalKTable() {
+        return isGlobalKTable;
+    }
+
+    public static <K, V> TableSourceNodeBuilder<K, V> tableSourceNodeBuilder() {
+        return new TableSourceNodeBuilder<>();
+    }
+
+    @Override
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    public static final class TableSourceNodeBuilder<K, V> {
+
+        private String nodeName;
+        private String sourceName;
+        private String topic;
+        private ConsumedInternal<K, V> consumedInternal;
+        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+        private ProcessorParameters<K, V> processorParameters;
+        private boolean isGlobalKTable = false;
+
+        private TableSourceNodeBuilder() {
+        }
+
+        public TableSourceNodeBuilder<K, V> withSourceName(final String sourceName) {
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        public TableSourceNodeBuilder<K, V> withTopic(final String topic) {
+            this.topic = topic;
+            return this;
+        }
+
+        public TableSourceNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+            this.storeBuilder = storeBuilder;
+            return this;
+        }
+
+        public TableSourceNodeBuilder<K, V> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) {
+            this.consumedInternal = consumedInternal;
+            return this;
+        }
+
+        public TableSourceNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
+            this.processorParameters = processorParameters;
+            return this;
+        }
+
+        public TableSourceNodeBuilder<K, V> withNodeName(final String nodeName) {
+            this.nodeName = nodeName;
+            return this;
+        }
+
+        public TableSourceNodeBuilder<K, V> isGlobalKTable(final boolean isGlobaKTable) {
+            this.isGlobalKTable = isGlobaKTable;
+            return this;
+        }
+
+        public TableSourceNode<K, V> build() {
+            return new TableSourceNode<>(nodeName,
+                                         sourceName,
+                                         topic,
+                                         consumedInternal,
+                                         storeBuilder,
+                                         processorParameters,
+                                         isGlobalKTable);
+
+
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
deleted file mode 100644
index 536b194..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import java.util.Map;
-
-/**
- * A state store supplier which can create one or more {@link StateStore} instances.
- *
- * @param <T> State store type
- * @deprecated use {@link org.apache.kafka.streams.state.StoreSupplier}
- */
-@Deprecated
-public interface StateStoreSupplier<T extends StateStore> {
-
-    /**
-     * Return the name of this state store supplier.
-     * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'
-     *
-     * @return the name of this state store supplier
-     */
-    String name();
-
-    /**
-     * Return a new {@link StateStore} instance.
-     *
-     * @return a new {@link StateStore} instance of type T
-     */
-    T get();
-
-    /**
-     * Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}
-     * <p>
-     * Note: any unrecognized configs will be ignored by the Kafka brokers.
-     *
-     * @return Map containing any log configs to be used when creating the changelog for the {@link StateStore}
-     * If {@code loggingEnabled} returns false, this function will always return an empty map
-     */
-    Map<String, String> logConfig();
-
-    /**
-     * @return true if the {@link StateStore} should have logging enabled
-     */
-    boolean loggingEnabled();
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index a0cda1b..f30d9d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -102,8 +102,8 @@ public class AbstractStreamTest {
 
         KStream<K, V> randomFilter() {
             String name = builder.newProcessorName("RANDOM-FILTER-");
-            internalTopologyBuilder().addProcessor(name, new ExtendedKStreamDummy(), this.name);
-            return new KStreamImpl<>(builder, name, sourceNodes, false);
+            builder.internalTopologyBuilder.addProcessor(name, new ExtendedKStreamDummy(), this.name);
+            return new KStreamImpl<>(builder, name, sourceNodes, false, null);
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message