kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [5/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder
Date Mon, 31 Jul 2017 22:29:08 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ba537c0..d357356 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,17 +20,16 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.PrintForeachAction;
+import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KGroupedStream;
-import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -57,31 +56,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
 
-    public static final String FILTER_NAME = "KSTREAM-FILTER-";
+    private static final String FILTER_NAME = "KSTREAM-FILTER-";
 
-    public static final String PEEK_NAME = "KSTREAM-PEEK-";
+    private static final String PEEK_NAME = "KSTREAM-PEEK-";
 
     private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
 
     private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
 
-    public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
+    private static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
 
-    public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
+    private static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
 
-    public static final String JOIN_NAME = "KSTREAM-JOIN-";
+    private static final String JOIN_NAME = "KSTREAM-JOIN-";
 
-    public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
+    private static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
 
     private static final String MAP_NAME = "KSTREAM-MAP-";
 
     private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
 
-    public static final String MERGE_NAME = "KSTREAM-MERGE-";
+    private static final String MERGE_NAME = "KSTREAM-MERGE-";
 
-    public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
+    private static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
 
-    public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
+    private static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
 
     private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
 
@@ -89,7 +88,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
 
-    public static final String SINK_NAME = "KSTREAM-SINK-";
+    static final String SINK_NAME = "KSTREAM-SINK-";
 
     public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
 
@@ -101,15 +100,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
 
-    public static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
+    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
 
     private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
 
     private final boolean repartitionRequired;
 
-    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes,
-                       boolean repartitionRequired) {
-        super(topology, name, sourceNodes);
+    public KStreamImpl(final InternalStreamsBuilder builder,
+                       final String name,
+                       final Set<String> sourceNodes,
+                       final boolean repartitionRequired) {
+        super(builder, name, sourceNodes);
         this.repartitionRequired = repartitionRequired;
         this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
             @Override
@@ -120,34 +121,34 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
+    public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
-        String name = topology.newName(FILTER_NAME);
+        String name = builder.newName(FILTER_NAME);
 
-        topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
     public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
-        String name = topology.newName(FILTER_NAME);
+        String name = builder.newName(FILTER_NAME);
 
-        topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        return new KStreamImpl<>(topology, internalSelectKey(mapper), sourceNodes, true);
+        return new KStreamImpl<>(builder, internalSelectKey(mapper), sourceNodes, true);
     }
 
     private <K1> String internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
-        String name = topology.newName(KEY_SELECT_NAME);
-        topology.addProcessor(
+        String name = builder.newName(KEY_SELECT_NAME);
+        builder.internalTopologyBuilder.addProcessor(
             name,
             new KStreamMap<>(
                 new KeyValueMapper<K, V, KeyValue<K1, V>>() {
@@ -163,24 +164,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
+    public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = topology.newName(MAP_NAME);
+        String name = builder.newName(MAP_NAME);
 
-        topology.addProcessor(name, new KStreamMap<K, V, K1, V1>(mapper), this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes, true);
+        return new KStreamImpl<>(builder, name, sourceNodes, true);
     }
 
 
     @Override
-    public <V1> KStream<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper) {
+    public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = topology.newName(MAPVALUES_NAME);
+        String name = builder.newName(MAPVALUES_NAME);
 
-        topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
@@ -194,12 +195,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public void print(final Serde<K> keySerde, final Serde<V> valSerde) {
+    public void print(final Serde<K> keySerde,
+                      final Serde<V> valSerde) {
         print(defaultKeyValueMapper, keySerde, valSerde, this.name);
     }
 
     @Override
-    public void print(final Serde<K> keySerde, final Serde<V> valSerde, final String label) {
+    public void print(final Serde<K> keySerde,
+                      final Serde<V> valSerde,
+                      final String label) {
         print(defaultKeyValueMapper, keySerde, valSerde, label);
     }
 
@@ -209,21 +213,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String label) {
+    public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
+                      final String label) {
         print(mapper, null, null, label);
     }
 
     @Override
-    public void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde) {
+    public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
+                      final Serde<K> keySerde,
+                      final Serde<V> valSerde) {
         print(mapper, keySerde, valSerde, this.name);
     }
 
     @Override
-    public void print(KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, Serde<V> valSerde, final String label) {
+    public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
+                      final Serde<K> keySerde,
+                      final Serde<V> valSerde,
+                      final String label) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(label, "label can't be null");
-        String name = topology.newName(PRINTING_NAME);
-        topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(null, mapper, label), keySerde, valSerde), this.name);
+        String name = builder.newName(PRINTING_NAME);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(null, mapper, label), keySerde, valSerde), this.name);
     }
 
     @Override
