kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [4/5] kafka git commit: KAFKA-3856 (KIP-120) step two: extract internal functions from public facing TopologyBuilder class
Date Mon, 24 Jul 2017 18:03:33 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 4508c77..ce6ba7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -19,39 +19,25 @@ package org.apache.kafka.streams.processor;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.QuickUnion;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-
 /**
  * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
  * and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
@@ -64,238 +50,30 @@ import java.util.regex.Pattern;
 @InterfaceStability.Evolving
 public class TopologyBuilder {
 
-    private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
-
-    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
-
-    // node factories in a topological order
-    private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
-
-    // state factories
-    private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
-
-    // global state factories
-    private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>();
-
-    // all topics subscribed from source processors (without application-id prefix for internal topics)
-    private final Set<String> sourceTopicNames = new HashSet<>();
-
-    // all internal topics auto-created by the topology builder and used in source / sink processors
-    private final Set<String> internalTopicNames = new HashSet<>();
-
-    // groups of source processors that need to be copartitioned
-    private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
-
-    // map from source processor names to subscribed topics (without application-id prefix for internal topics)
-    private final HashMap<String, List<String>> nodeToSourceTopics = new HashMap<>();
-
-    // map from source processor names to regex subscription patterns
-    private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
-
-    // map from sink processor names to subscribed topic (without application-id prefix for internal topics)
-    private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
-
-    // map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node
-    // even if it can be matched by multiple regex patterns
-    private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
-
-    // map from state store names to all the topics subscribed from source processors that
-    // are connected to these state stores
-    private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
-
-    // map from state store names to all the regex subscribed topics from source processors that
-    // are connected to these state stores
-    private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>();
-
-    // map from state store names to this state store's corresponding changelog topic if possible,
-    // this is used in the extended KStreamBuilder.
-    private final Map<String, String> storeToChangelogTopic = new HashMap<>();
-
-    // all global topics
-    private final Set<String> globalTopics = new HashSet<>();
-
-    private final Set<String> earliestResetTopics = new HashSet<>();
-
-    private final Set<String> latestResetTopics = new HashSet<>();
-
-    private final Set<Pattern> earliestResetPatterns = new HashSet<>();
-
-    private final Set<Pattern> latestResetPatterns = new HashSet<>();
-
-    private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
-
-    private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
-
-    private String applicationId = null;
-
-    private Pattern topicPattern = null;
-
-    private Map<Integer, Set<String>> nodeGroups = null;
-
-    private static class StateStoreFactory {
-        public final Set<String> users;
-
-        public final StateStoreSupplier supplier;
-
-        StateStoreFactory(StateStoreSupplier supplier) {
-            this.supplier = supplier;
-            this.users = new HashSet<>();
-        }
-    }
-
-    private static abstract class NodeFactory {
-        final String name;
-        final String[] parents;
-
-        NodeFactory(final String name, final String[] parents) {
-            this.name = name;
-            this.parents = parents;
-        }
-
-        public abstract ProcessorNode build();
-
-        abstract TopologyDescription.AbstractNode describe();
-    }
-
-    private static class ProcessorNodeFactory extends NodeFactory {
-        private final ProcessorSupplier<?, ?> supplier;
-        private final Set<String> stateStoreNames = new HashSet<>();
-
-        ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier<?, ?> supplier) {
-            super(name, parents.clone());
-            this.supplier = supplier;
-        }
-
-        public void addStateStore(String stateStoreName) {
-            stateStoreNames.add(stateStoreName);
-        }
-
-        @Override
-        public ProcessorNode build() {
-            return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
-        }
-
-        @Override
-        TopologyDescription.Processor describe() {
-            return new TopologyDescription.Processor(name, new HashSet<>(stateStoreNames));
-        }
-    }
-
-    private class SourceNodeFactory extends NodeFactory {
-        private final List<String> topics;
-        private final Pattern pattern;
-        private final Deserializer<?> keyDeserializer;
-        private final Deserializer<?> valDeserializer;
-        private final TimestampExtractor timestampExtractor;
-
-        private SourceNodeFactory(final String name,
-                                  final String[] topics,
-                                  final Pattern pattern,
-                                  final TimestampExtractor timestampExtractor,
-                                  final Deserializer<?> keyDeserializer,
-                                  final Deserializer<?> valDeserializer) {
-            super(name, new String[0]);
-            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
-            this.pattern = pattern;
-            this.keyDeserializer = keyDeserializer;
-            this.valDeserializer = valDeserializer;
-            this.timestampExtractor = timestampExtractor;
-        }
-
-        List<String> getTopics(Collection<String> subscribedTopics) {
-            // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
-            // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder;
-            // this should only happen for debugging since during runtime this function should always be called after the metadata has updated.
-            if (subscribedTopics.isEmpty())
-                return Collections.singletonList("" + pattern + "");
-
-            List<String> matchedTopics = new ArrayList<>();
-            for (String update : subscribedTopics) {
-                if (this.pattern == topicToPatterns.get(update)) {
-                    matchedTopics.add(update);
-                } else if (topicToPatterns.containsKey(update) && isMatch(update)) {
-                    // the same topic cannot be matched to more than one pattern
-                    // TODO: we should lift this requirement in the future
-                    throw new TopologyBuilderException("Topic " + update +
-                            " is already matched for another regex pattern " + topicToPatterns.get(update) +
-                            " and hence cannot be matched to this regex pattern " + pattern + " any more.");
-                } else if (isMatch(update)) {
-                    topicToPatterns.put(update, this.pattern);
-                    matchedTopics.add(update);
-                }
-            }
-            return matchedTopics;
-        }
-
-        @Override
-        public ProcessorNode build() {
-            final List<String> sourceTopics = nodeToSourceTopics.get(name);
-
-            // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
-            // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder;
-            // this should only happen for debugging since during runtime this function should always be called after the metadata has updated.
-            if (sourceTopics == null)
-                return new SourceNode<>(name, Collections.singletonList("" + pattern + ""), timestampExtractor, keyDeserializer, valDeserializer);
-            else
-                return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer);
-        }
-
-        private boolean isMatch(String topic) {
-            return this.pattern.matcher(topic).matches();
-        }
-
-        @Override
-        TopologyDescription.Source describe() {
-            String sourceTopics;
-
-            if (pattern == null) {
-                sourceTopics = topics.toString();
-                sourceTopics = sourceTopics.substring(1, sourceTopics.length() - 1); // trim first and last, ie. []
-            } else {
-                sourceTopics = pattern.toString();
-            }
-
-            return new TopologyDescription.Source(name, sourceTopics);
-        }
-    }
-
-    private class SinkNodeFactory<K, V> extends NodeFactory {
-        private final String topic;
-        private final Serializer<K> keySerializer;
-        private final Serializer<V> valSerializer;
-        private final StreamPartitioner<? super K, ? super V> partitioner;
-
-        private SinkNodeFactory(String name, String[] parents, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
-            super(name, parents.clone());
-            this.topic = topic;
-            this.keySerializer = keySerializer;
-            this.valSerializer = valSerializer;
-            this.partitioner = partitioner;
-        }
-
-        @Override
-        public ProcessorNode build() {
-            if (internalTopicNames.contains(topic)) {
-                // prefix the internal topic name with the application id
-                return new SinkNode<>(name, decorateTopic(topic), keySerializer, valSerializer, partitioner);
-            } else {
-                return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner);
-            }
-        }
-
-        @Override
-        TopologyDescription.Sink describe() {
-            return new TopologyDescription.Sink(name, topic);
-        }
-    }
+    /**
+     * NOTE: this member is not needed by developers working with the Processor API; it is only used
+     * for internal functionality.
+     * @deprecated not part of the public API and for internal usage only
+     */
+    @Deprecated
+    public final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
 
