kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [6/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder
Date Mon, 31 Jul 2017 22:29:09 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 46769eb..af05170 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -16,13 +16,14 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
@@ -43,16 +44,26 @@ import java.util.regex.Pattern;
 /**
  * {@code KStreamBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
  *
- * @see TopologyBuilder
+ * @see org.apache.kafka.streams.processor.TopologyBuilder
  * @see KStream
  * @see KTable
  * @see GlobalKTable
+ * @deprecated Use {@link org.apache.kafka.streams.StreamsBuilder StreamsBuilder} instead
  */
-@InterfaceStability.Evolving
+@Deprecated
 public class KStreamBuilder extends TopologyBuilder {
 
     private final AtomicInteger index = new AtomicInteger(0);
 
+    private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);
+
+    private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
+        if (resetPolicy == null) {
+            return null;
+        }
+        return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
+    }
+
     /**
      * Create a {@link KStream} from the specified topics.
      * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@@ -92,7 +103,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return stream(offsetReset, null, null, null, topics);
     }
 
-
     /**
      * Create a {@link KStream} from the specified topic pattern.
      * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@@ -155,7 +165,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return stream(null, null, keySerde, valSerde, topics);
     }
 
-
     /**
      * Create a {@link KStream} from the specified topics.
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
@@ -182,7 +191,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return stream(offsetReset, null, keySerde, valSerde, topics);
     }
 
-
     /**
      * Create a {@link KStream} from the specified topics.
      * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
@@ -209,7 +217,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return stream(null, timestampExtractor, keySerde, valSerde, topics);
     }
 
-
     /**
      * Create a {@link KStream} from the specified topics.
      * <p>
@@ -235,14 +242,18 @@ public class KStreamBuilder extends TopologyBuilder {
                                        final Serde<K> keySerde,
                                        final Serde<V> valSerde,
                                        final String... topics) {
-        final String name = newName(KStreamImpl.SOURCE_NAME);
+        try {
+            final String name = newName(KStreamImpl.SOURCE_NAME);
 
-        addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor,
+                keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+            return new KStreamImpl<>(internalStreamsBuilder, name, Collections.singleton(name), false);
+        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+        }
     }
 
-
     /**
      * Create a {@link KStream} from the specified topic pattern.
      * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
@@ -322,7 +333,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return stream(null, timestampExtractor, keySerde, valSerde, topicPattern);
     }
 
-
     /**
      * Create a {@link KStream} from the specified topic pattern.
      * <p>
@@ -349,14 +359,18 @@ public class KStreamBuilder extends TopologyBuilder {
                                        final Serde<K> keySerde,
                                        final Serde<V> valSerde,
                                        final Pattern topicPattern) {
-        final String name = newName(KStreamImpl.SOURCE_NAME);
+        try {
+            final String name = newName(KStreamImpl.SOURCE_NAME);
 
-        addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor,
+                keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+            return new KStreamImpl<>(internalStreamsBuilder, name, Collections.singleton(name), false);
+        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+        }
     }
 
-
     /**
      * Create a {@link KTable} for the specified topic.
      * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and
@@ -375,14 +389,14 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param topic     the topic name; cannot be {@code null}
+     * @param topic              the topic name; cannot be {@code null}
      * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(String)} ()}.
      * @return a {@link KTable} for the specified topic
      */
@@ -409,14 +423,14 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param topic     the topic name; cannot be {@code null}
+     * @param topic         the topic name; cannot be {@code null}
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} for the specified topic
      */
@@ -425,7 +439,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return table(null, null, null, null, topic, storeSupplier);
     }
 
-
     /**
      * Create a {@link KTable} for the specified topic.
      * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
@@ -463,17 +476,17 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                    offsets are available
-     * @param topic       the topic name; cannot be {@code null}
-     * @param queryableStoreName   the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, String)} ()}.
+     * @param offsetReset       the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                          offsets are available
+     * @param topic             the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, String)}.
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -500,7 +513,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -534,7 +547,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * <p>
      * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
      *                    offsets are available
-     * @param topic       the topic name; if {@code null} an internal store name will be automatically given.
+     * @param topic       the topic name; cannot be {@code null}
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -542,7 +555,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return table(offsetReset, null, null, null, topic, (String) null);
     }
 
-
     /**
      * Create a {@link KTable} for the specified topic.
      * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
@@ -561,7 +573,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -580,7 +592,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return table(null, timestampExtractor, null, null, topic, storeName);
     }
 
-
     /**
      * Create a {@link KTable} for the specified topic.
      * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
@@ -598,7 +609,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -618,7 +629,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return table(offsetReset, timestampExtractor, null, null, topic, storeName);
     }
 
-
     /**
      * Create a {@link KTable} for the specified topic.
      * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
@@ -637,18 +647,18 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name; cannot be {@code null}
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valSerde           value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
      * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(Serde, Serde, String)} ()}.
      * @return a {@link KTable} for the specified topic
      */