@@ -232,181 +242,207 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public void writeAsText(final String filePath, final String label) {
+    public void writeAsText(final String filePath,
+                            final String label) {
         writeAsText(filePath, label, null, null, defaultKeyValueMapper);
     }
 
     @Override
-    public void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde) {
+    public void writeAsText(final String filePath,
+                            final Serde<K> keySerde,
+                            final Serde<V> valSerde) {
         writeAsText(filePath, this.name, keySerde, valSerde, defaultKeyValueMapper);
     }
 
     @Override
-    public void writeAsText(final String filePath, final String label, final Serde<K> keySerde, final Serde<V> valSerde) {
+    public void writeAsText(final String filePath,
+                            final String label,
+                            final Serde<K> keySerde,
+                            final Serde<V> valSerde) {
         writeAsText(filePath, label, keySerde, valSerde, defaultKeyValueMapper);
     }
 
     @Override
-    public void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper) {
+    public void writeAsText(final String filePath,
+                            final KeyValueMapper<? super K, ? super V, String> mapper) {
         writeAsText(filePath, this.name, null, null, mapper);
     }
 
     @Override
-    public void writeAsText(final String filePath, final String label, final KeyValueMapper<? super K, ? super V, String> mapper) {
+    public void writeAsText(final String filePath,
+                            final String label,
+                            final KeyValueMapper<? super K, ? super V, String> mapper) {
         writeAsText(filePath, label, null, null, mapper);
     }
 
     @Override
-    public void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper) {
+    public void writeAsText(final String filePath,
+                            final Serde<K> keySerde,
+                            final Serde<V> valSerde,
+                            final KeyValueMapper<? super K, ? super V, String> mapper) {
         writeAsText(filePath, this.name, keySerde, valSerde, mapper);
     }
 
     @Override
-    public void writeAsText(final String filePath, final String label, final Serde<K> keySerde, final Serde<V> valSerde, KeyValueMapper<? super K, ? super V, String> mapper) {
+    public void writeAsText(final String filePath,
+                            final String label,
+                            final Serde<K> keySerde,
+                            final Serde<V> valSerde, KeyValueMapper<? super K, ? super V, String> mapper) {
         Objects.requireNonNull(filePath, "filePath can't be null");
         Objects.requireNonNull(label, "label can't be null");
         Objects.requireNonNull(mapper, "mapper can't be null");
         if (filePath.trim().isEmpty()) {
-            throw new TopologyBuilderException("filePath can't be an empty string");
+            throw new TopologyException("filePath can't be an empty string");
         }
-        final String name = topology.newName(PRINTING_NAME);
+        final String name = builder.newName(PRINTING_NAME);
         try {
             PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
-            topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, label), keySerde, valSerde), this.name);
+            builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, label), keySerde, valSerde), this.name);
         } catch (FileNotFoundException | UnsupportedEncodingException e) {
-            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
-            throw new TopologyBuilderException(message);
+            throw new TopologyException("Unable to write stream to file at [" + filePath + "] " + e.getMessage());
         }
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+    public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = topology.newName(FLATMAP_NAME);
+        String name = builder.newName(FLATMAP_NAME);
 
-        topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes, true);
+        return new KStreamImpl<>(builder, name, sourceNodes, true);
     }
 
     @Override
-    public <V1> KStream<K, V1> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
+    public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
-        String name = topology.newName(FLATMAPVALUES_NAME);
+        String name = builder.newName(FLATMAPVALUES_NAME);
 
-        topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
     @SuppressWarnings("unchecked")
-    public KStream<K, V>[] branch(Predicate<? super K, ? super V>... predicates) {
+    public KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates) {
         if (predicates.length == 0) {
             throw new IllegalArgumentException("you must provide at least one predicate");
         }
-        for (Predicate<? super K, ? super V> predicate : predicates) {
+        for (final Predicate<? super K, ? super V> predicate : predicates) {
             Objects.requireNonNull(predicate, "predicates can't have null values");
         }
-        String branchName = topology.newName(BRANCH_NAME);
+        String branchName = builder.newName(BRANCH_NAME);
 
-        topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
+        builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
 
         KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
         for (int i = 0; i < predicates.length; i++) {
-            String childName = topology.newName(BRANCHCHILD_NAME);
+            String childName = builder.newName(BRANCHCHILD_NAME);
 
-            topology.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
+            builder.internalTopologyBuilder.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
 
-            branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes, this.repartitionRequired);
+            branchChildren[i] = new KStreamImpl<>(builder, childName, sourceNodes, this.repartitionRequired);
         }
 
         return branchChildren;
     }
 
-    public static <K, V> KStream<K, V> merge(KStreamBuilder topology, KStream<K, V>[] streams) {
+    public static <K, V> KStream<K, V> merge(final InternalStreamsBuilder builder,
+                                             final KStream<K, V>[] streams) {
         if (streams == null || streams.length == 0) {
             throw new IllegalArgumentException("Parameter <streams> must not be null or has length zero");
         }
 
-        String name = topology.newName(MERGE_NAME);
+        String name = builder.newName(MERGE_NAME);
         String[] parentNames = new String[streams.length];
         Set<String> allSourceNodes = new HashSet<>();
         boolean requireRepartitioning = false;
 
         for (int i = 0; i < streams.length; i++) {
-            KStreamImpl stream = (KStreamImpl) streams[i];
+            KStreamImpl<K, V> stream = (KStreamImpl<K, V>) streams[i];
 
             parentNames[i] = stream.name;
             requireRepartitioning |= stream.repartitionRequired;
             allSourceNodes.addAll(stream.sourceNodes);
         }
 
-        topology.addProcessor(name, new KStreamPassThrough<>(), parentNames);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamPassThrough<>(), parentNames);
 
-        return new KStreamImpl<>(topology, name, allSourceNodes, requireRepartitioning);
+        return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning);
     }
 
     @Override