+    /**
+     * NOTE: this class is not needed by developers working with the Processor API; it is only used
+     * for internal functionality.
+     * @deprecated not part of the public API and for internal usage only
+     */
+    @Deprecated
     public static class TopicsInfo {
         public Set<String> sinkTopics;
         public Set<String> sourceTopics;
         public Map<String, InternalTopicConfig> stateChangelogTopics;
         public Map<String, InternalTopicConfig> repartitionSourceTopics;
 
-        TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
+        public TopicsInfo(final Set<String> sinkTopics,
+                          final Set<String> sourceTopics,
+                          final Map<String, InternalTopicConfig> repartitionSourceTopics,
+                          final Map<String, InternalTopicConfig> stateChangelogTopics) {
             this.sinkTopics = sinkTopics;
             this.sourceTopics = sourceTopics;
             this.stateChangelogTopics = stateChangelogTopics;
@@ -303,10 +81,10 @@ public class TopologyBuilder {
         }
 
         @Override
-        public boolean equals(Object o) {
+        public boolean equals(final Object o) {
             if (o instanceof TopicsInfo) {
-                TopicsInfo other = (TopicsInfo) o;
-                return other.sourceTopics.equals(this.sourceTopics) && other.stateChangelogTopics.equals(this.stateChangelogTopics);
+                final TopicsInfo other = (TopicsInfo) o;
+                return other.sourceTopics.equals(sourceTopics) && other.stateChangelogTopics.equals(stateChangelogTopics);
             } else {
                 return false;
             }
@@ -314,7 +92,7 @@ public class TopologyBuilder {
 
         @Override
         public int hashCode() {
-            long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
+            final long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
             return (int) (n % 0xFFFFFFFFL);
         }
 
@@ -341,19 +119,10 @@ public class TopologyBuilder {
      */
     public TopologyBuilder() {}
 
-    /**
-     * Set the applicationId to be used for auto-generated internal topics.
-     *
-     * This is required before calling {@link #topicGroups}, {@link #copartitionSources},
-     * {@link #stateStoreNameToSourceTopics} and {@link #build(Integer)}.
-     *
-     * @param applicationId the streams applicationId. Should be the same as set by
-     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
-     */
+    /** @deprecated This method is not part of the public API and should never be used by a developer. */
+    @Deprecated
     public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
-        Objects.requireNonNull(applicationId, "applicationId can't be null");
-        this.applicationId = applicationId;
-
+        internalTopologyBuilder.setApplicationId(applicationId);
         return this;
     }
 
@@ -369,8 +138,10 @@ public class TopologyBuilder {
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(final String name, final String... topics) {
-        return addSource(null, name, null, null, null, topics);
+    public synchronized final TopologyBuilder addSource(final String name,
+                                                        final String... topics) {
+        internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+        return this;
     }
 
     /**
@@ -386,9 +157,11 @@ public class TopologyBuilder {
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) {
-        return addSource(offsetReset, name, null, null, null, topics);
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+                                                        final String name,
+                                                        final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+        return this;
     }
 
     /**
@@ -404,8 +177,10 @@ public class TopologyBuilder {
      * @param topics             the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor, final String name, final String... topics) {
-        return addSource(null, name, timestampExtractor, null, null, topics);
+    public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
+                                                        final String name, final String... topics) {
+        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+        return this;
     }
 
     /**
@@ -423,8 +198,10 @@ public class TopologyBuilder {
      * @param topics             the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final String... topics) {
-        return addSource(offsetReset, name, timestampExtractor, null, null, topics);
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+                                                        final TimestampExtractor timestampExtractor, final String name, final String... topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
+        return this;
     }
 
     /**
@@ -440,9 +217,10 @@ public class TopologyBuilder {
      * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-
-    public synchronized final TopologyBuilder addSource(final String name, final Pattern topicPattern) {
-        return addSource(null, name, null, null, null, topicPattern);
+    public synchronized final TopologyBuilder addSource(final String name,
+                                                        final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+        return this;
     }
 
     /**
@@ -459,9 +237,11 @@ public class TopologyBuilder {
      * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) {
-        return addSource(offsetReset, name, null, null, null, topicPattern);
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+                                                        final String name,
+                                                        final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
+        return this;
     }
 
 
@@ -479,8 +259,11 @@ public class TopologyBuilder {
      * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor, final String name, final   Pattern topicPattern) {
-        return addSource(null, name, timestampExtractor, null, null, topicPattern);
+    public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
+                                                        final String name,
+                                                        final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+        return this;
     }
 
 
@@ -500,8 +283,12 @@ public class TopologyBuilder {
      * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) {
-        return addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+                                                        final TimestampExtractor timestampExtractor,
+                                                        final String name,
+                                                        final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+        return this;
     }
 
 
@@ -521,8 +308,12 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
 
-    public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
-        return addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+    public synchronized final TopologyBuilder addSource(final String name,
+                                                        final Deserializer keyDeserializer,
+                                                        final Deserializer valDeserializer,
+                                                        final String... topics) {
+        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+        return this;
     }
 
     /**
@@ -550,24 +341,7 @@ public class TopologyBuilder {
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final String... topics) {
-        if (topics.length == 0) {
-            throw new TopologyBuilderException("You must provide at least one topic");
-        }
-        Objects.requireNonNull(name, "name must not be null");
-        if (nodeFactories.containsKey(name))
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
-
-        for (String topic : topics) {
-            Objects.requireNonNull(topic, "topic names cannot be null");
-            validateTopicNotAlreadyRegistered(topic);
-            maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
-            sourceTopicNames.add(topic);
-        }
-
-        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
-        nodeToSourceTopics.put(name, Arrays.asList(topics));
-        nodeGrouper.add(name);
-
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topics);
         return this;
     }
 
@@ -600,11 +374,10 @@ public class TopologyBuilder {
                                                        final String topic,
                                                        final String processorName,
                                                        final ProcessorSupplier stateUpdateSupplier) {
-        return addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        return this;
     }
 
-
-
     /**
      * Adds a global {@link StateStore} to the topology. The {@link StateStore} sources its data
      * from all partitions of the provided input topic. There will be exactly one instance of this
@@ -636,58 +409,8 @@ public class TopologyBuilder {
                                                        final String topic,
                                                        final String processorName,
                                                        final ProcessorSupplier stateUpdateSupplier) {
-        Objects.requireNonNull(storeSupplier, "store supplier must not be null");
-        Objects.requireNonNull(sourceName, "sourceName must not be null");
-        Objects.requireNonNull(topic, "topic must not be null");
-        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
-        Objects.requireNonNull(processorName, "processorName must not be null");
-        if (nodeFactories.containsKey(sourceName)) {
-            throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
-        }
-        if (nodeFactories.containsKey(processorName)) {
-            throw new TopologyBuilderException("Processor " + processorName + " is already added.");
-        }
-        if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
-            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
-        }
-        if (storeSupplier.loggingEnabled()) {
-            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
-        }
-        if (sourceName.equals(processorName)) {
-            throw new TopologyBuilderException("sourceName and processorName must be different.");
-        }
-
-        validateTopicNotAlreadyRegistered(topic);
-
-        globalTopics.add(topic);
-        final String[] topics = {topic};
-        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
-        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
-        nodeGrouper.add(sourceName);
-
-        final String[] parents = {sourceName};
-        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, parents, stateUpdateSupplier);
-        nodeFactory.addStateStore(storeSupplier.name());
-        nodeFactories.put(processorName, nodeFactory);
-        nodeGrouper.add(processorName);
-        nodeGrouper.unite(processorName, parents);
-
-        globalStateStores.put(storeSupplier.name(), storeSupplier.get());
-        connectSourceStoreAndTopic(storeSupplier.name(), topic);
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
         return this;
-
-    }
-
-    private void validateTopicNotAlreadyRegistered(final String topic) {
-        if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
-            throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
-        }
-
-        for (Pattern pattern : nodeToSourcePatterns.values()) {
-            if (pattern.matcher(topic).matches()) {
-                throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
-            }
-        }
     }
 
     /**
@@ -708,9 +431,12 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
      */
-
-    public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) {
-        return addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+    public synchronized final TopologyBuilder addSource(final String name,
+                                                        final Deserializer keyDeserializer,
+                                                        final Deserializer valDeserializer,
+                                                        final Pattern topicPattern) {
+        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+        return this;
     }
 
     /**
@@ -734,36 +460,16 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
      */
-
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
                                                         final TimestampExtractor timestampExtractor,
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final Pattern topicPattern) {
-        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
-        Objects.requireNonNull(name, "name can't be null");
-
-        if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
-        }
-
-        for (String sourceTopicName : sourceTopicNames) {
-            if (topicPattern.matcher(sourceTopicName).matches()) {
-                throw new TopologyBuilderException("Pattern  " + topicPattern + " will match a topic that has already been registered by another source.");
-            }
-        }
-
-        maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern);
-
-        nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
-        nodeToSourcePatterns.put(name, topicPattern);
-        nodeGrouper.add(name);
-
+        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
         return this;
     }
 
-
     /**
      * Add a new source that consumes from topics matching the given pattern
      * and forwards the records to child processor and/or sink nodes.
@@ -783,37 +489,40 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
      */
-
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final Pattern topicPattern) {
-        return addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern);
+        internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern);
+        return this;
     }
 
 
     /**
-     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+     * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
      * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
      * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its records
-     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * @param predecessorNames the names of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public synchronized final TopologyBuilder addSink(final String name, final String topic, final String... parentNames) {
-        return addSink(name, topic, null, null, parentNames);
+    public synchronized final TopologyBuilder addSink(final String name,
+                                                      final String topic,
+                                                      final String... predecessorNames) {
+        internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+        return this;
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using
+     * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic, using
      * the supplied partitioner.
      * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
      * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
@@ -828,19 +537,23 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its records
      * @param partitioner the function that should be used to determine the partition for each record processed by the sink
-     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * @param predecessorNames the names of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public synchronized final TopologyBuilder addSink(final String name, final String topic, final StreamPartitioner partitioner, final String... parentNames) {
-        return addSink(name, topic, null, null, partitioner, parentNames);
+    public synchronized final TopologyBuilder addSink(final String name,
+                                                      final String topic,
+                                                      final StreamPartitioner partitioner,
+                                                      final String... predecessorNames) {
+        internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+        return this;
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+     * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
      * The sink will use the specified key and value serializers.
      *
      * @param name the unique name of the sink
@@ -851,19 +564,24 @@ public class TopologyBuilder {
      * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
      * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * @param predecessorNames the names of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public synchronized final TopologyBuilder addSink(final String name, final String topic, final Serializer keySerializer, final Serializer valSerializer, final String... parentNames) {
-        return addSink(name, topic, keySerializer, valSerializer, null, parentNames);
+    public synchronized final TopologyBuilder addSink(final String name,
+                                                      final String topic,
+                                                      final Serializer keySerializer,
+                                                      final Serializer valSerializer,
+                                                      final String... predecessorNames) {
+        internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+        return this;
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+     * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
      * The sink will use the specified key and value serializers, and the supplied partitioner.
      *
      * @param name the unique name of the sink
@@ -875,66 +593,41 @@ public class TopologyBuilder {
      * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param partitioner the function that should be used to determine the partition for each record processed by the sink
-     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+     * @param predecessorNames the names of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
-     */
-    public synchronized final <K, V> TopologyBuilder addSink(final String name, final String topic, final Serializer<K> keySerializer, final Serializer<V> valSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames) {
-        Objects.requireNonNull(name, "name must not be null");
-        Objects.requireNonNull(topic, "topic must not be null");
-        if (nodeFactories.containsKey(name))
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
-
-        for (final String parent : parentNames) {
-            if (parent.equals(name)) {
-                throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
-            }
-            if (!nodeFactories.containsKey(parent)) {
-                throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
-            }
-        }
-
-        nodeFactories.put(name, new SinkNodeFactory<>(name, parentNames, topic, keySerializer, valSerializer, partitioner));
-        nodeToSinkTopic.put(name, topic);
-        nodeGrouper.add(name);
-        nodeGrouper.unite(name, parentNames);
+     * @throws TopologyBuilderException if a predecessor has not been added yet, or if this sink's name is equal to a predecessor's name
+     */
+    public synchronized final <K, V> TopologyBuilder addSink(final String name,
+                                                             final String topic,
+                                                             final Serializer<K> keySerializer,
+                                                             final Serializer<V> valSerializer,
+                                                             final StreamPartitioner<? super K, ? super V> partitioner,
+                                                             final String... predecessorNames) {
+        internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
         return this;
     }
 
     /**
-     * Add a new processor node that receives and processes records output by one or more parent source or processor node.
+     * Add a new processor node that receives and processes records output by one or more predecessor source or processor nodes.
      * Any new record output by this processor will be forwarded to its child processor or sink nodes.
      * @param name the unique name of the processor node
      * @param supplier the supplier used to obtain this node's {@link Processor} instance
-     * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
+     * @param predecessorNames the names of one or more source or processor nodes whose output records this processor should receive
      * and process
      * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @throws TopologyBuilderException if a predecessor has not been added yet, or if this processor's name is equal to a predecessor's name
      */
-    public synchronized final TopologyBuilder addProcessor(final String name, final ProcessorSupplier supplier, final String... parentNames) {
-        Objects.requireNonNull(name, "name must not be null");
-        Objects.requireNonNull(supplier, "supplier must not be null");
-        if (nodeFactories.containsKey(name))
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
-
-        for (final String parent : parentNames) {
-            if (parent.equals(name)) {
-                throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
-            }
-            if (!nodeFactories.containsKey(parent)) {
-                throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
-            }
-        }
-
-        nodeFactories.put(name, new ProcessorNodeFactory(name, parentNames, supplier));
-        nodeGrouper.add(name);
-        nodeGrouper.unite(name, parentNames);
+    public synchronized final TopologyBuilder addProcessor(final String name,
+                                                           final ProcessorSupplier supplier,
+                                                           final String... predecessorNames) {
+        internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
         return this;
     }
+
     /**
      * Adds a state store
      *
@@ -942,20 +635,9 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if state store supplier is already added
      */
-    public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier, final String... processorNames) {
-        Objects.requireNonNull(supplier, "supplier can't be null");
-        if (stateFactories.containsKey(supplier.name())) {
-            throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
-        }
-
-        stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
-
-        if (processorNames != null) {
-            for (String processorName : processorNames) {
-                connectProcessorAndStateStore(processorName, supplier.name());
-            }
-        }
-
+    public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
+                                                            final String... processorNames) {
+        internalTopologyBuilder.addStateStore(supplier, processorNames);
         return this;
     }
 
@@ -966,26 +648,25 @@ public class TopologyBuilder {
      * @param stateStoreNames the names of state stores that the processor uses
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName, final String... stateStoreNames) {
-        Objects.requireNonNull(processorName, "processorName can't be null");
-        if (stateStoreNames != null) {
-            for (String stateStoreName : stateStoreNames) {
-                connectProcessorAndStateStore(processorName, stateStoreName);
-            }
-        }
-
+    public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName,
+                                                                             final String... stateStoreNames) {
+        internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
         return this;
     }
 
     /**
      * This is used only for KStreamBuilder: when adding a KTable from a source topic,
      * we need to add the topic as the KTable's materialized state store's changelog.
+     *
+     * NOTE: this function is not needed by developers working with the Processor API; it is only used
+     * for the high-level DSL parsing functionality.
+     *
+     * @deprecated not part of public API and for internal usage only
      */
-    protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName, final String topic) {
-        if (storeToChangelogTopic.containsKey(sourceStoreName)) {
-            throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
-        }
-        storeToChangelogTopic.put(sourceStoreName, topic);
+    @Deprecated
+    protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName,
+                                                                            final String topic) {
+        internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
         return this;
     }
 
@@ -998,678 +679,206 @@ public class TopologyBuilder {
      * @param processorNames the name of the processors
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
-        if (processorNames.length < 2)
-            throw new TopologyBuilderException("At least two processors need to participate in the connection.");
-
-        for (String processorName : processorNames) {
-            if (!nodeFactories.containsKey(processorName))
-                throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
-
-        }
-
-        String firstProcessorName = processorNames[0];
-
-        nodeGrouper.unite(firstProcessorName, Arrays.copyOfRange(processorNames, 1, processorNames.length));
-
+        internalTopologyBuilder.connectProcessors(processorNames);
         return this;
     }
 
     /**
      * Adds an internal topic
      *
+     * NOTE: this function is not needed by developers working with the Processor API; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @param topicName the name of the topic
      * @return this builder instance so methods can be chained together; never null
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
-        Objects.requireNonNull(topicName, "topicName can't be null");
-        this.internalTopicNames.add(topicName);
-
+        internalTopologyBuilder.addInternalTopic(topicName);
         return this;
     }
 
     /**
      * Asserts that the streams of the specified source nodes must be copartitioned.
      *
+     * NOTE: this function is not needed by developers working with the Processor API; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @param sourceNodes a set of source node names
      * @return this builder instance so methods can be chained together; never null
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
-        copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+        internalTopologyBuilder.copartitionSources(sourceNodes);
         return this;
     }
 
-    private void connectProcessorAndStateStore(final String processorName, final String stateStoreName) {
-        if (!stateFactories.containsKey(stateStoreName))
-            throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
-        if (!nodeFactories.containsKey(processorName))
-            throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
-
-        final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
-        final Iterator<String> iter = stateStoreFactory.users.iterator();
-        if (iter.hasNext()) {
-            final String user = iter.next();
-            nodeGrouper.unite(user, processorName);
-        }
-        stateStoreFactory.users.add(processorName);
-
-        NodeFactory nodeFactory = nodeFactories.get(processorName);
-        if (nodeFactory instanceof ProcessorNodeFactory) {
-            final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
-            processorNodeFactory.addStateStore(stateStoreName);
-            connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
-        } else {
-            throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
-        }
-    }
-
-    private Set<SourceNodeFactory> findSourcesForProcessorParents(final String[] parents) {
-        final Set<SourceNodeFactory> sourceNodes = new HashSet<>();
-        for (String parent : parents) {
-            final NodeFactory nodeFactory = nodeFactories.get(parent);
-            if (nodeFactory instanceof SourceNodeFactory) {
-                sourceNodes.add((SourceNodeFactory) nodeFactory);
-            } else if (nodeFactory instanceof ProcessorNodeFactory) {
-                sourceNodes.addAll(findSourcesForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
-            }
-        }
-        return sourceNodes;
-    }
-
-    private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
-                                                              final ProcessorNodeFactory processorNodeFactory) {
-
-        // we should never update the mapping from state store names to source topics if the store name already exists
-        // in the map; this scenario is possible, for example, that a state store underlying a source KTable is
-        // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
-
-        if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
-            return;
-        }
-
-        final Set<String> sourceTopics = new HashSet<>();
-        final Set<Pattern> sourcePatterns = new HashSet<>();
-        final Set<SourceNodeFactory> sourceNodesForParent = findSourcesForProcessorParents(processorNodeFactory.parents);
-
-        for (SourceNodeFactory sourceNodeFactory : sourceNodesForParent) {
-            if (sourceNodeFactory.pattern != null) {
-                sourcePatterns.add(sourceNodeFactory.pattern);
-            } else {
-                sourceTopics.addAll(sourceNodeFactory.topics);
-            }
-        }
-        
-        if (!sourceTopics.isEmpty()) {
-            stateStoreNameToSourceTopics.put(stateStoreName,
-                    Collections.unmodifiableSet(sourceTopics));
-        }
-
-        if (!sourcePatterns.isEmpty()) {
-            stateStoreNameToSourceRegex.put(stateStoreName,
-                    Collections.unmodifiableSet(sourcePatterns));
-        }
-
-    }
-
-
-    private <T> void maybeAddToResetList(final Collection<T> earliestResets, final Collection<T> latestResets, final AutoOffsetReset offsetReset, final T item) {
-        if (offsetReset != null) {
-            switch (offsetReset) {
-                case EARLIEST:
-                    earliestResets.add(item);
-                    break;
-                case LATEST:
-                    latestResets.add(item);
-                    break;
-                default:
-                    throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset));
-            }
-        }
-    }
-
     /**
      * Returns the map of node groups keyed by the topic group id.
      *
+     * NOTE: this function is not needed by developers working with the Processor API; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @return groups of node names
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Map<Integer, Set<String>> nodeGroups() {
-        if (nodeGroups == null)
-            nodeGroups = makeNodeGroups();
-
-        return nodeGroups;
-    }
-
-    private Map<Integer, Set<String>> makeNodeGroups() {
-        final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
-        final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
-
-        int nodeGroupId = 0;
-
-        // Go through source nodes first. This makes the group id assignment easy to predict in tests
-        final HashSet<String> allSourceNodes = new HashSet<>(nodeToSourceTopics.keySet());
-        allSourceNodes.addAll(nodeToSourcePatterns.keySet());
-
-        for (String nodeName : Utils.sorted(allSourceNodes)) {
-            final String root = nodeGrouper.root(nodeName);
-            Set<String> nodeGroup = rootToNodeGroup.get(root);
-            if (nodeGroup == null) {
-                nodeGroup = new HashSet<>();
-                rootToNodeGroup.put(root, nodeGroup);
-                nodeGroups.put(nodeGroupId++, nodeGroup);
-            }
-            nodeGroup.add(nodeName);
-        }
-
-        // Go through non-source nodes
-        for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
-            if (!nodeToSourceTopics.containsKey(nodeName)) {
-                final String root = nodeGrouper.root(nodeName);
-                Set<String> nodeGroup = rootToNodeGroup.get(root);
-                if (nodeGroup == null) {
-                    nodeGroup = new HashSet<>();
-                    rootToNodeGroup.put(root, nodeGroup);
-                    nodeGroups.put(nodeGroupId++, nodeGroup);
-                }
-                nodeGroup.add(nodeName);
-            }
-        }
-
-        return nodeGroups;
+        return internalTopologyBuilder.nodeGroups();
     }
 
     /**
      * Build the topology for the specified topic group. This is called automatically when passing this builder into the
      * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
      *
+     * NOTE: this function is not needed by developers working with the Processor API; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized ProcessorTopology build(final Integer topicGroupId) {
-        Set<String> nodeGroup;
-        if (topicGroupId != null) {
-            nodeGroup = nodeGroups().get(topicGroupId);
-        } else {
-            // when topicGroupId is null, we build the full topology minus the global groups
-            final Set<String> globalNodeGroups = globalNodeGroups();
-            final Collection<Set<String>> values = nodeGroups().values();
-            nodeGroup = new HashSet<>();
-            for (Set<String> value : values) {
-                nodeGroup.addAll(value);
-            }
-            nodeGroup.removeAll(globalNodeGroups);
-
-
-        }
-        return build(nodeGroup);
+        return internalTopologyBuilder.build(topicGroupId);
     }
 
     /**
      * Builds the topology for any global state stores
+     *
+     * NOTE: this function is not needed by developers working with the Processor API; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @return ProcessorTopology
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized ProcessorTopology buildGlobalStateTopology() {
-        final Set<String> globalGroups = globalNodeGroups();
-        if (globalGroups.isEmpty()) {
-            return null;
-        }
-        return build(globalGroups);
-    }
-
-    private Set<String> globalNodeGroups() {
-        final Set<String> globalGroups = new HashSet<>();
-        for (final Map.Entry<Integer, Set<String>> nodeGroup : nodeGroups().entrySet()) {
-            final Set<String> nodes = nodeGroup.getValue();
-            for (String node : nodes) {
-                if (isGlobalSource(node)) {
-                    globalGroups.addAll(nodes);
-                }
-            }
-        }
-        return globalGroups;
-    }
-
-    private ProcessorTopology build(final Set<String> nodeGroup) {
-        final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
-        final Map<String, ProcessorNode> processorMap = new HashMap<>();
-        final Map<String, SourceNode> topicSourceMap = new HashMap<>();
-        final Map<String, SinkNode> topicSinkMap = new HashMap<>();
-        final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
-
-        // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
-        for (NodeFactory factory : nodeFactories.values()) {
-            if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                final ProcessorNode node = factory.build();
-                processorNodes.add(node);
-                processorMap.put(node.name(), node);
-
-                if (factory instanceof ProcessorNodeFactory) {
-                    for (String parent : ((ProcessorNodeFactory) factory).parents) {
-                        final ProcessorNode<?, ?> parentNode = processorMap.get(parent);
-                        parentNode.addChild(node);
-                    }
-                    for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
-                        if (!stateStoreMap.containsKey(stateStoreName)) {
-                            StateStore stateStore;
-
-                            if (stateFactories.containsKey(stateStoreName)) {
-                                final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
-                                stateStore = supplier.get();
-
-                                // remember the changelog topic if this state store is change-logging enabled
-                                if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
-                                    final String changelogTopic = ProcessorStateManager.storeChangelogTopic(this.applicationId, stateStoreName);
-                                    storeToChangelogTopic.put(stateStoreName, changelogTopic);
-                                }
-                            } else {
-                                stateStore = globalStateStores.get(stateStoreName);
-                            }
-
-                            stateStoreMap.put(stateStoreName, stateStore);
-                        }
-                    }
-                } else if (factory instanceof SourceNodeFactory) {
-                    final SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
-                    final List<String> topics = (sourceNodeFactory.pattern != null) ?
-                            sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
-                            sourceNodeFactory.topics;
-
-                    for (String topic : topics) {
-                        if (internalTopicNames.contains(topic)) {
-                            // prefix the internal topic name with the application id
-                            topicSourceMap.put(decorateTopic(topic), (SourceNode) node);
-                        } else {
-                            topicSourceMap.put(topic, (SourceNode) node);
-                        }
-                    }
-                } else if (factory instanceof SinkNodeFactory) {
-                    final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory;
-
-                    for (String parent : sinkNodeFactory.parents) {
-                        processorMap.get(parent).addChild(node);
-                        if (internalTopicNames.contains(sinkNodeFactory.topic)) {
-                            // prefix the internal topic name with the application id
-                            topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node);
-                        } else {
-                            topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
-                        }
-                    }
-                } else {
-                    throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
-                }
-            }
-        }
-
-        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
+        return internalTopologyBuilder.buildGlobalStateTopology();
     }
 
     /**
      * Get any global {@link StateStore}s that are part of the
      * topology
+     *
+     * NOTE: this function is not needed by developers working with the processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @return map containing all global {@link StateStore}s
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public Map<String, StateStore> globalStateStores() {
-        return Collections.unmodifiableMap(globalStateStores);
+        return internalTopologyBuilder.globalStateStores();
     }
 
     /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
      *
+     * NOTE: this function is not needed by developers working with the processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @return groups of topic names
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Map<Integer, TopicsInfo> topicGroups() {
-        final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
-
-        if (nodeGroups == null)
-            nodeGroups = makeNodeGroups();
-
-        for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
-            final Set<String> sinkTopics = new HashSet<>();
-            final Set<String> sourceTopics = new HashSet<>();
-            final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
-            final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
-            for (String node : entry.getValue()) {
-                // if the node is a source node, add to the source topics
-                final List<String> topics = nodeToSourceTopics.get(node);
-                if (topics != null) {
-                    // if some of the topics are internal, add them to the internal topics
-                    for (String topic : topics) {
-                        // skip global topic as they don't need partition assignment
-                        if (globalTopics.contains(topic)) {
-                            continue;
-                        }
-                        if (this.internalTopicNames.contains(topic)) {
-                            // prefix the internal topic name with the application id
-                            final String internalTopic = decorateTopic(topic);
-                            internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic,
-                                                                                            Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
-                                                                                            Collections.<String, String>emptyMap()));
-                            sourceTopics.add(internalTopic);
-                        } else {
-                            sourceTopics.add(topic);
-                        }
-                    }
-                }
-
-                // if the node is a sink node, add to the sink topics
-                final String topic = nodeToSinkTopic.get(node);
-                if (topic != null) {
-                    if (internalTopicNames.contains(topic)) {
-                        // prefix the change log topic name with the application id
-                        sinkTopics.add(decorateTopic(topic));
-                    } else {
-                        sinkTopics.add(topic);
-                    }
-                }
-
-                // if the node is connected to a state, add to the state topics
-                for (StateStoreFactory stateFactory : stateFactories.values()) {
-                    final StateStoreSupplier supplier = stateFactory.supplier;
-                    if (supplier.loggingEnabled() && stateFactory.users.contains(node)) {
-                        final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name());
-                        final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name);
-                        stateChangelogTopics.put(name, internalTopicConfig);
-                    }
-                }
-            }
-            if (!sourceTopics.isEmpty()) {
-                topicGroups.put(entry.getKey(), new TopicsInfo(
-                        Collections.unmodifiableSet(sinkTopics),
-                        Collections.unmodifiableSet(sourceTopics),
-                        Collections.unmodifiableMap(internalSourceTopics),
-                        Collections.unmodifiableMap(stateChangelogTopics)));
-            }
-        }
-
-        return Collections.unmodifiableMap(topicGroups);
-    }
-
-    private void setRegexMatchedTopicsToSourceNodes() {
-        if (subscriptionUpdates.hasUpdates()) {
-            for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
-                final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
-                //need to update nodeToSourceTopics with topics matched from given regex
-                nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
-                log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
-            }
-        }
-    }
-
-    private void setRegexMatchedTopicToStateStore() {
-        if (subscriptionUpdates.hasUpdates()) {
-            for (Map.Entry<String, Set<Pattern>> storePattern : stateStoreNameToSourceRegex.entrySet()) {
-                final Set<String> updatedTopicsForStateStore = new HashSet<>();
-                for (String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) {
-                    for (Pattern pattern : storePattern.getValue()) {
-                        if (pattern.matcher(subscriptionUpdateTopic).matches()) {
-                            updatedTopicsForStateStore.add(subscriptionUpdateTopic);
-                        }
-                    }
-                }
-                if (!updatedTopicsForStateStore.isEmpty()) {
-                    Collection<String> storeTopics = stateStoreNameToSourceTopics.get(storePattern.getKey());
-                    if (storeTopics != null) {
-                        updatedTopicsForStateStore.addAll(storeTopics);
-                    }
-                    stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore));
-                }
-            }
-        }
-    }
-    
-    private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier, final String name) {
-        if (!(supplier instanceof WindowStoreSupplier)) {
-            return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
-        }
-
-        final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) supplier;
-        final InternalTopicConfig config = new InternalTopicConfig(name,
-                                                                   Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
-                                                                           InternalTopicConfig.CleanupPolicy.delete),
-                                                                   supplier.logConfig());
-        config.setRetentionMs(windowStoreSupplier.retentionPeriod());
-        return config;
+        return internalTopologyBuilder.topicGroups();
     }
 
     /**
      * Get the Pattern to match all topics requiring to start reading from earliest available offset
+     *
+     * NOTE: this function is not needed by developers working with the processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @return the Pattern for matching all topics reading from earliest offset, never null
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Pattern earliestResetTopicsPattern() {
-        final List<String> topics = maybeDecorateInternalSourceTopics(earliestResetTopics);
-        final Pattern earliestPattern =  buildPatternForOffsetResetTopics(topics, earliestResetPatterns);
-
-        ensureNoRegexOverlap(earliestPattern, latestResetPatterns, latestResetTopics);
-
-        return earliestPattern;
+        return internalTopologyBuilder.earliestResetTopicsPattern();
     }
 
     /**
      * Get the Pattern to match all topics requiring to start reading from latest available offset
+     *
+     * NOTE: this function is not needed by developers working with the processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @return the Pattern for matching all topics reading from latest offset, never null
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Pattern latestResetTopicsPattern() {
-        final List<String> topics = maybeDecorateInternalSourceTopics(latestResetTopics);
-        final Pattern latestPattern = buildPatternForOffsetResetTopics(topics, latestResetPatterns);
-
-        ensureNoRegexOverlap(latestPattern, earliestResetPatterns, earliestResetTopics);
-
-        return  latestPattern;
-    }
-
-    private void ensureNoRegexOverlap(final Pattern builtPattern, final Set<Pattern> otherPatterns, final Set<String> otherTopics) {
-
-        for (Pattern otherPattern : otherPatterns) {
-            if (builtPattern.pattern().contains(otherPattern.pattern())) {
-                throw new TopologyBuilderException(String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets", otherPattern.pattern(), builtPattern.pattern()));
-            }
-        }
-
-        for (String otherTopic : otherTopics) {
-            if (builtPattern.matcher(otherTopic).matches()) {
-                throw new TopologyBuilderException(String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets", builtPattern.pattern(), otherTopic));
-            }
-        }
+        return internalTopologyBuilder.latestResetTopicsPattern();
     }
 
     /**
-     * Builds a composite pattern out of topic names and Pattern object for matching topic names.  If the provided
-     * arrays are empty a Pattern.compile("") instance is returned.
+     * NOTE: this function is not needed by developers working with the processor APIs; it is only used
+     * for the high-level DSL parsing functionality. The returned map values are {@code List}s of topic names.
      *
-     * @param sourceTopics  the name of source topics to add to a composite pattern
-     * @param sourcePatterns Patterns for matching source topics to add to a composite pattern
-     * @return a Pattern that is composed of the literal source topic names and any Patterns for matching source topics
-     */
-    private static synchronized Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics, final Collection<Pattern> sourcePatterns) {
-        final StringBuilder builder = new StringBuilder();
-
-        for (String topic : sourceTopics) {
-            builder.append(topic).append("|");
-        }
-
-        for (Pattern sourcePattern : sourcePatterns) {
-            builder.append(sourcePattern.pattern()).append("|");
-        }
-
-        if (builder.length() > 0) {
-            builder.setLength(builder.length() - 1);
-            return Pattern.compile(builder.toString());
-        }
-
-        return EMPTY_ZERO_LENGTH_PATTERN;
-    }
-
-    /**
      * @return a mapping from state store name to a Set of source Topics.
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public Map<String, List<String>> stateStoreNameToSourceTopics() {
-        final Map<String, List<String>> results = new HashMap<>();
-        for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet()) {
-            results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue()));
-        }
-        return results;
+        return internalTopologyBuilder.stateStoreNameToSourceTopics();
     }
 
     /**
      * Returns the copartition groups.
      * A copartition group is a group of source topics that are required to be copartitioned.
      *
+     * NOTE: this function is not needed by developers working with the processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @return groups of topic names
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Collection<Set<String>> copartitionGroups() {
-        final List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
-        for (Set<String> nodeNames : copartitionSourceGroups) {
-            Set<String> copartitionGroup = new HashSet<>();
-            for (String node : nodeNames) {
-                final List<String> topics = nodeToSourceTopics.get(node);
-                if (topics != null)
-                    copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
-            }
-            list.add(Collections.unmodifiableSet(copartitionGroup));
-        }
-        return Collections.unmodifiableList(list);
-    }
-
-    private List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics) {
-        final List<String> decoratedTopics = new ArrayList<>();
-        for (String topic : sourceTopics) {
-            if (internalTopicNames.contains(topic)) {
-                decoratedTopics.add(decorateTopic(topic));
-            } else {
-                decoratedTopics.add(topic);
-            }
-        }
-        return decoratedTopics;
-    }
-
-    private String decorateTopic(final String topic) {
-        if (applicationId == null) {
-            throw new TopologyBuilderException("there are internal topics and "
-                    + "applicationId hasn't been set. Call "
-                    + "setApplicationId first");
-        }
-
-        return applicationId + "-" + topic;
+        return internalTopologyBuilder.copartitionGroups();
     }
 
+    /**
+     * Delegates to {@link InternalTopologyBuilder#subscriptionUpdates()}. NOTE: this function is not needed by
+     * developers working with the processor APIs; it is only used for the high-level DSL parsing functionality.
+     *
+     * @deprecated not part of public API and for internal usage only
+     */
+    @Deprecated
     public SubscriptionUpdates subscriptionUpdates() {
-        return subscriptionUpdates;
+        return internalTopologyBuilder.subscriptionUpdates();
     }
 
