kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [3/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and deprecate TopologyBuilder
Date: Fri, 28 Jul 2017 23:46:40 GMT
KAFKA-5670: (KIP-120) Add Topology and deprecate TopologyBuilder

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3590 from mjsax/kafka-3856-replace-topology-builder-by-topology


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1844bf2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1844bf2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1844bf2b

Branch: refs/heads/trunk
Commit: 1844bf2b2f4cdf5a8209d7ceccb6701fc7dcf768
Parents: c50c941
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Fri Jul 28 16:46:34 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jul 28 16:46:34 2017 -0700

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |  63 +-
 .../wordcount/WordCountProcessorDemo.java       |   6 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |  71 +-
 .../java/org/apache/kafka/streams/Topology.java | 640 ++++++++++++++++++
 .../kafka/streams/TopologyDescription.java      |   7 +-
 .../errors/TopologyBuilderException.java        |   5 +-
 .../kafka/streams/errors/TopologyException.java |  40 ++
 .../apache/kafka/streams/kstream/KStream.java   |   4 +-
 .../kstream/internals/KStreamTransform.java     |   1 +
 .../internals/KStreamTransformValues.java       |   2 +
 .../streams/kstream/internals/KTableImpl.java   |  10 +-
 .../streams/processor/AbstractProcessor.java    |   1 +
 .../streams/processor/TopologyBuilder.java      | 221 +++---
 .../internals/GlobalProcessorContextImpl.java   |   5 +-
 .../internals/InternalTopologyBuilder.java      | 242 ++++---
 .../internals/ProcessorContextImpl.java         |   1 +
 .../internals/SourceNodeRecordDeserializer.java |   1 +
 .../internals/StreamPartitionAssignor.java      |  15 +-
 .../org/apache/kafka/streams/TopologyTest.java  | 671 +++++++++++++++++++
 ...eamsFineGrainedAutoResetIntegrationTest.java |  19 +-
 .../streams/processor/TopologyBuilderTest.java  |   2 +-
 .../kafka/streams/processor/TopologyTest.java   | 408 -----------
 .../internals/InternalTopologyBuilderTest.java  | 140 ++--
 .../internals/StreamPartitionAssignorTest.java  |   7 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 +-
 25 files changed, 1865 insertions(+), 719 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 542003d..76c2dc7 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -123,14 +123,14 @@
     <h4><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h4>
 
     <p>
-        With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
+        With the customized processors defined in the Processor API, developers can use <code>Topology</code> to build a processor topology
         by connecting these processors together:
     </p>
 
     <pre class="brush: java;">
-    TopologyBuilder builder = new TopologyBuilder();
+    Topology topology = new Topology();
 
-    builder.addSource("SOURCE", "src-topic")
+    topology.addSource("SOURCE", "src-topic")
     // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
     .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
 
@@ -180,15 +180,15 @@
     </pre>
 
     <p>
-        To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
+        To take advantage of these state stores, developers can use the <code>Topology.addStateStore</code> method when building the
        processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created
-        state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
+        state store with the existing processor nodes through <code>Topology.connectProcessorAndStateStores</code>.
     </p>
 
     <pre class="brush: java;">
-    TopologyBuilder builder = new TopologyBuilder();
+    Topology topology = new Topology();
 
-    builder.addSource("SOURCE", "src-topic")
+    topology.addSource("SOURCE", "src-topic")
 
     .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
     // add the created state store "COUNTS" associated with processor "PROCESS1"
@@ -204,12 +204,32 @@
     .addSink("SINK3", "sink-topic3", "PROCESS3");
     </pre>
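+
+    <p>
+        As a minimal sketch (reusing the "COUNTS" store and the "PROCESS3" node from the example above), an already
+        created state store can also be connected to existing processor nodes after the fact:
+    </p>
+
+    <pre class="brush: java;">
+    // connect the existing state store "COUNTS" to the existing processor node "PROCESS3"
+    topology.connectProcessorAndStateStores("PROCESS3", "COUNTS");
+    </pre>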
 
+    <h4><a id="streams_processor_describe" href="#streams_processor_describe">Describe a <code>Topology</code></a></h4>
+
+    <p>
+        After a <code>Topology</code> is specified, it is possible to retrieve a description of the corresponding DAG via <code>#describe()</code>, which returns a <code>TopologyDescription</code>.
+        A <code>TopologyDescription</code> contains all added source, processor, and sink nodes as well as all attached stores.
+        For source and sink nodes, one can access the specified input/output topic name/pattern.
+        For processor nodes, the attached stores are added to the description.
+        Additionally, all nodes have a list of all their connected successor and predecessor nodes.
+        Thus, <code>TopologyDescription</code> allows you to retrieve the DAG structure of the specified topology.
+        <br />
+        Note that global stores are listed explicitly because they are accessible by all nodes without the need to explicitly connect them.
+        Furthermore, nodes are grouped by <code>Subtopology</code>.
+        Subtopologies are groups of nodes that are directly connected to each other (i.e., either by a direct connection&mdash;but not a topic&mdash;or by sharing a store).
+        For execution, each <code>Subtopology</code> is executed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
+        Thus, each <code>Subtopology</code> describes an independent unit of work that can be executed by different threads in parallel.
+        <br />
+        Describing a <code>Topology</code> is helpful for reasoning about tasks and thus the maximum parallelism.
+        It is also helpful to get insight into a <code>Topology</code> if it is not specified manually but via the Kafka Streams DSL, which is described in the next section.
+    </p>
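+
+    <p>
+        As a minimal sketch (reusing the <code>topology</code> built in the examples above), the description can be
+        retrieved and printed for inspection:
+    </p>
+
+    <pre class="brush: java;">
+    // retrieve and print the DAG description of the topology built above
+    // (printing relies on the default toString() rendering of TopologyDescription)
+    TopologyDescription description = topology.describe();
+    System.out.println(description);
+    </pre>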
+
     In the next section we present another way to build the processor topology: the Kafka Streams DSL.
     <br>
 
     <h3><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h3>
 
-    To build a processor topology using the Streams DSL, developers can apply the <code>KStreamBuilder</code> class, which is extended from the <code>TopologyBuilder</code>.
+    To build a <code>Topology</code> using the Streams DSL, developers can apply the <code>StreamsBuilder</code> class.
     A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section will walk
    through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend developers read the full example source
    code for details.
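+
+    <p>
+        As a minimal sketch (assuming the <code>StreamsBuilder#stream</code> and <code>StreamsBuilder#build</code>
+        methods introduced with KIP-120), the processing logic is defined via the DSL and the resulting
+        <code>Topology</code> is obtained from the builder:
+    </p>
+
+    <pre class="brush: java;">
+    StreamsBuilder builder = new StreamsBuilder();
+    // define the processing logic via the DSL, e.g., consume a topic as a KStream
+    builder.stream("src-topic");
+    // obtain the Topology that can be passed to KafkaStreams
+    Topology topology = builder.build();
+    </pre>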
@@ -755,19 +775,19 @@
 
     <pre class="brush: java;">
           StreamsConfig config = ...;
-          TopologyBuilder builder = ...;
+          Topology topology = ...;
          ProcessorSupplier processorSupplier = ...;
 
          // Create a CustomStoreSupplier for the store named "the-custom-store"
          MyCustomStoreSupplier customStoreSupplier = new MyCustomStoreSupplier("the-custom-store");
           // Add the source topic
-          builder.addSource("input", "inputTopic");
+          topology.addSource("input", "inputTopic");
           // Add a custom processor that reads from the source topic
-          builder.addProcessor("the-processor", processorSupplier, "input");
+          topology.addProcessor("the-processor", processorSupplier, "input");
           // Connect your custom state store to the custom processor above
-          builder.addStateStore(customStoreSupplier, "the-processor");
+          topology.addStateStore(customStoreSupplier, "the-processor");
 
-          KafkaStreams streams = new KafkaStreams(builder, config);
+          KafkaStreams streams = new KafkaStreams(topology, config);
           streams.start();
 
           // Get access to the custom store
@@ -1053,33 +1073,36 @@
     </p>
 
     <p>
-        First, you must create an instance of <code>KafkaStreams</code>. The first argument of the <code>KafkaStreams</code> constructor takes a topology
-        builder (either <code>KStreamBuilder</code> for the Kafka Streams DSL, or <code>TopologyBuilder</code> for the Processor API)
-        that is used to define a topology; The second argument is an instance of <code>StreamsConfig</code> mentioned above.
+        First, you must create an instance of <code>KafkaStreams</code>.
+        The first argument of the <code>KafkaStreams</code> constructor is a <code>Topology</code>,
+        i.e., a logical description of the processing topology (you can create a <code>Topology</code> either
+        directly or use <code>StreamsBuilder</code> to create one).
+        The second argument is an instance of <code>StreamsConfig</code> mentioned above.
     </p>
 
     <pre class="brush: java;">
     import org.apache.kafka.streams.KafkaStreams;
     import org.apache.kafka.streams.StreamsConfig;
     import org.apache.kafka.streams.kstream.KStreamBuilder;
-    import org.apache.kafka.streams.processor.TopologyBuilder;
+    import org.apache.kafka.streams.Topology;
 
     // Use the builders to define the actual processing topology, e.g. to specify
     // from which input topics to read, which stream operations (filter, map, etc.)
     // should be called, and so on.
 
-    KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
+    Topology topology = ...; // when using the Processor API
     //
     // OR
     //
-    TopologyBuilder builder = ...; // when using the Processor API
+    KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
+    Topology topology = builder.topology();
 
     // Use the configuration to tell your application where the Kafka cluster is,
     // which serializers/deserializers to use by default, to specify security settings,
     // and so on.
     StreamsConfig config = ...;
 
-    KafkaStreams streams = new KafkaStreams(builder, config);
+    KafkaStreams streams = new KafkaStreams(topology, config);
     </pre>
 
     <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 0ff42a7..34bb8bb 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -19,15 +19,15 @@ package org.apache.kafka.streams.examples.wordcount;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -119,7 +119,7 @@ public class WordCountProcessorDemo {
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        TopologyBuilder builder = new TopologyBuilder();
+        Topology builder = new Topology();
 
         builder.addSource("Source", "streams-wordcount-input");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 0d88efb..d7c608a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -121,7 +122,7 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG
  * }</pre>
  *
  * @see KStreamBuilder
- * @see TopologyBuilder
+ * @see Topology
  */
 @InterfaceStability.Evolving
 public class KafkaStreams {
@@ -402,36 +403,72 @@ public class KafkaStreams {
     }
 
     /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final TopologyBuilder builder,
+                        final Properties props) {
+        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final TopologyBuilder builder,
+                        final StreamsConfig config) {
+        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final TopologyBuilder builder,
+                        final StreamsConfig config,
+                        final KafkaClientSupplier clientSupplier) {
+        this(builder.internalTopologyBuilder, config, clientSupplier);
+    }
+
+    /**
      * Create a {@code KafkaStreams} instance.
      *
-     * @param builder the processor topology builder specifying the computational logic
+     * @param topology the topology specifying the computational logic
      * @param props   properties for {@link StreamsConfig}
      */
-    public KafkaStreams(final TopologyBuilder builder, final Properties props) {
-        this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+    public KafkaStreams(final Topology topology,
+                        final Properties props) {
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
     }
 
     /**
      * Create a {@code KafkaStreams} instance.
      *
-     * @param builder the processor topology builder specifying the computational logic
+     * @param topology the topology specifying the computational logic
      * @param config  the Kafka Streams configuration
      */
-    public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config) {
-        this(builder, config, new DefaultKafkaClientSupplier());
+    public KafkaStreams(final Topology topology,
+                        final StreamsConfig config) {
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
     }
 
     /**
      * Create a {@code KafkaStreams} instance.
      *
-     * @param builder        the processor topology builder specifying the computational logic
+     * @param topology       the topology specifying the computational logic
      * @param config         the Kafka Streams configuration
      * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
      *                       for the new {@code KafkaStreams} instance
      */
-    public KafkaStreams(final TopologyBuilder builder,
+    public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
+        this(topology.internalTopologyBuilder, config, clientSupplier);
+    }
+
+    private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
+                         final StreamsConfig config,
+                         final KafkaClientSupplier clientSupplier) {
         // create the metrics
         final Time time = Time.SYSTEM;
 
@@ -442,7 +479,7 @@ public class KafkaStreams {
         // The application ID is a required config and hence should always have value
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
-        builder.setApplicationId(applicationId);
+        internalTopologyBuilder.setApplicationId(applicationId);
 
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
@@ -466,16 +503,16 @@ public class KafkaStreams {
         GlobalStreamThread.State globalThreadState = null;
 
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
-        streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+        streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
-        final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
 
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
             log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix);
         }
 
         final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
-                (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
+            (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
 
         stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
         if (globalTaskTopology != null) {
@@ -491,7 +528,7 @@ public class KafkaStreams {
         }
 
         for (int i = 0; i < threads.length; i++) {
-            threads[i] = new StreamThread(builder.internalTopologyBuilder,
+            threads[i] = new StreamThread(internalTopologyBuilder,
                                           config,
                                           clientSupplier,
                                           applicationId,
@@ -509,11 +546,11 @@ public class KafkaStreams {
         if (globalTaskTopology != null) {
             globalStreamThread.setStateListener(streamStateListener);
         }
-        for (int i = 0; i < threads.length; i++) {
-            threads[i].setStateListener(streamStateListener);
+        for (StreamThread thread : threads) {
+            thread.setStateListener(streamStateListener);
         }
 
-        final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
+        final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
         final String cleanupThreadName = clientId + "-CleanupThread";
         stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
new file mode 100644
index 0000000..ca0ac75
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.SinkNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.regex.Pattern;
+
+/**
+ * A logical representation of a {@link ProcessorTopology}.
+ * A topology is an acyclic graph of sources, processors, and sinks.
+ * A {@link SourceNode source} is a node in the graph that consumes records from one or more Kafka topics and forwards
+ * them to its successor nodes.
+ * A {@link Processor processor} is a node in the graph that receives input records from upstream nodes, processes the
+ * records, and optionally forwards new records to one or all of its downstream nodes.
+ * Finally, a {@link SinkNode sink} is a node in the graph that receives records from upstream nodes and writes them to
+ * a Kafka topic.
+ * A {@code Topology} allows you to construct an acyclic graph of these nodes, which can then be passed into a new
+ * {@link KafkaStreams} instance that will {@link KafkaStreams#start() begin consuming, processing, and producing
+ * records}.
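+ * <p>
+ * As a minimal sketch (hypothetical topic names; {@code MyProcessor} stands for any {@link Processor} implementation):
+ * <pre>{@code
+ * Topology topology = new Topology();
+ * topology.addSource("source", "input-topic")
+ *         .addProcessor("processor", MyProcessor::new, "source")
+ *         .addSink("sink", "output-topic", "processor");
+ * }</pre>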
+ */
+public class Topology {
+
+    final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
+
+    /**
+     * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}
+     */
+    public enum AutoOffsetReset {
+        EARLIEST, LATEST
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final String name,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final String name,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param offsetReset the auto offset reset policy to use for this source if no committed offsets found; acceptable values are earliest or latest
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param offsetReset the auto offset reset policy value for this source if no committed offsets found; acceptable values are earliest or latest.
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final TimestampExtractor timestampExtractor,
+                                           final String name,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final TimestampExtractor timestampExtractor,
+                                           final String name,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this source if no committed offsets found;
+     *                           acceptable values are earliest or latest
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final TimestampExtractor timestampExtractor,
+                                           final String name,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param offsetReset        the auto offset reset policy value for this source if no committed offsets found;
+     *                           acceptable values are earliest or latest.
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final TimestampExtractor timestampExtractor,
+                                           final String name,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final String name,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+     * topics that share the same key-value data format.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by name
+     */
+    public synchronized Topology addSource(final String name,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+     * topics that share the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
+     *                           acceptable values are earliest or latest
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by name
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+     * topics that share the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
+     *                           acceptable values are earliest or latest
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by name
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
+     *                           acceptable values are earliest or latest.
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final TimestampExtractor timestampExtractor,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
+        return this;
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+     * and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+     * topics that share the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
+     *                           acceptable values are earliest or latest
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param timestampExtractor the stateless timestamp extractor used for this source,
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer    key deserializer used to read this source, if not specified the default
+     *                           key deserializer defined in the configs will be used
+     * @param valueDeserializer  value deserializer used to read this source,
+     *                           if not specified the default value deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if processor is already added or if topics have already been registered by name
+     */
+    public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final TimestampExtractor timestampExtractor,
+                                           final Deserializer keyDeserializer,
+                                           final Deserializer valueDeserializer,
+                                           final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * and write to its topic
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topic, null, null, null, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic,
+     * using the supplied partitioner.
+     * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * <p>
+     * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
+     * the named Kafka topic's partitions.
+     * Such control is often useful with topologies that use {@link #addStateStore(StateStoreSupplier, String...) state
+     * stores} in their processors.
+     * In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
+     * records among partitions using Kafka's default partitioning logic.
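+     * <p>
+     * As a sketch (hypothetical node names, assuming {@code String} keys):
+     * <pre>{@code
+     * // send all records with the same key to the same partition
+     * StreamPartitioner<String, Object> partitioner =
+     *     (key, value, numPartitions) -> (key.hashCode() & 0x7fffffff) % numPartitions;
+     * topology.addSink("sink", "output-topic", partitioner, "processor");
+     * }</pre>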
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * and write to its topic
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final StreamPartitioner partitioner,
+                                         final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topic, null, null, partitioner, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the specified key and value serializers.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param keySerializer the {@link Serializer key serializer} used when writing records to the topic; may be null if the sink
+     * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+     * {@link StreamsConfig stream configuration}
+     * @param valueSerializer the {@link Serializer value serializer} used when writing records to the topic; may be null if the sink
+     * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * and write to its topic
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final Serializer keySerializer,
+                                         final Serializer valueSerializer,
+                                         final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topic, keySerializer, valueSerializer, null, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the specified key and value serializers, and the supplied partitioner.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its records
+     * @param keySerializer the {@link Serializer key serializer} used when writing records to the topic; may be null if the sink
+     * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+     * {@link StreamsConfig stream configuration}
+     * @param valueSerializer the {@link Serializer value serializer} used when writing records to the topic; may be null if the sink
+     * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamsConfig stream configuration}
+     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * and write to its topic
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final String topic,
+                                                final Serializer<K> keySerializer,
+                                                final Serializer<V> valueSerializer,
+                                                final StreamPartitioner<? super K, ? super V> partitioner,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topic, keySerializer, valueSerializer, partitioner, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new processor node that receives and processes records output by one or more parent source or processor
+     * nodes.
+     * Any new record output by this processor will be forwarded to its child processor or sink nodes.
+     *
+     * @param name the unique name of the processor node
+     * @param supplier the supplier used to obtain this node's {@link Processor} instance
+     * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
+     * and process
+     * @return itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     */
+    public synchronized Topology addProcessor(final String name,
+                                              final ProcessorSupplier supplier,
+                                              final String... parentNames) {
+        internalTopologyBuilder.addProcessor(name, supplier, parentNames);
+        return this;
+    }
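A minimal processor sketch; UpperCaseProcessor is a hypothetical class, not part of this commit:

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;

    class UpperCaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // forward the transformed record to all downstream children
            context().forward(key, value.toUpperCase());
        }
    }

    final ProcessorSupplier<String, String> supplier = UpperCaseProcessor::new;
    topology.addProcessor("Process", supplier, "Source");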
+
+    /**
+     * Adds a state store.
+     *
+     * @param supplier the supplier used to obtain the {@link StateStore} instance
+     * @param processorNames the names of the processors that should be able to access the provided store
+     * @return itself
+     * @throws TopologyException if a state store supplier with the same name was already added
+     */
+    public synchronized Topology addStateStore(final StateStoreSupplier supplier,
+                                               final String... processorNames) {
+        internalTopologyBuilder.addStateStore(supplier, processorNames);
+        return this;
+    }
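A sketch assuming the pre-KIP-182 Stores builder that is current at the time of this commit:

    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    // a persistent key-value store named "Counts"
    final StateStoreSupplier countStore = Stores.create("Counts")
        .withStringKeys()
        .withLongValues()
        .persistent()
        .build();
    // only the named processors may read or write the store
    topology.addStateStore(countStore, "Process");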
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeSupplier         user defined state store supplier
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorNode} that will be automatically added
+     * @param stateUpdateSupplier   the {@link ProcessorSupplier} used to obtain the processor that keeps the store up-to-date
+     * @return itself
+     * @throws TopologyException if the processor or the state store is already registered
+     */
+    public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                                final String sourceName,
+                                                final Deserializer keyDeserializer,
+                                                final Deserializer valueDeserializer,
+                                                final String topic,
+                                                final String processorName,
+                                                final ProcessorSupplier stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
+            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        return this;
+    }
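An illustrative call; globalStoreSupplier and globalUpdateSupplier are hypothetical, and the supplied store must not have change-logging enabled:

    import org.apache.kafka.common.serialization.Serdes;

    // exactly one instance of this store per Kafka Streams instance,
    // fed from every partition of "global-topic"
    topology.addGlobalStore(globalStoreSupplier,
                            "GlobalSource",
                            Serdes.String().deserializer(),
                            Serdes.String().deserializer(),
                            "global-topic",
                            "GlobalProcess",
                            globalUpdateSupplier);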
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeSupplier         user defined state store supplier
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source;
+     *                              if not specified, the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorNode} that will be automatically added
+     * @param stateUpdateSupplier   the {@link ProcessorSupplier} used to obtain the processor that keeps the store up-to-date
+     * @return itself
+     * @throws TopologyException if the processor or the state store is already registered
+     */
+    public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                                final String sourceName,
+                                                final TimestampExtractor timestampExtractor,
+                                                final Deserializer keyDeserializer,
+                                                final Deserializer valueDeserializer,
+                                                final String topic,
+                                                final String processorName,
+                                                final ProcessorSupplier stateUpdateSupplier) {
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
+            valueDeserializer, topic, processorName, stateUpdateSupplier);
+        return this;
+    }
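The extractor variant differs only in the extra argument; WallclockTimestampExtractor ships with Kafka Streams, while the other names remain the hypothetical ones from above:

    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    // same wiring, but records are stamped with wall-clock time on ingest
    topology.addGlobalStore(globalStoreSupplier, "GlobalSource",
                            new WallclockTimestampExtractor(),
                            Serdes.String().deserializer(),
                            Serdes.String().deserializer(),
                            "global-topic", "GlobalProcess", globalUpdateSupplier);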
+
+    /**
+     * Connects the processor with the given state stores.
+     *
+     * @param processorName the name of the processor
+     * @param stateStoreNames the names of state stores that the processor uses
+     * @return itself
+     * @throws TopologyException if the processor or a state store is unknown
+     */
+    public synchronized Topology connectProcessorAndStateStores(final String processorName,
+                                                                final String... stateStoreNames) {
+        internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+        return this;
+    }
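Illustrative use, reusing the hypothetical store and processor names from the sketches above:

    // grant a second, already-added processor access to the shared "Counts" store
    topology.connectProcessorAndStateStores("OtherProcess", "Counts");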
+
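+    /**
+     * Returns a description of the specified {@code Topology}.
+     *
+     * @return a description of the topology
+     */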
+    public synchronized TopologyDescription describe() {
+        return internalTopologyBuilder.describe();
+    }
+
+}
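Putting the pieces together, describe() allows inspecting the wiring without starting an application; a minimal sketch:

    final Topology topology = new Topology();
    topology.addSource("Source", "input-topic");
    topology.addSink("Sink", "output-topic", "Source");
    // prints the sub-topologies, their nodes, and how they connect
    System.out.println(topology.describe());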

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index dd481ff..725b7b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -35,8 +35,9 @@ public interface TopologyDescription {
     /**
      * A connected sub-graph of a {@link Topology}.
      * <p>
-     * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String, ProcessorSupplier, String...)
-     * directly} or indirectly via {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
+     * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String,
+     * org.apache.kafka.streams.processor.ProcessorSupplier, String...) directly} or indirectly via
+     * {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
      * (i.e., if multiple processors share the same state).
      */
     interface Subtopology {
@@ -54,7 +55,7 @@ public interface TopologyDescription {
     }
 
     /**
-     * Represents a {@link Topology#addGlobalStore(StateStoreSupplier, String,
+     * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.processor.StateStoreSupplier, String,
      * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
      * String, ProcessorSupplier) global store}.
      * Adding a global store results in adding a source node and one stateful processor node.

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
index b9c0c3a..385d401 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
@@ -18,9 +18,12 @@ package org.apache.kafka.streams.errors;
 
 
 /**
- * Indicates a pre-run time error incurred while parsing the {@link org.apache.kafka.streams.processor.TopologyBuilder
+ * Indicates that a pre-runtime error occurred while parsing the {@link org.apache.kafka.streams.processor.TopologyBuilder
  * builder} to construct the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology processor topology}.
+ *
+ * @deprecated use {@link org.apache.kafka.streams.Topology} instead of {@link org.apache.kafka.streams.processor.TopologyBuilder}
  */
+@Deprecated
 public class TopologyBuilderException extends StreamsException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
new file mode 100644
index 0000000..1eaef06
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+/**
+ * Indicates that a pre-runtime error occurred while parsing the {@link org.apache.kafka.streams.Topology logical topology}
+ * to construct the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology physical processor topology}.
+ */
+public class TopologyException extends StreamsException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TopologyException(final String message) {
+        super("Invalid topology" + (message == null ? "" : ": " + message));
+    }
+
+    public TopologyException(final String message,
+                             final Throwable throwable) {
+        super("Invalid topology" + (message == null ? "" : ": " + message), throwable);
+    }
+
+    public TopologyException(final Throwable throwable) {
+        super(throwable);
+    }
+}
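Given the "Invalid topology" message prefix above, a duplicate node name surfaces like this (names illustrative):

    final Topology topology = new Topology();
    topology.addSource("Source", "input-topic");
    try {
        topology.addSource("Source", "other-topic");   // name already taken
    } catch (final TopologyException e) {
        System.err.println(e.getMessage());            // "Invalid topology: ..."
    }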

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 191931b..535e1e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -22,13 +22,13 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
 import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 
 /**
  * {@code KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an
@@ -42,7 +42,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
  * <p>
  * A {@code KStream} can be transformed record by record, joined with another {@code KStream}, {@link KTable},
  * {@link GlobalKTable}, or can be aggregated into a {@link KTable}.
- * Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link TopologyBuilder}) via
+ * Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link Topology}) via
  * {@link #process(ProcessorSupplier, String...) process(...)},
  * {@link #transform(TransformerSupplier, String...) transform(...)}, and
  * {@link #transformValues(ValueTransformerSupplier, String...) transformValues(...)}.
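A sketch of that mix-and-match, using the KStreamBuilder DSL entry point current at the time of this commit and the hypothetical UpperCaseProcessor from the addProcessor() sketch above:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    final KStreamBuilder builder = new KStreamBuilder();
    final KStream<String, String> words =
        builder.stream(Serdes.String(), Serdes.String(), "words");
    // drop from the DSL into the Processor API mid-pipeline
    words.process(() -> new UpperCaseProcessor());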

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 93cf410..0afadbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -59,6 +59,7 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
                 context().forward(pair.key, pair.value);
         }
 
+        @SuppressWarnings("deprecation")
         @Override
         public void punctuate(long timestamp) {
             KeyValue<? extends K2, ? extends V2> pair = transformer.punctuate(timestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index a6e9aaf..ab1c302 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -105,6 +105,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
                         return context.schedule(interval, type, callback);
                     }
 
+                    @SuppressWarnings("deprecation")
                     @Override
                     public void schedule(final long interval) {
                         context.schedule(interval);
@@ -168,6 +169,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
             context.forward(key, valueTransformer.transform(value));
         }
 
+        @SuppressWarnings("deprecation")
         @Override
         public void punctuate(long timestamp) {
             if (valueTransformer.punctuate(timestamp) != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 679efe5..048670e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -238,22 +238,25 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doMapValues(mapper, valueSerde, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print() {
         print(null, null, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(String label) {
         print(null, null, label);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(Serde<K> keySerde, Serde<V> valSerde) {
         print(keySerde, valSerde, this.name);
     }
 
-
+    @SuppressWarnings("deprecation")
     @Override
     public void print(Serde<K> keySerde, final Serde<V> valSerde, String label) {
         Objects.requireNonNull(label, "label can't be null");
@@ -261,16 +264,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(String filePath) {
         writeAsText(filePath, this.name, null, null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(String filePath, String label) {
         writeAsText(filePath, label, null, null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
         writeAsText(filePath, this.name, keySerde, valSerde);
@@ -279,6 +285,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     /**
      * @throws TopologyBuilderException if file is not found
      */
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(String filePath, String label, Serde<K> keySerde, Serde<V> valSerde) {
         Objects.requireNonNull(filePath, "filePath can't be null");
@@ -296,6 +303,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void foreach(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index 49b3c18..1cfe78a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -44,6 +44,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
      *
      * @param timestamp the wallclock time when this method is being called
      */
+    @SuppressWarnings("deprecation")
     @Override
     public void punctuate(long timestamp) {
         // do nothing