-    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
+    public KStream<K, V> through(final Serde<K> keySerde,
+                                 final Serde<V> valSerde,
+                                 final StreamPartitioner<? super K, ? super V> partitioner, String topic) {
         to(keySerde, valSerde, partitioner, topic);
 
-        return topology.stream(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic);
+        return builder.stream(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic);
     }
 
     @Override
-    public void foreach(ForeachAction<? super K, ? super V> action) {
+    public void foreach(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
-        String name = topology.newName(FOREACH_NAME);
+        String name = builder.newName(FOREACH_NAME);
 
-        topology.addProcessor(name, new KStreamPeek<>(action, false), this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, false), this.name);
     }
 
     @Override
     public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
-        final String name = topology.newName(PEEK_NAME);
+        final String name = builder.newName(PEEK_NAME);
 
-        topology.addProcessor(name, new KStreamPeek<>(action, true), this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, true), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes, repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired);
     }
 
     @Override
-    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+    public KStream<K, V> through(final Serde<K> keySerde,
+                                 final Serde<V> valSerde,
+                                 final String topic) {
         return through(keySerde, valSerde, null, topic);
     }
 
     @Override
-    public KStream<K, V> through(StreamPartitioner<? super K, ? super V> partitioner, String topic) {
+    public KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+                                 final String topic) {
         return through(null, null, partitioner, topic);
     }
 
     @Override
-    public KStream<K, V> through(String topic) {
+    public KStream<K, V> through(final String topic) {
         return through(null, null, null, topic);
     }
 
     @Override
-    public void to(String topic) {
+    public void to(final String topic) {
         to(null, null, null, topic);
     }
 
     @Override
-    public void to(StreamPartitioner<? super K, ? super V> partitioner, String topic) {
+    public void to(final StreamPartitioner<? super K, ? super V> partitioner,
+                   final String topic) {
         to(null, null, partitioner, topic);
     }
 
     @Override
-    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+    public void to(final Serde<K> keySerde,
+                   final Serde<V> valSerde,
+                   final String topic) {
         to(keySerde, valSerde, null, topic);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void to(final Serde<K> keySerde, final Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, final String topic) {
+    public void to(final Serde<K> keySerde,
+                   final Serde<V> valSerde,
+                   StreamPartitioner<? super K, ? super V> partitioner,
+                   final String topic) {
         Objects.requireNonNull(topic, "topic can't be null");
-        final String name = topology.newName(SINK_NAME);
+        final String name = builder.newName(SINK_NAME);
 
         final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
         final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
@@ -416,78 +452,80 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
         }
 
-        topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
+        builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
+    public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
+                                              final String... stateStoreNames) {
         Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
-        String name = topology.newName(TRANSFORM_NAME);
+        String name = builder.newName(TRANSFORM_NAME);
 
-        topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
-        topology.connectProcessorAndStateStores(name, stateStoreNames);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
+        if (stateStoreNames != null && stateStoreNames.length > 0) {
+            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
+        }
 
-        return new KStreamImpl<>(topology, name, sourceNodes, true);
+        return new KStreamImpl<>(builder, name, sourceNodes, true);
     }
 
     @Override
-    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier, String... stateStoreNames) {
+    public <V1> KStream<K, V1> transformValues(final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier,
+                                               final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
-        String name = topology.newName(TRANSFORMVALUES_NAME);
+        String name = builder.newName(TRANSFORMVALUES_NAME);
 
-        topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
-        topology.connectProcessorAndStateStores(name, stateStoreNames);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
+        if (stateStoreNames != null && stateStoreNames.length > 0) {
+            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
+        }
 
-        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
+        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
-    public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier, String... stateStoreNames) {
-        String name = topology.newName(PROCESSOR_NAME);
+    public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
+                        final String... stateStoreNames) {
+        final String name = builder.newName(PROCESSOR_NAME);
 
-        topology.addProcessor(name, processorSupplier, this.name);
-        topology.connectProcessorAndStateStores(name, stateStoreNames);
+        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
+        if (stateStoreNames != null && stateStoreNames.length > 0) {
+            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
+        }
     }
 
     @Override
-    public <V1, R> KStream<K, R> join(
-            final KStream<K, V1> other,
-            final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-            final JoinWindows windows,
-            final Serde<K> keySerde,
-            final Serde<V> thisValueSerde,
-            final Serde<V1> otherValueSerde) {
-
-        return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(false, false));
+    public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
+                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                      final JoinWindows windows,
+                                      final Serde<K> keySerde,
+                                      final Serde<V> thisValueSerde,
+                                      final Serde<V1> otherValueSerde) {
+        return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde,
+            new KStreamImplJoin(false, false));
     }
 
     @Override
-    public <V1, R> KStream<K, R> join(
-        final KStream<K, V1> other,
-        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-        final JoinWindows windows) {
-
+    public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
+                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                      final JoinWindows windows) {
         return join(other, joiner, windows, null, null, null);
     }
 
     @Override
-    public <V1, R> KStream<K, R> outerJoin(
-        final KStream<K, V1> other,
-        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-        final JoinWindows windows,
-        final Serde<K> keySerde,
-        final Serde<V> thisValueSerde,
-        final Serde<V1> otherValueSerde) {
-
+    public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
+                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                           final JoinWindows windows,
+                                           final Serde<K> keySerde,
+                                           final Serde<V> thisValueSerde,
+                                           final Serde<V1> otherValueSerde) {
         return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(true, true));
     }
 
     @Override
-    public <V1, R> KStream<K, R> outerJoin(
-        final KStream<K, V1> other,
-        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-        final JoinWindows windows) {
-
+    public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
+                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                           final JoinWindows windows) {
         return outerJoin(other, joiner, windows, null, null, null);
     }
 
@@ -506,11 +544,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
 
         if (joinThis.repartitionRequired) {
-            joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde, null);
+            joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde);
         }
 
         if (joinOther.repartitionRequired) {
-            joinOther = joinOther.repartitionForJoin(keySerde, otherValueSerde, null);
+            joinOther = joinOther.repartitionForJoin(keySerde, otherValueSerde);
         }
 
         joinThis.ensureJoinableWith(joinOther);