@@ -677,18 +687,18 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name; cannot be {@code null}
+     * @param keySerde      key serde used to send key-value pairs,
+     *                      if not specified the default key serde defined in the configuration will be used
+     * @param valSerde      value serde used to send key-value pairs,
+     *                      if not specified the default value serde defined in the configuration will be used
+     * @param topic         the topic name; cannot be {@code null}
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} for the specified topic
      */
@@ -732,23 +742,29 @@ public class KStreamBuilder extends TopologyBuilder {
                                         final String topic,
                                         final StateStoreSupplier<KeyValueStore> storeSupplier,
                                         final boolean isQueryable) {
-        final String source = newName(KStreamImpl.SOURCE_NAME);
-        final String name = newName(KTableImpl.SOURCE_NAME);
-        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
+        try {
+            final String source = newName(KStreamImpl.SOURCE_NAME);
+            final String name = newName(KTableImpl.SOURCE_NAME);
+            final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
 
-        addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), source, timestampExtractor,
+                keySerde == null ? null : keySerde.deserializer(),
                 valSerde == null ? null : valSerde.deserializer(),
                 topic);
-        addProcessor(name, processorSupplier, source);
+            internalTopologyBuilder.addProcessor(name, processorSupplier, source);
 
-        final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
-                keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
+            final KTableImpl<K, ?, V> kTable = new KTableImpl<>(internalStreamsBuilder, name, processorSupplier,
+                    keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
 
-        addStateStore(storeSupplier, name);
-        connectSourceStoreAndTopic(storeSupplier.name(), topic);
+            addStateStore(storeSupplier, name);
+            connectSourceStoreAndTopic(storeSupplier.name(), topic);
 
-        return kTable;
+            return kTable;
+        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+        }
     }
+
     /**
      * Create a {@link KTable} for the specified topic.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
@@ -765,21 +781,21 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                    offsets are available
-     * @param keySerde    key serde used to send key-value pairs,
-     *                    if not specified the default key serde defined in the configuration will be used
-     * @param valSerde    value serde used to send key-value pairs,
-     *                    if not specified the default value serde defined in the configuration will be used
-     * @param topic       the topic name; cannot be {@code null}
-     * @param queryableStoreName   the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                           offsets are available
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valSerde           value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)}.
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -790,8 +806,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return table(offsetReset, null, keySerde, valSerde, topic, queryableStoreName);
     }
 
-
-
     /**
      * Create a {@link KTable} for the specified topic.
      * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
@@ -809,7 +823,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -834,8 +848,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return table(null, timestampExtractor, keySerde, valSerde, topic, storeName);
     }
 
-
-
     /**
      * Create a {@link KTable} for the specified topic.
      * Input {@link KeyValue} pairs with {@code null} key will be dropped.
@@ -852,7 +864,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -902,11 +914,11 @@ public class KStreamBuilder extends TopologyBuilder {
      * <p>
      * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
      *                    offsets are available
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name; cannot be {@code null}
+     * @param keySerde    key serde used to send key-value pairs,
+     *                    if not specified the default key serde defined in the configuration will be used
+     * @param valSerde    value serde used to send key-value pairs,
+     *                    if not specified the default value serde defined in the configuration will be used
+     * @param topic       the topic name; cannot be {@code null}
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -915,6 +927,7 @@ public class KStreamBuilder extends TopologyBuilder {
                                      final String topic) {
         return table(offsetReset, null, keySerde, valSerde, topic, (String) null);
     }
+
     /**
      * Create a {@link KTable} for the specified topic.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
@@ -931,7 +944,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -972,7 +985,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key);
      * }</pre>
@@ -1005,10 +1018,9 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link GlobalKTable} for the specified topic
      */
     public <K, V> GlobalKTable<K, V> globalTable(final String topic) {
-        return globalTable(null, null, null, topic, (String) null);
+        return globalTable(null, null, null, topic, null);
     }
 