+    /**
+     * Delegates to {@link InternalTopologyBuilder#sourceTopicPattern()}. NOTE: this function is not needed by
+     * developers working with the processor APIs; it is only used for the high-level DSL parsing functionality.
+     *
+     * @deprecated not part of public API and for internal usage only
+     */
+    @Deprecated
     public synchronized Pattern sourceTopicPattern() {
-        if (this.topicPattern == null) {
-            final List<String> allSourceTopics = new ArrayList<>();
-            if (!nodeToSourceTopics.isEmpty()) {
-                for (List<String> topics : nodeToSourceTopics.values()) {
-                    allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics));
-                }
-            }
-            Collections.sort(allSourceTopics);
-
-            this.topicPattern = buildPatternForOffsetResetTopics(allSourceTopics, nodeToSourcePatterns.values());
-        }
-
-        return this.topicPattern;
-    }
-
-    public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates, final String threadId) {
-        log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates);
-        this.subscriptionUpdates = subscriptionUpdates;
-        setRegexMatchedTopicsToSourceNodes();
-        setRegexMatchedTopicToStateStore();
-    }
-
-    private boolean isGlobalSource(final String nodeName) {
-        final NodeFactory nodeFactory = nodeFactories.get(nodeName);
-
-        if (nodeFactory instanceof SourceNodeFactory) {
-            final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
-            if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    TopologyDescription describe() {
-        final TopologyDescription description = new TopologyDescription();
-
-        describeSubtopologies(description);
-        describeGlobalStores(description);
-
-        return description;
-    }
-
-    private void describeSubtopologies(final TopologyDescription description) {
-        for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
-
-            final Set<String> allNodesOfGroups = nodeGroup.getValue();
-            final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(allNodesOfGroups);
-
-            if (!isNodeGroupOfGlobalStores) {
-                describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
-            }
-        }
+        return internalTopologyBuilder.sourceTopicPattern();
     }
 
-    private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) {
-        for (final String node : allNodesOfGroups) {
-            if (isGlobalSource(node)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private void describeSubtopology(final TopologyDescription description,
-                                     final Integer subtopologyId,
-                                     final Set<String> nodeNames) {
-
-        final HashMap<String, TopologyDescription.AbstractNode> nodesByName = new HashMap<>();
-
-        // add all nodes
-        for (final String nodeName : nodeNames) {
-            nodesByName.put(nodeName, nodeFactories.get(nodeName).describe());
-        }
-
-        // connect each node to its predecessors and successors
-        for (final TopologyDescription.AbstractNode node : nodesByName.values()) {
-            for (final String predecessorName : nodeFactories.get(node.name()).parents) {
-                final TopologyDescription.AbstractNode predecessor = nodesByName.get(predecessorName);
-                node.addPredecessor(predecessor);
-                predecessor.addSuccessor(node);
-            }
-        }
-
-        description.addSubtopology(new TopologyDescription.Subtopology(
-            subtopologyId,
-            new HashSet<TopologyDescription.Node>(nodesByName.values())));
-    }
-
-    private void describeGlobalStores(final TopologyDescription description) {
-        for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
-            final Set<String> nodes = nodeGroup.getValue();
-
-            final Iterator<String> it = nodes.iterator();
-            while (it.hasNext()) {
-                final String node = it.next();
-
-                if (isGlobalSource(node)) {
-                    // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode}
-                    it.remove(); // remove sourceNode from group
-                    final String processorNode = nodes.iterator().next(); // get remaining processorNode
-
-                    description.addGlobalStore(new TopologyDescription.GlobalStore(
-                        node,
-                        processorNode,
-                        ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
-                        nodeToSourceTopics.get(node).get(0)
-                    ));
-                    break;
-                }
-            }
-        }
+    /**
+     * Delegates to {@link InternalTopologyBuilder#updateSubscriptions}. NOTE: this function is not needed by
+     * developers working with the processor APIs; it is only used for the high-level DSL parsing functionality.
+     *
+     * @deprecated not part of public API and for internal usage only
+     */
+    @Deprecated
+    public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
+                                                 final String threadId) {
+        internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, threadId);
     }
 
 }


Mime
View raw message