@@ -524,28 +562,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             otherValueSerde);
     }
 
-
     /**
      * Repartition a stream. This is required on join operations occurring after
      * an operation that changes the key, i.e., selectKey, map(..), flatMap(..).
      * @param keySerde      Serdes for serializing the keys
      * @param valSerde      Serdes for serializing the values
-     * @param topicNamePrefix  prefix of topic name created for repartitioning, can be null,
-     *                         in which case the prefix will be auto-generated internally.
      * @return a new {@link KStreamImpl}
      */
-    private KStreamImpl<K, V> repartitionForJoin(Serde<K> keySerde,
-                                                 Serde<V> valSerde,
-                                                 final String topicNamePrefix) {
-
-        String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde, topicNamePrefix);
-        return new KStreamImpl<>(topology, repartitionedSourceName, Collections
+    private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde) {
+        String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde, null);
+        return new KStreamImpl<>(builder, repartitionedSourceName, Collections
             .singleton(repartitionedSourceName), false);
     }
 
-    static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
-                                                    Serde<K1> keySerde,
-                                                    Serde<V1> valSerde,
+    static <K1, V1> String createReparitionedSource(final AbstractStream<K1> stream,
+                                                    final Serde<K1> keySerde,
+                                                    final Serde<V1> valSerde,
                                                     final String topicNamePrefix) {
         Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
         Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
@@ -554,35 +587,33 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name;
 
         String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
-        String sinkName = stream.topology.newName(SINK_NAME);
-        String filterName = stream.topology.newName(FILTER_NAME);
-        String sourceName = stream.topology.newName(SOURCE_NAME);
+        String sinkName = stream.builder.newName(SINK_NAME);
+        String filterName = stream.builder.newName(FILTER_NAME);
+        String sourceName = stream.builder.newName(SOURCE_NAME);
 
-        stream.topology.addInternalTopic(repartitionTopic);
-        stream.topology.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
+        stream.builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
+        stream.builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
             @Override
             public boolean test(final K1 key, final V1 value) {
                 return key != null;
             }
         }, false), stream.name);
 
-        stream.topology.addSink(sinkName, repartitionTopic, keySerializer,
-                         valSerializer, filterName);
-        stream.topology.addSource(null, sourceName, new FailOnInvalidTimestamp(), keyDeserializer, valDeserializer,
-                           repartitionTopic);
+        stream.builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
+            null, filterName);
+        stream.builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
+            keyDeserializer, valDeserializer, repartitionTopic);
 
         return sourceName;
     }
 
     @Override
-    public <V1, R> KStream<K, R> leftJoin(
-        final KStream<K, V1> other,
-        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-        final JoinWindows windows,
-        final Serde<K> keySerde,
-        final Serde<V> thisValSerde,
-        final Serde<V1> otherValueSerde) {
-
+    public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
+                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                          final JoinWindows windows,
+                                          final Serde<K> keySerde,
+                                          final Serde<V> thisValSerde,
+                                          final Serde<V1> otherValueSerde) {
         return doJoin(other,
             joiner,
             windows,
@@ -593,18 +624,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <V1, R> KStream<K, R> leftJoin(
-        KStream<K, V1> other,
-        ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-        JoinWindows windows) {
-
+    public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
+                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                          final JoinWindows windows) {
         return leftJoin(other, joiner, windows, null, null, null);
     }
 
     @Override
-    public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+    public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
+                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
         return join(other, joiner, null, null);
-
     }
 
     @Override
@@ -613,16 +642,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                       final Serde<K> keySerde,
                                       final Serde<V> valueSerde) {
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(keySerde,
-                valueSerde, null);
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(keySerde, valueSerde);
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false);
         } else {
             return doStreamTableJoin(other, joiner, false);
         }
     }
 