-
     /**
      * Create a {@link GlobalKTable} for the specified topic.
      * The default {@link TimestampExtractor} and default key and value deserializers as specified in
@@ -1024,7 +1036,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key);
      * }</pre>
@@ -1067,7 +1079,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key);
      * }</pre>
@@ -1103,7 +1115,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key);
      * }</pre>
@@ -1139,7 +1151,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key);
      * }</pre>
@@ -1162,17 +1174,20 @@ public class KStreamBuilder extends TopologyBuilder {
                                                     final TimestampExtractor timestampExtractor,
                                                     final String topic,
                                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        final String sourceName = newName(KStreamImpl.SOURCE_NAME);
-        final String processorName = newName(KTableImpl.SOURCE_NAME);
-        final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
-
-
-        final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
-        final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
-
-        addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
-        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+        try {
+            Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+            final String sourceName = newName(KStreamImpl.SOURCE_NAME);
+            final String processorName = newName(KTableImpl.SOURCE_NAME);
+            final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
+
+            final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+            final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+
+            internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+            return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+        }
     }
 
     /**
@@ -1211,7 +1226,11 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KStream} containing all records of the given streams
      */
     public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
-        return KStreamImpl.merge(this, streams);
+        try {
+            return KStreamImpl.merge(internalStreamsBuilder, streams);
+        } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+            throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index b7dd43e..06a0eee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
@@ -34,7 +35,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  * {@code KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
  * Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
  * <p>
- * A {@code KTable} is either {@link KStreamBuilder#table(String, String) defined from a single Kafka topic} that is
+ * A {@code KTable} is either {@link StreamsBuilder#table(String, String) defined from a single Kafka topic} that is
  * consumed message by message or the result of a {@code KTable} transformation.
  * An aggregation of a {@link KStream} also yields a {@code KTable}.
  * <p>
@@ -62,7 +63,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  * @see KStream
  * @see KGroupedTable
  * @see GlobalKTable
- * @see KStreamBuilder#table(String, String)
+ * @see StreamsBuilder#table(String, String)
  */
 @InterfaceStability.Evolving
 public interface KTable<K, V> {
@@ -630,10 +631,10 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, String)})
      * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
@@ -651,10 +652,10 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, String)})
      * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