-
-
     @Override
     public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
                                               final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
@@ -646,9 +672,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(joiner, "joiner can't be null");
 
         final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
-        final String name = topology.newName(LEFTJOIN_NAME);
-        topology.addProcessor(name, new KStreamGlobalKTableJoin<>(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name);
-        return new KStreamImpl<>(topology, name, sourceNodes, false);
+        final String name = builder.newName(LEFTJOIN_NAME);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamGlobalKTableJoin<>(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name);
+        return new KStreamImpl<>(builder, name, sourceNodes, false);
     }
 
     private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
@@ -659,12 +685,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
-        final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
-        topology.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
-        topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).internalStoreName());
-        topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
+        final String name = builder.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
+        builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).internalStoreName());
+        builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
 
-        return new KStreamImpl<>(topology, name, allSourceNodes, false);
+        return new KStreamImpl<>(builder, name, allSourceNodes, false);
     }
 
     @Override
@@ -677,8 +703,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                           final Serde<K> keySerde,
                                           final Serde<V> valueSerde) {
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
-                                                                                valueSerde, null);
+            final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde, valueSerde);
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true);
         } else {
             return doStreamTableJoin(other, joiner, true);
@@ -686,18 +711,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> selector) {
+    public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector) {
         return groupBy(selector, null, null);
     }
 
     @Override
-    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> selector,
-                                              Serde<K1> keySerde,
-                                              Serde<V> valSerde) {
-
+    public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector,
+                                              final Serde<K1> keySerde,
+                                              final Serde<V> valSerde) {
         Objects.requireNonNull(selector, "selector can't be null");
         String selectName = internalSelectKey(selector);
-        return new KGroupedStreamImpl<>(topology,
+        return new KGroupedStreamImpl<>(builder,
                                         selectName,
                                         sourceNodes,
                                         keySerde,
@@ -710,9 +734,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KGroupedStream<K, V> groupByKey(Serde<K> keySerde,
-                                           Serde<V> valSerde) {
-        return new KGroupedStreamImpl<>(topology,
+    public KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
+                                           final Serde<V> valSerde) {
+        return new KGroupedStreamImpl<>(builder,
                                         this.name,
                                         sourceNodes,
                                         keySerde,
@@ -721,9 +745,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows windows,
-                                                                     final Serde<K> keySerde,
-                                                                     final Serde<V> valueSerde,
-                                                                     final String storeName) {
+                                                                      final Serde<K> keySerde,
+                                                                      final Serde<V> valueSerde,
+                                                                      final String storeName) {
         return Stores.create(storeName)
             .withKeys(keySerde)
             .withValues(valueSerde)
@@ -737,23 +761,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         private final boolean leftOuter;
         private final boolean rightOuter;
 
-        KStreamImplJoin(final boolean leftOuter, final boolean rightOuter) {
+        KStreamImplJoin(final boolean leftOuter,
+                        final boolean rightOuter) {
             this.leftOuter = leftOuter;
             this.rightOuter = rightOuter;
         }
 
-        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
-                                                   KStream<K1, V2> other,
-                                                   ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
-                                                   JoinWindows windows,
-                                                   Serde<K1> keySerde,
-                                                   Serde<V1> lhsValueSerde,
-                                                   Serde<V2> otherValueSerde) {
-            String thisWindowStreamName = topology.newName(WINDOWED_NAME);
-            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-            String joinThisName = rightOuter ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
-            String joinOtherName = leftOuter ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
-            String joinMergeName = topology.newName(MERGE_NAME);
+        public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
+                                                   final KStream<K1, V2> other,
+                                                   final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
+                                                   final JoinWindows windows,
+                                                   final Serde<K1> keySerde,
+                                                   final Serde<V1> lhsValueSerde,
+                                                   final Serde<V2> otherValueSerde) {
+            String thisWindowStreamName = builder.newName(WINDOWED_NAME);
+            String otherWindowStreamName = builder.newName(WINDOWED_NAME);
+            String joinThisName = rightOuter ? builder.newName(OUTERTHIS_NAME) : builder.newName(JOINTHIS_NAME);
+            String joinOtherName = leftOuter ? builder.newName(OUTEROTHER_NAME) : builder.newName(JOINOTHER_NAME);
+            String joinMergeName = builder.newName(MERGE_NAME);
 
             StateStoreSupplier thisWindow =
                 createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
@@ -783,17 +808,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
 
 
-            topology.addProcessor(thisWindowStreamName, thisWindowedStream, ((AbstractStream) lhs).name);
-            topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
-            topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
-            topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
-            topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-            topology.addStateStore(thisWindow, thisWindowStreamName, joinOtherName);
-            topology.addStateStore(otherWindow, otherWindowStreamName, joinThisName);
+            builder.internalTopologyBuilder.addProcessor(thisWindowStreamName, thisWindowedStream, ((AbstractStream) lhs).name);
+            builder.internalTopologyBuilder.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
+            builder.internalTopologyBuilder.addProcessor(joinThisName, joinThis, thisWindowStreamName);
+            builder.internalTopologyBuilder.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
+            builder.internalTopologyBuilder.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+            builder.internalTopologyBuilder.addStateStore(thisWindow, thisWindowStreamName, joinOtherName);
+            builder.internalTopologyBuilder.addStateStore(otherWindow, otherWindowStreamName, joinThisName);
 
             Set<String> allSourceNodes = new HashSet<>(((AbstractStream<K>) lhs).sourceNodes);
             allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
-            return new KStreamImpl<>(topology, joinMergeName, allSourceNodes, false);
+            return new KStreamImpl<>(builder, joinMergeName, allSourceNodes, false);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 1d7078a..e21f702 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -28,13 +28,13 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
     private final String windowName;
 
     /**
-     * @throws TopologyBuilderException if retention period of the join window is less than expected
+     * @throws TopologyException if retention period of the join window is less than expected
      */
     KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) {
         this.windowName = windowName;
 
         if (windowSizeMs > retentionPeriodMs)
-            throw new TopologyBuilderException("The retention period of the join window "
+            throw new TopologyException("The retention period of the join window "
                     + windowName + " must be no smaller than its window size.");
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 048670e..4aca713 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,15 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -55,13 +54,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String FOREACH_NAME = "KTABLE-FOREACH-";
 
-    public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
+    private static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
 
-    public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
+    private static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
 
     private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
 
-    public static final String MERGE_NAME = "KTABLE-MERGE-";
+    private static final String MERGE_NAME = "KTABLE-MERGE-";
 
     private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
 
@@ -84,13 +83,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     private final Serde<K> keySerde;
     private final Serde<V> valSerde;
 
-    public KTableImpl(KStreamBuilder topology,
-                      String name,
-                      ProcessorSupplier<?, ?> processorSupplier,
-                      Set<String> sourceNodes,
+    public KTableImpl(final InternalStreamsBuilder builder,
+                      final String name,
+                      final ProcessorSupplier<?, ?> processorSupplier,
+                      final Set<String> sourceNodes,
                       final String queryableStoreName,
-                      boolean isQueryable) {
-        super(topology, name, sourceNodes);
+                      final boolean isQueryable) {
+        super(builder, name, sourceNodes);
         this.processorSupplier = processorSupplier;
         this.queryableStoreName = queryableStoreName;
         this.keySerde = null;
@@ -104,15 +103,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         };
     }
 
-    public KTableImpl(KStreamBuilder topology,
-                      String name,
-                      ProcessorSupplier<?, ?> processorSupplier,
+    public KTableImpl(final InternalStreamsBuilder builder,
+                      final String name,
+                      final ProcessorSupplier<?, ?> processorSupplier,
                       final Serde<K> keySerde,
                       final Serde<V> valSerde,
-                      Set<String> sourceNodes,
+                      final Set<String> sourceNodes,
                       final String queryableStoreName,
-                      boolean isQueryable) {
-        super(topology, name, sourceNodes);
+                      final boolean isQueryable) {
+        super(builder, name, sourceNodes);
         this.processorSupplier = processorSupplier;
         this.queryableStoreName = queryableStoreName;
         this.keySerde = keySerde;
@@ -140,19 +139,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
                                   final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                  boolean isFilterNot) {
+                                  final boolean isFilterNot) {
         Objects.requireNonNull(predicate, "predicate can't be null");
-        String name = topology.newName(FILTER_NAME);
+        String name = builder.newName(FILTER_NAME);
         String internalStoreName = null;
         if (storeSupplier != null) {
             internalStoreName = storeSupplier.name();
         }
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, isFilterNot, internalStoreName);
-        topology.addProcessor(name, processorSupplier, this.name);
+        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
         if (storeSupplier != null) {
-            topology.addStateStore(storeSupplier, name);
+            builder.internalTopologyBuilder.addStateStore(storeSupplier, name);
         }
-        return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
+        return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
     }
 
     @Override
@@ -161,7 +160,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
+    public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+                               final String queryableStoreName) {
         StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
             storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
@@ -170,7 +170,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier) {
+    public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+                               final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doFilter(predicate, storeSupplier, false);
     }
@@ -181,7 +182,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName) {
+    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+                                  final String queryableStoreName) {
         StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
             storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
@@ -190,7 +192,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier) {
+    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doFilter(predicate, storeSupplier, true);
     }
@@ -199,18 +202,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                            final Serde<V1> valueSerde,
                                            final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(mapper);
-        String name = topology.newName(MAPVALUES_NAME);
+        String name = builder.newName(MAPVALUES_NAME);
         String internalStoreName = null;
         if (storeSupplier != null) {
             internalStoreName = storeSupplier.name();
         }
         KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper, internalStoreName);
-        topology.addProcessor(name, processorSupplier, this.name);
+        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
         if (storeSupplier != null) {
-            topology.addStateStore(storeSupplier, name);
-            return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true);
+            builder.internalTopologyBuilder.addStateStore(storeSupplier, name);
+            return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true);
         } else {
-            return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
+            return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
         }
     }
 
@@ -246,60 +249,68 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @SuppressWarnings("deprecation")
     @Override
-    public void print(String label) {
+    public void print(final String label) {
         print(null, null, label);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public void print(Serde<K> keySerde, Serde<V> valSerde) {
+    public void print(final Serde<K> keySerde,
+                      final Serde<V> valSerde) {
         print(keySerde, valSerde, this.name);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public void print(Serde<K> keySerde, final Serde<V> valSerde, String label) {
+    public void print(final Serde<K> keySerde,
+                      final Serde<V> valSerde,
+                      final String label) {
         Objects.requireNonNull(label, "label can't be null");
-        String name = topology.newName(PRINTING_NAME);
-        topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
+        final String name = builder.newName(PRINTING_NAME);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public void writeAsText(String filePath) {
+    public void writeAsText(final String filePath) {
         writeAsText(filePath, this.name, null, null);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public void writeAsText(String filePath, String label) {
+    public void writeAsText(final String filePath,
+                            final String label) {
         writeAsText(filePath, label, null, null);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+    public void writeAsText(final String filePath,
+                            final Serde<K> keySerde,
+                            final Serde<V> valSerde) {
         writeAsText(filePath, this.name, keySerde, valSerde);
     }
 
     /**
-     * @throws TopologyBuilderException if file is not found
+     * @throws TopologyException if file is not found
      */
     @SuppressWarnings("deprecation")
     @Override
-    public void writeAsText(String filePath, String label, Serde<K> keySerde, Serde<V> valSerde) {
+    public void writeAsText(final String filePath,
+                            final String label,
+                            final Serde<K> keySerde,
+                            final Serde<V> valSerde) {
         Objects.requireNonNull(filePath, "filePath can't be null");
         Objects.requireNonNull(label, "label can't be null");
         if (filePath.trim().isEmpty()) {
-            throw new TopologyBuilderException("filePath can't be an empty string");
+            throw new TopologyException("filePath can't be an empty string");
         }
-        String name = topology.newName(PRINTING_NAME);
+        String name = builder.newName(PRINTING_NAME);
         try {
             PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
-            topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
-        } catch (FileNotFoundException | UnsupportedEncodingException e) {
-            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
-            throw new TopologyBuilderException(message);
+            builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
+        } catch (final FileNotFoundException | UnsupportedEncodingException e) {
+            throw new TopologyException(String.format("Unable to write stream to file at [%s] %s", filePath, e.getMessage()));
         }
     }
 
@@ -307,14 +318,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @Override
     public void foreach(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
-        String name = topology.newName(FOREACH_NAME);
+        String name = builder.newName(FOREACH_NAME);
         KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() {
             @Override
             public void apply(K key, Change<V> value) {
                 action.apply(key, value.newValue);
             }
         }, false);
-        topology.addProcessor(name, processorSupplier, this.name);
+        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
     }
 
     @Override
@@ -323,11 +334,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                 final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic,
                                 final String queryableStoreName) {
-        final String internalStoreName = queryableStoreName != null ? queryableStoreName : topology.newStoreName(KTableImpl.TOSTREAM_NAME);
+        final String internalStoreName = queryableStoreName != null ? queryableStoreName : builder.newStoreName(KTableImpl.TOSTREAM_NAME);
 
         to(keySerde, valSerde, partitioner, topic);
 
-        return topology.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, internalStoreName);
+        return builder.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, internalStoreName);
     }
 
     @Override
@@ -339,7 +350,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         to(keySerde, valSerde, partitioner, topic);
 
-        return topology.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, storeSupplier);
+        return builder.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, storeSupplier);
     }
 
     @Override
@@ -413,48 +424,54 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public void to(String topic) {
+    public void to(final String topic) {
         to(null, null, null, topic);
     }
 
     @Override
-    public void to(StreamPartitioner<? super K, ? super V> partitioner, String topic) {
+    public void to(final StreamPartitioner<? super K, ? super V> partitioner,
+                   final String topic) {
         to(null, null, partitioner, topic);
     }
 
     @Override
-    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+    public void to(final Serde<K> keySerde,
+                   final Serde<V> valSerde,
+                   final String topic) {
         this.toStream().to(keySerde, valSerde, null, topic);
     }
 
     @Override
-    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
+    public void to(final Serde<K> keySerde,
+                   final Serde<V> valSerde,
+                   final StreamPartitioner<? super K, ? super V> partitioner,
+                   final String topic) {
         this.toStream().to(keySerde, valSerde, partitioner, topic);
     }
 
     @Override
     public KStream<K, V> toStream() {
-        String name = topology.newName(TOSTREAM_NAME);
+        String name = builder.newName(TOSTREAM_NAME);
 
-        topology.addProcessor(name, new KStreamMapValues<K, Change<V>, V>(new ValueMapper<Change<V>, V>() {
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<K, Change<V>, V>(new ValueMapper<Change<V>, V>() {
             @Override
             public V apply(Change<V> change) {
                 return change.newValue;
             }
         }), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes, false);
+        return new KStreamImpl<>(builder, name, sourceNodes, false);
     }
 
     @Override
-    public <K1> KStream<K1, V> toStream(KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
+    public <K1> KStream<K1, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         return toStream().selectKey(mapper);
     }
 
     @Override
     public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return doJoin(other, joiner, false, false, null, (String) null);
+        return doJoin(other, joiner, false, false, null, null);
     }
 
     @Override
@@ -476,7 +493,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @Override
     public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return doJoin(other, joiner, true, true, null, (String) null);
+        return doJoin(other, joiner, true, true, null, null);
     }
 
     @Override
@@ -498,7 +515,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @Override
     public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return doJoin(other, joiner, true, false, null, (String) null);
+        return doJoin(other, joiner, true, false, null, null);
     }
 
     @Override
@@ -519,7 +536,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @SuppressWarnings("unchecked")
     private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
-                                        ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                         final boolean leftOuter,
                                         final boolean rightOuter,
                                         final Serde<R> joinSerde,
@@ -533,7 +550,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
-                                        ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                         final boolean leftOuter,
                                         final boolean rightOuter,
                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
@@ -549,9 +566,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             ((KTableImpl) other).enableSendingOldValues();
         }
 
-        final String joinThisName = topology.newName(JOINTHIS_NAME);
-        final String joinOtherName = topology.newName(JOINOTHER_NAME);
-        final String joinMergeName = topology.newName(MERGE_NAME);
+        final String joinThisName = builder.newName(JOINTHIS_NAME);
+        final String joinOtherName = builder.newName(JOINOTHER_NAME);
+        final String joinMergeName = builder.newName(MERGE_NAME);
 
         final KTableKTableAbstractJoin<K, R, V, V1> joinThis;
         final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
@@ -568,44 +585,43 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         }
 
         final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(topology, joinThisName, joinThis, sourceNodes, this.internalStoreName(), false),
-                new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes,
+                new KTableImpl<K, V, R>(builder, joinThisName, joinThis, sourceNodes, this.internalStoreName(), false),
+                new KTableImpl<K, V1, R>(builder, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes,
                         ((KTableImpl<K, ?, ?>) other).internalStoreName(), false),
                 internalQueryableName
         );
 
-        topology.addProcessor(joinThisName, joinThis, this.name);
-        topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
-        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
-        topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
+        builder.internalTopologyBuilder.addProcessor(joinThisName, joinThis, this.name);
+        builder.internalTopologyBuilder.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
+        builder.internalTopologyBuilder.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+        builder.internalTopologyBuilder.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+        builder.internalTopologyBuilder.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
 
         if (internalQueryableName != null) {
-            topology.addStateStore(storeSupplier, joinMergeName);
+            builder.internalTopologyBuilder.addStateStore(storeSupplier, joinMergeName);
         }
 
-        return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null);
+        return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null);
     }
 
     @Override
-    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
-                                                  Serde<K1> keySerde,
-                                                  Serde<V1> valueSerde) {
-
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
+                                                  final Serde<K1> keySerde,
+                                                  final Serde<V1> valueSerde) {
         Objects.requireNonNull(selector, "selector can't be null");
-        String selectName = topology.newName(SELECT_NAME);
+        String selectName = builder.newName(SELECT_NAME);
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<K, V, K1, V1>(this, selector);
 
         // select the aggregate key and values (old and new), it would require parent to send old values
-        topology.addProcessor(selectName, selectSupplier, this.name);
+        builder.internalTopologyBuilder.addProcessor(selectName, selectSupplier, this.name);
         this.enableSendingOldValues();
 
-        return new KGroupedTableImpl<>(topology, selectName, this.name, keySerde, valueSerde);
+        return new KGroupedTableImpl<>(builder, selectName, this.name, keySerde, valueSerde);
     }
 
     @Override
-    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
         return this.groupBy(selector, null, null);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 181e97c..27db594 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -43,7 +43,7 @@ public interface PartitionGrouper {
      * The default partition grouper implements this interface by assigning all partitions across different topics with the same
      * partition id into the same task. See {@link DefaultPartitionGrouper} for more information.
      *
-     * @param topicGroups The map from the {@link TopologyBuilder#topicGroups()} topic group} id to topics
+     * @param topicGroups The map from the topic group id to topics
      * @param metadata Metadata of the consuming cluster
      * @return a map of task ids to groups of partitions
      */


Mime
View raw message