@@ -671,10 +672,10 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}.
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link KStreamBuilder#table(String)})
+     * {@link StreamsBuilder#table(String)})
      *
      * @param topic     the topic name
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
@@ -689,10 +690,10 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}.
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link KStreamBuilder#table(String)})
+     * {@link StreamsBuilder#table(String)})
      *
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
@@ -710,10 +711,10 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, String)})
      *
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
@@ -734,10 +735,10 @@ public interface KTable<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, String)})
      *
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
@@ -758,10 +759,10 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, String)})
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the configuration will be used
@@ -785,10 +786,10 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, String)})
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the configuration will be used
@@ -811,10 +812,10 @@ public interface KTable<K, V> {
      * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
-     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link KStreamBuilder#table(String)})
+     * {@link StreamsBuilder#table(String)})
      *
      * @param keySerde  key serde used to send key-value pairs,
      *                  if not specified the default key serde defined in the configuration will be used
@@ -834,10 +835,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, String)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration will be used
@@ -866,10 +867,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
-     * {@link KStreamBuilder#table(String, String)})
+     * {@link StreamsBuilder#table(String, String)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration will be used
@@ -897,10 +898,10 @@ public interface KTable<K, V> {
      * <p>
      * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
      * #to(keySerde, valueSerde, partitioner, someTopicName)} and
-     * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
+     * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
-     * {@link KStreamBuilder#table(String)})
+     * {@link StreamsBuilder#table(String)})
      *
      * @param keySerde    key serde used to send key-value pairs,
      *                    if not specified the default key serde defined in the configuration will be used
@@ -918,7 +919,6 @@ public interface KTable<K, V> {
                          final StreamPartitioner<? super K, ? super V> partitioner,
                          final String topic);
 
-
     /**
      * Materialize this changelog stream to a topic using default serializers and deserializers and producer's
      * {@link DefaultPartitioner}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 81d10ef..b5de562 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windows;
@@ -33,24 +32,24 @@ import java.util.Set;
 
 public abstract class AbstractStream<K> {
 
-    protected final KStreamBuilder topology;
+    protected final InternalStreamsBuilder builder;
     protected final String name;
-    protected final Set<String> sourceNodes;
+    final Set<String> sourceNodes;
 
     // This copy-constructor will allow to extend KStream
     // and KTable APIs with new methods without impacting the public interface.
     public AbstractStream(AbstractStream<K> stream) {
-        this.topology = stream.topology;
+        this.builder = stream.builder;
         this.name = stream.name;
         this.sourceNodes = stream.sourceNodes;
     }
 
-    AbstractStream(final KStreamBuilder topology, String name, final Set<String> sourceNodes) {
+    AbstractStream(final InternalStreamsBuilder builder, String name, final Set<String> sourceNodes) {
         if (sourceNodes == null || sourceNodes.isEmpty()) {
             throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
         }
 
-        this.topology = topology;
+        this.builder = builder;
         this.name = name;
         this.sourceNodes = sourceNodes;
     }
@@ -61,13 +60,13 @@ public abstract class AbstractStream<K> {
         allSourceNodes.addAll(sourceNodes);
         allSourceNodes.addAll(other.sourceNodes);
 
-        topology.copartitionSources(allSourceNodes);
+        builder.internalTopologyBuilder.copartitionSources(allSourceNodes);
 
         return allSourceNodes;
     }
 
     String getOrCreateName(final String queryableStoreName, final String prefix) {
-        final String returnName = queryableStoreName != null ? queryableStoreName : topology.newStoreName(prefix);
+        final String returnName = queryableStoreName != null ? queryableStoreName : builder.newStoreName(prefix);
         Topic.validate(returnName);
         return returnName;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
new file mode 100644
index 0000000..bcb68f0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
+public class InternalStreamsBuilder {
+
+    final InternalTopologyBuilder internalTopologyBuilder;
+
+    private final AtomicInteger index = new AtomicInteger(0);
+
+    public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) {
+        this.internalTopologyBuilder = internalTopologyBuilder;
+    }
+
+    public <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+                                                                  final TimestampExtractor timestampExtractor,
+                                                                  final Serde<K> keySerde,
+                                                                  final Serde<V> valSerde,
+                                                                  final String... topics) {
+        final String name = newName(KStreamImpl.SOURCE_NAME);
+
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
+
+        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+    }
+
+    public <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+                                       final TimestampExtractor timestampExtractor,
+                                       final Serde<K> keySerde,
+                                       final Serde<V> valSerde,
+                                       final Pattern topicPattern) {
+        final String name = newName(KStreamImpl.SOURCE_NAME);
+
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
+
+        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                     final TimestampExtractor timestampExtractor,
+                                     final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic,
+                                     final String queryableStoreName) {
+        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
+        final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+            keySerde,
+            valSerde,
+            false,
+            Collections.<String, String>emptyMap(),
+            true);
+        return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
+    }
+
+    public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+                                     final TimestampExtractor timestampExtractor,
+                                     final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic,
+                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
+    }
+
+    private <K, V> KTable<K, V> doTable(final Topology.AutoOffsetReset offsetReset,
+                                        final Serde<K> keySerde,
+                                        final Serde<V> valSerde,
+                                        final TimestampExtractor timestampExtractor,
+                                        final String topic,
+                                        final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                        final boolean isQueryable) {
+        final String source = newName(KStreamImpl.SOURCE_NAME);
+        final String name = newName(KTableImpl.SOURCE_NAME);
+        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
+
+        internalTopologyBuilder.addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
+            valSerde == null ? null : valSerde.deserializer(),
+            topic);
+        internalTopologyBuilder.addProcessor(name, processorSupplier, source);
+
+        final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
+            keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
+
+        internalTopologyBuilder.addStateStore(storeSupplier, name);
+        internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
+
+        return kTable;
+    }
+
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde,
+                                                 final TimestampExtractor timestampExtractor,
+                                                 final String topic,
+                                                 final String queryableStoreName) {
+        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
+        return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+            keySerde,
+            valSerde,
+            false,
+            Collections.<String, String>emptyMap(),
+            true));
+    }
+
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde,
+                                                 final String topic,
+                                                 final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
+                                                    final Serde<V> valSerde,
+                                                    final TimestampExtractor timestampExtractor,
+                                                    final String topic,
+                                                    final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        final String sourceName = newName(KStreamImpl.SOURCE_NAME);
+        final String processorName = newName(KTableImpl.SOURCE_NAME);
+        final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
+
+
+        final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+        final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+    }
+
+    public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
+        return KStreamImpl.merge(this, streams);
+    }
+
+    String newName(final String prefix) {
+        return prefix + String.format("%010d", index.getAndIncrement());
+    }
+
+    String newStoreName(final String prefix) {
+        return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
+    }
+
+    public synchronized void addStateStore(final StateStoreSupplier supplier,
+                                           final String... processorNames) {
+        internalTopologyBuilder.addStateStore(supplier, processorNames);
+    }
+
+    public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                            final String sourceName,
+                                            final Deserializer keyDeserializer,
+                                            final Deserializer valueDeserializer,
+                                            final String topic,
+                                            final String processorName,
+                                            final ProcessorSupplier stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
+            valueDeserializer, topic, processorName, stateUpdateSupplier);
+    }
+
+    public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                            final String sourceName,
+                                            final TimestampExtractor timestampExtractor,
+                                            final Deserializer keyDeserializer,
+                                            final Deserializer valueDeserializer,
+                                            final String topic,
+                                            final String processorName,
+                                            final ProcessorSupplier stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
+            valueDeserializer, topic, processorName, stateUpdateSupplier);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index a1b40a3..5fd5f6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindows;
@@ -31,8 +30,8 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.Collections;
 import java.util.Objects;
@@ -48,13 +47,13 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     private final boolean repartitionRequired;
     private boolean isQueryable = true;
 
-    KGroupedStreamImpl(final KStreamBuilder topology,
+    KGroupedStreamImpl(final InternalStreamsBuilder builder,
                        final String name,
                        final Set<String> sourceNodes,
                        final Serde<K> keySerde,
                        final Serde<V> valSerde,
                        final boolean repartitionRequired) {
-        super(topology, name, sourceNodes);
+        super(builder, name, sourceNodes);
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.repartitionRequired = repartitionRequired;
@@ -391,20 +390,21 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
             final String functionName,
             final StateStoreSupplier storeSupplier) {
 
-        final String aggFunctionName = topology.newName(functionName);
+        final String aggFunctionName = builder.newName(functionName);
 
         final String sourceName = repartitionIfRequired(storeSupplier.name());
 
-        topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
-        topology.addStateStore(storeSupplier, aggFunctionName);
-
-        return new KTableImpl<>(topology,
-                aggFunctionName,
-                aggregateSupplier,
-                sourceName.equals(this.name) ? sourceNodes
-                        : Collections.singleton(sourceName),
-                storeSupplier.name(),
-                isQueryable);
+        builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
+        builder.internalTopologyBuilder.addStateStore(storeSupplier, aggFunctionName);
+
+        return new KTableImpl<>(
+            builder,
+            aggFunctionName,
+            aggregateSupplier,
+            sourceName.equals(this.name) ? sourceNodes
+                    : Collections.singleton(sourceName),
+            storeSupplier.name(),
+            isQueryable);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 4455848..aefaad8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -50,12 +49,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     protected final Serde<? extends V> valSerde;
     private boolean isQueryable = true;
 
-    public KGroupedTableImpl(final KStreamBuilder topology,
+    public KGroupedTableImpl(final InternalStreamsBuilder builder,
                              final String name,
                              final String sourceName,
                              final Serde<? extends K> keySerde,
                              final Serde<? extends V> valSerde) {
-        super(topology, name, Collections.singleton(sourceName));
+        super(builder, name, Collections.singleton(sourceName));
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.isQueryable = true;
@@ -82,7 +81,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                                       final Aggregator<? super K, ? super V, T> adder,
                                       final Aggregator<? super K, ? super V, T> subtractor,
                                       final Serde<T> aggValueSerde) {
-        return aggregate(initializer, adder, subtractor, aggValueSerde, (String) null);
+        return aggregate(initializer, adder, subtractor, aggValueSerde, null);
     }
 
     @Override
@@ -117,9 +116,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                          final String functionName,
                                          final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        String sinkName = topology.newName(KStreamImpl.SINK_NAME);
-        String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
-        String funcName = topology.newName(functionName);
+        String sinkName = builder.newName(KStreamImpl.SINK_NAME);
+        String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
+        String funcName = builder.newName(functionName);
 
         String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
@@ -132,18 +131,18 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
 
         // send the aggregate key-value pairs to the intermediate topic for partitioning
-        topology.addInternalTopic(topic);
-        topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
+        builder.internalTopologyBuilder.addInternalTopic(topic);
+        builder.internalTopologyBuilder.addSink(sinkName, topic, keySerializer, changedValueSerializer, null, this.name);
 
         // read the intermediate topic with RecordMetadataTimestampExtractor
-        topology.addSource(null, sourceName, new FailOnInvalidTimestamp(), keyDeserializer, changedValueDeserializer, topic);
+        builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(), keyDeserializer, changedValueDeserializer, topic);
 
         // aggregate the values with the aggregator and local store
-        topology.addProcessor(funcName, aggregateSupplier, sourceName);
-        topology.addStateStore(storeSupplier, funcName);
+        builder.internalTopologyBuilder.addProcessor(funcName, aggregateSupplier, sourceName);
+        builder.internalTopologyBuilder.addStateStore(storeSupplier, funcName);
 
         // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
+        return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
     }
 
     @Override


Mime
View raw message