kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [3/5] kafka git commit: KAFKA-3856 (KIP-120) step two: extract internal functions from public facing TopologyBuilder class
Date Mon, 24 Jul 2017 18:03:32 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
deleted file mode 100644
index 0949bf5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java
+++ /dev/null
@@ -1,476 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.processor.internals.StreamTask;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * A meta representation of a {@link Topology topology}.
- * <p>
- * The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected.
- * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
- * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
- * {@link Topology#addSource(String, String...) reads} from the same topic.
- * <p>
- * For {@link KafkaStreams#start() execution} sub-topologies are translated into {@link StreamTask tasks}.
- */
-// TODO make public (hide until KIP-120 is fully implemented)
-final class TopologyDescription {
-    private final Set<Subtopology> subtopologies = new HashSet<>();
-    private final Set<GlobalStore> globalStores = new HashSet<>();
-
-    /**
-     * A connected sub-graph of a {@link Topology}.
-     * <p>
-     * Two nodes belong to the same {@code Subtopology} when they are linked
-     * {@link Topology#addProcessor(String, ProcessorSupplier, String...) directly}, or indirectly through shared
-     * {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
-     * (i.e., several processors using the same state).
-     */
-    public final static class Subtopology {
-        private final int id;
-        private final Set<Node> nodes;
-
-        /**
-         * @param id    the ID reported by {@link #id()}
-         * @param nodes the member nodes; the set is kept by reference, not copied
-         */
-        Subtopology(final int id,
-                    final Set<Node> nodes) {
-            this.nodes = nodes;
-            this.id = id;
-        }
-
-        /**
-         * Internally assigned unique ID.
-         * @return the ID of the sub-topology
-         */
-        public int id() {
-            return this.id;
-        }
-
-        /**
-         * All nodes of this sub-topology, exposed as a read-only view.
-         * @return set of all nodes within the sub-topology
-         */
-        public Set<Node> nodes() {
-            return Collections.unmodifiableSet(this.nodes);
-        }
-
-        /**
-         * Renders a header line with the ID followed by one indented line per node.
-         */
-        @Override
-        public String toString() {
-            final StringBuilder description = new StringBuilder("Sub-topology: ");
-            description.append(id).append("\n");
-            for (final Node member : nodes) {
-                description.append("    ").append(member).append('\n');
-            }
-            return description.toString();
-        }
-
-        /**
-         * Two sub-topologies are equal when both their IDs and their node sets are equal.
-         */
-        @Override
-        public boolean equals(final Object o) {
-            if (o == this) {
-                return true;
-            }
-            if (o == null || o.getClass() != getClass()) {
-                return false;
-            }
-
-            final Subtopology other = (Subtopology) o;
-            if (id != other.id) {
-                return false;
-            }
-            return nodes.equals(other.nodes);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(id, nodes);
-        }
-    }
-
-    /**
-     * Represents a {@link Topology#addGlobalStore(StateStoreSupplier, String,
-     * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
-     * String, ProcessorSupplier) global store}.
-     * Adding a global store results in adding a source node and one stateful processor node.
-     * Note that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
-     * global stores are not connected to each other.
-     * Furthermore, global stores are available to all processors without connecting them explicitly, and thus global
-     * stores will never be part of any {@link Subtopology}.
-     */
-    public final static class GlobalStore {
-        private final Source source;
-        private final Processor processor;
-
-        /**
-         * Creates the source/processor pair and links them in both directions.
-         *
-         * @param sourceName    name of the source node
-         * @param processorName name of the processor node
-         * @param storeName     name of the single store attached to the processor
-         * @param topicName     topic the source node reads from
-         */
-        GlobalStore(final String sourceName,
-                    final String processorName,
-                    final String storeName,
-                    final String topicName) {
-            this.source = new Source(sourceName, topicName);
-            this.processor = new Processor(processorName, Collections.singleton(storeName));
-            // wire the two nodes together: source --> processor and processor <-- source
-            this.source.addSuccessor(this.processor);
-            this.processor.addPredecessor(this.source);
-        }
-
-        /**
-         * The source node reading from a "global" topic.
-         * @return the "global" source node
-         */
-        public Source source() {
-            return this.source;
-        }
-
-        /**
-         * The processor node maintaining the global store.
-         * @return the "global" processor node
-         */
-        public Processor processor() {
-            return this.processor;
-        }
-
-        /**
-         * Single-line rendering; the store shown is the one passed to the constructor.
-         */
-        @Override
-        public String toString() {
-            final StringBuilder description = new StringBuilder("GlobalStore: ");
-            description.append(source.name())
-                .append("(topic: ")
-                .append(source.topics())
-                .append(") -> ")
-                .append(processor.name())
-                .append("(store: ")
-                .append(processor.stores().iterator().next())
-                .append(")\n");
-            return description.toString();
-        }
-
-        /**
-         * Equality is defined by the source node and the processor node.
-         */
-        @Override
-        public boolean equals(final Object o) {
-            if (o == this) {
-                return true;
-            }
-            if (o == null || o.getClass() != getClass()) {
-                return false;
-            }
-
-            final GlobalStore other = (GlobalStore) o;
-            if (!source.equals(other.source)) {
-                return false;
-            }
-            return processor.equals(other.processor);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(source, processor);
-        }
-    }
-
-    /**
-     * A node of a topology. Can be a source, sink, or processor node.
-     */
-    public interface Node {
-        /**
-         * The name of the node. Will never be {@code null}.
-         * @return the name of the node
-         */
-        String name();
-        /**
-         * The predecessors of this node within a sub-topology.
-         * Note that sources do not have any predecessors.
-         * Will never be {@code null}.
-         * @return set of all predecessors
-         */
-        Set<Node> predecessors();
-        /**
-         * The successors of this node within a sub-topology.
-         * Note that sinks do not have any successors.
-         * Will never be {@code null}.
-         * @return set of all successors
-         */
-        Set<Node> successors();
-    }
-
-    /**
-     * Common base of all node kinds: stores the node name together with the mutable predecessor and
-     * successor sets, and exposes those sets only as read-only views.
-     */
-    abstract static class AbstractNode implements Node {
-        final String name;
-        final Set<Node> predecessors = new HashSet<>();
-        final Set<Node> successors = new HashSet<>();
-
-        /**
-         * @param name the name returned by {@link #name()}
-         */
-        AbstractNode(final String name) {
-            this.name = name;
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        /**
-         * @return read-only view of the predecessor set
-         */
-        @Override
-        public Set<Node> predecessors() {
-            return Collections.unmodifiableSet(this.predecessors);
-        }
-
-        /**
-         * @return read-only view of the successor set
-         */
-        @Override
-        public Set<Node> successors() {
-            return Collections.unmodifiableSet(this.successors);
-        }
-
-        /**
-         * Registers an upstream neighbour of this node.
-         */
-        void addPredecessor(final Node predecessor) {
-            this.predecessors.add(predecessor);
-        }
-
-        /**
-         * Registers a downstream neighbour of this node.
-         */
-        void addSuccessor(final Node successor) {
-            this.successors.add(successor);
-        }
-    }
-
-    /**
-     * A source node of a topology.
-     */
-    public final static class Source extends AbstractNode {
-        private final String topics;
-
-        /**
-         * @param name   the node name
-         * @param topics the value returned by {@link #topics()}
-         */
-        Source(final String name,
-               final String topics) {
-            super(name);
-            this.topics = topics;
-        }
-
-        /**
-         * The topic names this source node is reading from.
-         * @return comma separated list of topic names or pattern (as String)
-         */
-        public String topics() {
-            return this.topics;
-        }
-
-        /**
-         * Always rejected: a source node has no predecessors.
-         */
-        @Override
-        void addPredecessor(final Node predecessor) {
-            throw new UnsupportedOperationException("Sources don't have predecessors.");
-        }
-
-        @Override
-        public String toString() {
-            final StringBuilder description = new StringBuilder("Source: ");
-            description.append(name)
-                .append("(topics: ")
-                .append(topics)
-                .append(") --> ")
-                .append(nodeNames(successors));
-            return description.toString();
-        }
-
-        /**
-         * Compares name and topics only; successors are left out to avoid infinite loops.
-         */
-        @Override
-        public boolean equals(final Object o) {
-            if (o == this) {
-                return true;
-            }
-            if (o == null || o.getClass() != getClass()) {
-                return false;
-            }
-
-            final Source other = (Source) o;
-            if (!name.equals(other.name)) {
-                return false;
-            }
-            return topics.equals(other.topics);
-        }
-
-        /**
-         * Hashes name and topics only; successors are left out because they might change and alter the hash code.
-         */
-        @Override
-        public int hashCode() {
-            return Objects.hash(name, topics);
-        }
-    }
-
-    /**
-     * A processor node of a topology.
-     */
-    public final static class Processor extends AbstractNode {
-        private final Set<String> stores;
-
-        /**
-         * @param name   the node name
-         * @param stores names of the connected stores; the set is kept by reference, not copied
-         */
-        Processor(final String name,
-                  final Set<String> stores) {
-            super(name);
-            this.stores = stores;
-        }
-
-        /**
-         * The names of all connected stores, exposed as a read-only view.
-         * @return set of store names
-         */
-        public Set<String> stores() {
-            return Collections.unmodifiableSet(this.stores);
-        }
-
-        @Override
-        public String toString() {
-            final StringBuilder description = new StringBuilder("Processor: ");
-            description.append(name)
-                .append("(stores: ")
-                .append(stores)
-                .append(") --> ")
-                .append(nodeNames(successors))
-                .append(" <-- ")
-                .append(nodeNames(predecessors));
-            return description.toString();
-        }
-
-        /**
-         * Compares name, stores and predecessors; successors are left out to avoid infinite loops.
-         */
-        @Override
-        public boolean equals(final Object o) {
-            if (o == this) {
-                return true;
-            }
-            if (o == null || o.getClass() != getClass()) {
-                return false;
-            }
-
-            final Processor other = (Processor) o;
-            if (!name.equals(other.name)) {
-                return false;
-            }
-            if (!stores.equals(other.stores)) {
-                return false;
-            }
-            return predecessors.equals(other.predecessors);
-        }
-
-        /**
-         * Hashes name and stores only; the neighbour sets are left out because they might change and alter
-         * the hash code.
-         */
-        @Override
-        public int hashCode() {
-            return Objects.hash(name, stores);
-        }
-    }
-
-    /**
-     * A sink node of a topology.
-     */
-    public final static class Sink extends AbstractNode {
-        private final String topic;
-
-        /**
-         * @param name  the node name
-         * @param topic the value returned by {@link #topic()}
-         */
-        Sink(final String name,
-             final String topic) {
-            super(name);
-            this.topic = topic;
-        }
-
-        /**
-         * The topic name this sink node is writing to.
-         * @return a topic name
-         */
-        public String topic() {
-            return this.topic;
-        }
-
-        /**
-         * Always rejected: a sink node has no successors.
-         */
-        @Override
-        void addSuccessor(final Node successor) {
-            throw new UnsupportedOperationException("Sinks don't have successors.");
-        }
-
-        @Override
-        public String toString() {
-            final StringBuilder description = new StringBuilder("Sink: ");
-            description.append(name)
-                .append("(topic: ")
-                .append(topic)
-                .append(") <-- ")
-                .append(nodeNames(predecessors));
-            return description.toString();
-        }
-
-        /**
-         * Compares name, topic and predecessors.
-         */
-        @Override
-        public boolean equals(final Object o) {
-            if (o == this) {
-                return true;
-            }
-            if (o == null || o.getClass() != getClass()) {
-                return false;
-            }
-
-            final Sink other = (Sink) o;
-            if (!name.equals(other.name)) {
-                return false;
-            }
-            if (!topic.equals(other.topic)) {
-                return false;
-            }
-            return predecessors.equals(other.predecessors);
-        }
-
-        /**
-         * Hashes name and topic only; predecessors are left out because they might change and alter the hash code.
-         */
-        @Override
-        public int hashCode() {
-            return Objects.hash(name, topic);
-        }
-    }
-
-    /**
-     * Adds one sub-topology to this description.
-     */
-    void addSubtopology(final Subtopology subtopology) {
-        this.subtopologies.add(subtopology);
-    }
-
-    /**
-     * Adds one global store to this description.
-     */
-    void addGlobalStore(final GlobalStore globalStore) {
-        this.globalStores.add(globalStore);
-    }
-
-    /**
-     * All sub-topologies of the represented topology, exposed as a read-only view.
-     * @return set of all sub-topologies
-     */
-    public Set<Subtopology> subtopologies() {
-        final Set<Subtopology> readOnlyView = Collections.unmodifiableSet(subtopologies);
-        return readOnlyView;
-    }
-
-    /**
-     * All global stores of the represented topology, exposed as a read-only view.
-     * @return set of all global stores
-     */
-    public Set<GlobalStore> globalStores() {
-        final Set<GlobalStore> readOnlyView = Collections.unmodifiableSet(globalStores);
-        return readOnlyView;
-    }
-
-    /**
-     * Renders the sub-topology section followed by the global-store section.
-     */
-    @Override
-    public String toString() {
-        final StringBuilder description = new StringBuilder();
-        description.append(subtopologiesAsString());
-        description.append(globalStoresAsString());
-        return description.toString();
-    }
-
-    /**
-     * Joins the names of the given nodes with {@code ", "}; an empty set yields an empty string.
-     */
-    private static String nodeNames(final Set<Node> nodes) {
-        final StringBuilder joined = new StringBuilder();
-        // the separator is written in front of every name except the first
-        String separator = "";
-        for (final Node node : nodes) {
-            joined.append(separator).append(node.name());
-            separator = ", ";
-        }
-        return joined.toString();
-    }
-
-    /**
-     * Renders the "Sub-topologies" section: a header line, then either a "none" marker or every
-     * sub-topology indented by two spaces.
-     */
-    private String subtopologiesAsString() {
-        final StringBuilder section = new StringBuilder("Sub-topologies: \n");
-        if (subtopologies.isEmpty()) {
-            return section.append("  none\n").toString();
-        }
-        for (final Subtopology subtopology : subtopologies) {
-            section.append("  ").append(subtopology);
-        }
-        return section.toString();
-    }
-
-    /**
-     * Renders the "Global Stores" section: a header line, then either a "none" marker or every
-     * global store indented by two spaces.
-     */
-    private String globalStoresAsString() {
-        final StringBuilder section = new StringBuilder("Global Stores:\n");
-        if (globalStores.isEmpty()) {
-            return section.append("  none\n").toString();
-        }
-        for (final GlobalStore globalStore : globalStores) {
-            section.append("  ").append(globalStore);
-        }
-        return section.toString();
-    }
-
-    /**
-     * Two descriptions are equal when both their sub-topology sets and their global-store sets are equal.
-     */
-    @Override
-    public boolean equals(final Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (o == null || o.getClass() != getClass()) {
-            return false;
-        }
-
-        final TopologyDescription other = (TopologyDescription) o;
-        if (!subtopologies.equals(other.subtopologies)) {
-            return false;
-        }
-        return globalStores.equals(other.globalStores);
-    }
-
-    /**
-     * Hash code derived from the sub-topology set and the global-store set, matching {@code equals}.
-     */
-    @Override
-    public int hashCode() {
-        final int combinedHash = Objects.hash(subtopologies, globalStores);
-        return combinedHash;
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
new file mode 100644
index 0000000..ff65d31
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -0,0 +1,1491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+
+public class InternalTopologyBuilder {
+
+    private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class);
+
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+    private static final String[] NO_PREDECESSORS = {};
+
+    // node factories in a topological order
+    private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
+
+    // state factories
+    private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
+
+    // global state stores, keyed by store name
+    private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>();
+
+    // all topics subscribed from source processors (without application-id prefix for internal topics)
+    private final Set<String> sourceTopicNames = new HashSet<>();
+
+    // all internal topics auto-created by the topology builder and used in source / sink processors
+    private final Set<String> internalTopicNames = new HashSet<>();
+
+    // groups of source processors that need to be copartitioned
+    private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
+
+    // map from source processor names to subscribed topics (without application-id prefix for internal topics)
+    private final HashMap<String, List<String>> nodeToSourceTopics = new HashMap<>();
+
+    // map from source processor names to regex subscription patterns
+    private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
+
+    // map from sink processor names to the topic they write to (without application-id prefix for internal topics)
+    private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+
+    // map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node
+    // even if it can be matched by multiple regex patterns
+    private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
+
+    // map from state store names to all the topics subscribed from source processors that
+    // are connected to these state stores
+    private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
+
+    // map from state store names to all the regex subscribed topics from source processors that
+    // are connected to these state stores
+    private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>();
+
+    // map from state store names to this state store's corresponding changelog topic if possible,
+    // this is used in the extended KStreamBuilder.
+    private final Map<String, String> storeToChangelogTopic = new HashMap<>();
+
+    // all global topics
+    private final Set<String> globalTopics = new HashSet<>();
+
+    private final Set<String> earliestResetTopics = new HashSet<>();
+
+    private final Set<String> latestResetTopics = new HashSet<>();
+
+    private final Set<Pattern> earliestResetPatterns = new HashSet<>();
+
+    private final Set<Pattern> latestResetPatterns = new HashSet<>();
+
+    private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
+
+    private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+
+    private String applicationId = null;
+
+    private Pattern topicPattern = null;
+
+    private Map<Integer, Set<String>> nodeGroups = null;
+
+    /**
+     * Pairs a {@link StateStoreSupplier} with a mutable, initially empty set of user names.
+     */
+    private static class StateStoreFactory {
+        // NOTE(review): presumably the names of the processors connected to this store -- confirm at the call sites
+        public final Set<String> users = new HashSet<>();
+
+        public final StateStoreSupplier supplier;
+
+        StateStoreFactory(final StateStoreSupplier supplier) {
+            this.supplier = supplier;
+        }
+    }
+
+    /**
+     * Base class of the per-node factories registered with the builder. Each factory knows the node name and the
+     * names of its predecessors, and can produce both a {@link ProcessorNode} and an {@code AbstractNode}
+     * description of the same node.
+     */
+    private static abstract class NodeFactory {
+        final String name;
+        final String[] predecessors;
+
+        /**
+         * @param name         the node name
+         * @param predecessors names of the predecessor nodes; the array is stored as given
+         */
+        NodeFactory(final String name,
+                    final String[] predecessors) {
+            this.predecessors = predecessors;
+            this.name = name;
+        }
+
+        /**
+         * @return a new {@link ProcessorNode} for this factory's node
+         */
+        public abstract ProcessorNode build();
+
+        /**
+         * @return a description object for this factory's node
+         */
+        abstract AbstractNode describe();
+    }
+
+    /**
+     * Factory for processor nodes: remembers the {@link ProcessorSupplier} and collects the names of the
+     * state stores added via {@link #addStateStore(String)}.
+     */
+    private static class ProcessorNodeFactory extends NodeFactory {
+        private final ProcessorSupplier<?, ?> supplier;
+        private final Set<String> stateStoreNames = new HashSet<>();
+
+        /**
+         * @param name         the node name
+         * @param predecessors names of the predecessor nodes; a clone of the array is stored
+         * @param supplier     supplier asked for a processor on every {@link #build()}
+         */
+        ProcessorNodeFactory(final String name,
+                             final String[] predecessors,
+                             final ProcessorSupplier<?, ?> supplier) {
+            super(name, predecessors.clone());
+            this.supplier = supplier;
+        }
+
+        /**
+         * Records one more state store name for this processor.
+         */
+        public void addStateStore(final String stateStoreName) {
+            this.stateStoreNames.add(stateStoreName);
+        }
+
+        /**
+         * Builds the node from a processor obtained from the supplier and the collected store names.
+         */
+        @Override
+        public ProcessorNode build() {
+            return new ProcessorNode<>(this.name, this.supplier.get(), this.stateStoreNames);
+        }
+
+        /**
+         * Describes the node using a copy of the store names collected so far.
+         */
+        @Override
+        Processor describe() {
+            final Set<String> storeNamesSnapshot = new HashSet<>(stateStoreNames);
+            return new Processor(name, storeNamesSnapshot);
+        }
+    }
+
+    /**
+     * Factory for source nodes, subscribed either to a fixed topic list or to a regex pattern.
+     * <p>
+     * This is a non-static inner class: it reads and updates the enclosing builder's {@code topicToPatterns} and
+     * reads its {@code nodeToSourceTopics}.
+     */
+    private class SourceNodeFactory extends NodeFactory {
+        private final List<String> topics;
+        private final Pattern pattern;
+        private final Deserializer<?> keyDeserializer;
+        private final Deserializer<?> valDeserializer;
+        private final TimestampExtractor timestampExtractor;
+
+        /**
+         * @param name               the node name
+         * @param topics             topic names; {@code null} is replaced by an empty list
+         * @param pattern            subscription pattern; the callers visible here pass {@code null} together with
+         *                           explicit topics
+         * @param timestampExtractor passed through to the built {@code SourceNode}
+         * @param keyDeserializer    passed through to the built {@code SourceNode}
+         * @param valDeserializer    passed through to the built {@code SourceNode}
+         */
+        private SourceNodeFactory(final String name,
+                                  final String[] topics,
+                                  final Pattern pattern,
+                                  final TimestampExtractor timestampExtractor,
+                                  final Deserializer<?> keyDeserializer,
+                                  final Deserializer<?> valDeserializer) {
+            // source nodes never have predecessors
+            super(name, NO_PREDECESSORS);
+            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
+            this.pattern = pattern;
+            this.keyDeserializer = keyDeserializer;
+            this.valDeserializer = valDeserializer;
+            this.timestampExtractor = timestampExtractor;
+        }
+
+        /**
+         * Selects, from the given subscribed topics, those that belong to this factory's pattern.
+         * Newly matched topics are recorded in the enclosing builder's {@code topicToPatterns}.
+         * <p>
+         * NOTE(review): presumably only invoked for pattern-subscribed sources. With a {@code null} pattern the
+         * identity check below accepts every topic that has no entry in {@code topicToPatterns}, and
+         * {@code isMatch} would throw a {@code NullPointerException} -- confirm against the callers.
+         *
+         * @param subscribedTopics the currently subscribed topic names
+         * @return the matched topics, or a single-element list holding the pattern's string form when
+         *         {@code subscribedTopics} is empty
+         * @throws TopologyBuilderException if a topic matched by this pattern is already mapped to another pattern
+         */
+        List<String> getTopics(final Collection<String> subscribedTopics) {
+            // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
+            // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder;
+            // this should only happen for debugging since during runtime this function should always be called after the metadata has updated.
+            if (subscribedTopics.isEmpty()) {
+                return Collections.singletonList("" + pattern + "");
+            }
+
+            final List<String> matchedTopics = new ArrayList<>();
+            for (final String update : subscribedTopics) {
+                // identity comparison: the topic was previously recorded for exactly this Pattern instance
+                if (pattern == topicToPatterns.get(update)) {
+                    matchedTopics.add(update);
+                } else if (topicToPatterns.containsKey(update) && isMatch(update)) {
+                    // the same topic cannot be matched to more than one pattern
+                    // TODO: we should lift this requirement in the future
+                    throw new TopologyBuilderException("Topic " + update +
+                        " is already matched for another regex pattern " + topicToPatterns.get(update) +
+                        " and hence cannot be matched to this regex pattern " + pattern + " any more.");
+                } else if (isMatch(update)) {
+                    // first match for this topic: remember which pattern claimed it
+                    topicToPatterns.put(update, pattern);
+                    matchedTopics.add(update);
+                }
+            }
+            return matchedTopics;
+        }
+
+        /**
+         * Builds the {@code SourceNode} from the topics currently registered for this node name in
+         * {@code nodeToSourceTopics}, falling back to the pattern's string form when none are registered.
+         */
+        @Override
+        public ProcessorNode build() {
+            final List<String> sourceTopics = nodeToSourceTopics.get(name);
+
+            // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
+            // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder;
+            // this should only happen for debugging since during runtime this function should always be called after the metadata has updated.
+            if (sourceTopics == null) {
+                return new SourceNode<>(name, Collections.singletonList("" + pattern + ""), timestampExtractor, keyDeserializer, valDeserializer);
+            } else {
+                return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer);
+            }
+        }
+
+        /**
+         * @return {@code true} if the whole topic name matches this factory's pattern
+         */
+        private boolean isMatch(final String topic) {
+            return pattern.matcher(topic).matches();
+        }
+
+        /**
+         * Describes the node: the topics as a {@code ", "}-separated list when no pattern is set,
+         * otherwise the pattern's string form.
+         */
+        @Override
+        Source describe() {
+            String sourceTopics;
+
+            if (pattern == null) {
+                sourceTopics = topics.toString();
+                sourceTopics = sourceTopics.substring(1, sourceTopics.length() - 1); // trim first and last, ie. []
+            } else {
+                sourceTopics = pattern.toString();
+            }
+
+            return new Source(name, sourceTopics);
+        }
+    }
+
+    /**
+     * Factory for sink nodes writing to a single topic.
+     * <p>
+     * This is a non-static inner class: {@link #build()} consults the enclosing builder's
+     * {@code internalTopicNames}.
+     */
+    private class SinkNodeFactory<K, V> extends NodeFactory {
+        private final String topic;
+        private final Serializer<K> keySerializer;
+        private final Serializer<V> valSerializer;
+        private final StreamPartitioner<? super K, ? super V> partitioner;
+
+        /**
+         * @param name          the node name
+         * @param predecessors  names of the predecessor nodes; a clone of the array is stored
+         * @param topic         the topic name as given, without any decoration
+         * @param keySerializer passed through to the built {@code SinkNode}
+         * @param valSerializer passed through to the built {@code SinkNode}
+         * @param partitioner   passed through to the built {@code SinkNode}
+         */
+        private SinkNodeFactory(final String name,
+                                final String[] predecessors,
+                                final String topic,
+                                final Serializer<K> keySerializer,
+                                final Serializer<V> valSerializer,
+                                final StreamPartitioner<? super K, ? super V> partitioner) {
+            super(name, predecessors.clone());
+            this.partitioner = partitioner;
+            this.valSerializer = valSerializer;
+            this.keySerializer = keySerializer;
+            this.topic = topic;
+        }
+
+        /**
+         * Builds the {@code SinkNode}; a topic registered as internal is decorated first.
+         */
+        @Override
+        public ProcessorNode build() {
+            // prefix the internal topic name with the application id
+            final String sinkTopic = internalTopicNames.contains(topic) ? decorateTopic(topic) : topic;
+            return new SinkNode<>(name, sinkTopic, keySerializer, valSerializer, partitioner);
+        }
+
+        /**
+         * Describes the node using the undecorated topic name.
+         */
+        @Override
+        Sink describe() {
+            return new Sink(this.name, this.topic);
+        }
+    }
+
+    /**
+     * Stores the application ID on this builder.
+     *
+     * @param applicationId the application ID; must not be {@code null}
+     * @return this builder, to allow call chaining
+     * @throws NullPointerException if {@code applicationId} is {@code null}
+     */
+    public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId) {
+        // requireNonNull hands back its argument, so validation and assignment fit in one statement
+        this.applicationId = Objects.requireNonNull(applicationId, "applicationId can't be null");
+        return this;
+    }
+
+    /**
+     * Registers a source node that reads from the given, explicitly named topics.
+     *
+     * @param offsetReset        forwarded, together with each topic, to {@code maybeAddToResetList}
+     * @param name               unique name of the source node; must not be {@code null}
+     * @param timestampExtractor passed through to the node factory
+     * @param keyDeserializer    passed through to the node factory
+     * @param valDeserializer    passed through to the node factory
+     * @param topics             one or more topic names, none of them {@code null}
+     * @throws TopologyBuilderException if no topic is given or a node named {@code name} already exists
+     * @throws NullPointerException     if {@code name} or any topic name is {@code null}
+     */
+    public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+                                final String name,
+                                final TimestampExtractor timestampExtractor,
+                                final Deserializer keyDeserializer,
+                                final Deserializer valDeserializer,
+                                final String... topics) {
+        if (topics.length == 0) {
+            throw new TopologyBuilderException("You must provide at least one topic");
+        }
+        Objects.requireNonNull(name, "name must not be null");
+        if (nodeFactories.containsKey(name)) {
+            throw new TopologyBuilderException("Processor " + name + " is already added.");
+        }
+
+        // each topic is validated and registered in turn, in the order given
+        for (final String sourceTopic : topics) {
+            Objects.requireNonNull(sourceTopic, "topic names cannot be null");
+            validateTopicNotAlreadyRegistered(sourceTopic);
+            maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, sourceTopic);
+            sourceTopicNames.add(sourceTopic);
+        }
+
+        // no pattern: this source subscribes by explicit topic names only
+        final SourceNodeFactory sourceFactory =
+            new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer);
+        nodeFactories.put(name, sourceFactory);
+        nodeToSourceTopics.put(name, Arrays.asList(topics));
+        nodeGrouper.add(name);
+    }
+
+    /**
+     * Registers a global store together with the source node that reads its topic and the processor node
+     * that is connected to the store.
+     *
+     * @param storeSupplier       supplier of the store; must not be {@code null}, must not have logging enabled,
+     *                            and its name must not already be used by another store
+     * @param sourceName          unique name of the source node; must not be {@code null}
+     * @param timestampExtractor  passed through to the source node factory
+     * @param keyDeserializer     passed through to the source node factory
+     * @param valueDeserializer   passed through to the source node factory
+     * @param topic               topic the source node reads from; must not be {@code null} or already registered
+     * @param processorName       unique name of the processor node; must not be {@code null} and must differ
+     *                            from {@code sourceName}
+     * @param stateUpdateSupplier processor supplier for the processor node; must not be {@code null}
+     * @throws NullPointerException     if any of the arguments documented as non-null is {@code null}
+     * @throws TopologyBuilderException if a node or store name is already in use, logging is enabled on the
+     *                                  store, or source and processor share a name
+     */
+    public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                     final String sourceName,
+                                     final TimestampExtractor timestampExtractor,
+                                     final Deserializer keyDeserializer,
+                                     final Deserializer valueDeserializer,
+                                     final String topic,
+                                     final String processorName,
+                                     final ProcessorSupplier stateUpdateSupplier) {
+        // all validation happens before any builder state is modified
+        Objects.requireNonNull(storeSupplier, "store supplier must not be null");
+        Objects.requireNonNull(sourceName, "sourceName must not be null");
+        Objects.requireNonNull(topic, "topic must not be null");
+        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
+        Objects.requireNonNull(processorName, "processorName must not be null");
+        if (nodeFactories.containsKey(sourceName)) {
+            throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
+        }
+        if (nodeFactories.containsKey(processorName)) {
+            throw new TopologyBuilderException("Processor " + processorName + " is already added.");
+        }
+        // the store name must be unique across both regular and global stores
+        if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
+            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
+        }
+        if (storeSupplier.loggingEnabled()) {
+            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
+        }
+        if (sourceName.equals(processorName)) {
+            throw new TopologyBuilderException("sourceName and processorName must be different.");
+        }
+
+        validateTopicNotAlreadyRegistered(topic);
+
+        // register the source node; the topic goes into globalTopics rather than sourceTopicNames
+        globalTopics.add(topic);
+        final String[] topics = {topic};
+        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
+        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
+        nodeGrouper.add(sourceName);
+
+        // register the processor node with the source as its only predecessor, and group the two together
+        final String[] predecessors = {sourceName};
+        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
+        nodeFactory.addStateStore(storeSupplier.name());
+        nodeFactories.put(processorName, nodeFactory);
+        nodeGrouper.add(processorName);
+        nodeGrouper.unite(processorName, predecessors);
+
+        // the store instance is obtained from the supplier here and kept under the store name
+        globalStateStores.put(storeSupplier.name(), storeSupplier.get());
+        connectSourceStoreAndTopic(storeSupplier.name(), topic);
+    }
+
+    private void validateTopicNotAlreadyRegistered(final String topic) {
+        if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
+            throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
+        }
+
+        for (final Pattern pattern : nodeToSourcePatterns.values()) {
+            if (pattern.matcher(topic).matches()) {
+                throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
+            }
+        }
+    }
+
+    /**
+     * Adds a source node that subscribes to all topics matching a pattern.
+     *
+     * @param offsetReset        reset policy to record for the pattern; {@code null} records nothing
+     * @param name               unique name of the new source node; must not be null
+     * @param timestampExtractor handed to the source node factory as-is
+     * @param keyDeserializer    handed to the source node factory as-is
+     * @param valDeserializer    handed to the source node factory as-is
+     * @param topicPattern       pattern selecting the topics to read; must not be null
+     * @throws NullPointerException     if {@code topicPattern} or {@code name} is null
+     * @throws TopologyBuilderException if the name is taken or the pattern matches a topic that is
+     *                                  already registered by name
+     */
+    public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+                                final String name,
+                                final TimestampExtractor timestampExtractor,
+                                final Deserializer keyDeserializer,
+                                final Deserializer valDeserializer,
+                                final Pattern topicPattern) {
+        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
+        Objects.requireNonNull(name, "name can't be null");
+
+        final boolean nameTaken = nodeFactories.containsKey(name);
+        if (nameTaken) {
+            throw new TopologyBuilderException("Processor " + name + " is already added.");
+        }
+
+        // the pattern must not capture a topic that some other source already reads by name
+        for (final String registeredTopic : sourceTopicNames) {
+            final boolean overlaps = topicPattern.matcher(registeredTopic).matches();
+            if (overlaps) {
+                throw new TopologyBuilderException("Pattern  " + topicPattern + " will match a topic that has already been registered by another source.");
+            }
+        }
+
+        maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern);
+
+        final SourceNodeFactory patternSource =
+            new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer);
+        nodeFactories.put(name, patternSource);
+        nodeToSourcePatterns.put(name, topicPattern);
+        nodeGrouper.add(name);
+    }
+
+    /**
+     * Adds a sink node that writes the records of its predecessors to a topic.
+     *
+     * @param name             unique name of the new sink node; must not be null
+     * @param topic            topic the sink writes to; must not be null
+     * @param keySerializer    handed to the sink node factory as-is
+     * @param valSerializer    handed to the sink node factory as-is
+     * @param partitioner      handed to the sink node factory as-is
+     * @param predecessorNames names of already added nodes that feed this sink; neither the array
+     *                         nor any element may be null
+     * @throws NullPointerException     if {@code name}, {@code topic}, {@code predecessorNames} or one of
+     *                                  the predecessor names is null
+     * @throws TopologyBuilderException if the name is taken, a predecessor equals {@code name}, or a
+     *                                  predecessor has not been added yet
+     */
+    public final <K, V> void addSink(final String name,
+                                     final String topic,
+                                     final Serializer<K> keySerializer,
+                                     final Serializer<V> valSerializer,
+                                     final StreamPartitioner<? super K, ? super V> partitioner,
+                                     final String... predecessorNames) {
+        Objects.requireNonNull(name, "name must not be null");
+        Objects.requireNonNull(topic, "topic must not be null");
+        // fail with a descriptive message instead of a bare NPE from the loop below
+        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
+        if (nodeFactories.containsKey(name)) {
+            throw new TopologyBuilderException("Processor " + name + " is already added.");
+        }
+
+        for (final String predecessor : predecessorNames) {
+            Objects.requireNonNull(predecessor, "predecessor name must not be null");
+            if (predecessor.equals(name)) {
+                throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+            }
+            if (!nodeFactories.containsKey(predecessor)) {
+                throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+            }
+        }
+
+        // all checks passed: register the sink and merge it into its predecessors' node group
+        nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topic, keySerializer, valSerializer, partitioner));
+        nodeToSinkTopic.put(name, topic);
+        nodeGrouper.add(name);
+        nodeGrouper.unite(name, predecessorNames);
+    }
+
+    /**
+     * Adds a processor node fed by the given predecessor nodes.
+     *
+     * @param name             unique name of the new processor node; must not be null
+     * @param supplier         supplier of the processor; must not be null
+     * @param predecessorNames names of already added nodes that feed this processor; neither the
+     *                         array nor any element may be null
+     * @throws NullPointerException     if {@code name}, {@code supplier}, {@code predecessorNames} or one
+     *                                  of the predecessor names is null
+     * @throws TopologyBuilderException if the name is taken, a predecessor equals {@code name}, or a
+     *                                  predecessor has not been added yet
+     */
+    public final void addProcessor(final String name,
+                                   final ProcessorSupplier supplier,
+                                   final String... predecessorNames) {
+        Objects.requireNonNull(name, "name must not be null");
+        Objects.requireNonNull(supplier, "supplier must not be null");
+        // fail with a descriptive message instead of a bare NPE from the loop below
+        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
+        if (nodeFactories.containsKey(name)) {
+            throw new TopologyBuilderException("Processor " + name + " is already added.");
+        }
+
+        for (final String predecessor : predecessorNames) {
+            Objects.requireNonNull(predecessor, "predecessor name must not be null");
+            if (predecessor.equals(name)) {
+                throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+            }
+            if (!nodeFactories.containsKey(predecessor)) {
+                throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+            }
+        }
+
+        // all checks passed: register the processor and merge it into its predecessors' node group
+        nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, supplier));
+        nodeGrouper.add(name);
+        nodeGrouper.unite(name, predecessorNames);
+    }
+
+    /**
+     * Registers a state store and optionally connects it to processors.
+     *
+     * @param supplier       supplier of the store; must not be null
+     * @param processorNames names of processors to connect to the store; may be null or empty
+     * @throws NullPointerException     if {@code supplier} is null
+     * @throws TopologyBuilderException if a regular or global store with the same name already exists,
+     *                                  or if connecting one of the processors fails
+     */
+    public final void addStateStore(final StateStoreSupplier supplier,
+                                    final String... processorNames) {
+        Objects.requireNonNull(supplier, "supplier can't be null");
+        final String storeName = supplier.name();
+        // Store names must be unique across regular AND global stores, mirroring the check in
+        // addGlobalStore: when building a topology, a regular store takes precedence over a global
+        // store of the same name, so a clash would silently shadow the global store.
+        if (stateFactories.containsKey(storeName) || globalStateStores.containsKey(storeName)) {
+            throw new TopologyBuilderException("StateStore " + storeName + " is already added.");
+        }
+
+        stateFactories.put(storeName, new StateStoreFactory(supplier));
+
+        if (processorNames != null) {
+            for (final String processorName : processorNames) {
+                connectProcessorAndStateStore(processorName, storeName);
+            }
+        }
+    }
+
+    /**
+     * Connects one processor to each of the given state stores.
+     *
+     * @param processorName   name of the processor; must not be null
+     * @param stateStoreNames names of the stores to connect; a null array is a no-op
+     * @throws NullPointerException if {@code processorName} is null
+     */
+    public final void connectProcessorAndStateStores(final String processorName,
+                                                     final String... stateStoreNames) {
+        Objects.requireNonNull(processorName, "processorName can't be null");
+        if (stateStoreNames == null) {
+            return;
+        }
+        for (final String storeName : stateStoreNames) {
+            connectProcessorAndStateStore(processorName, storeName);
+        }
+    }
+
+    /**
+     * Records the topic that backs a source store.
+     *
+     * @param sourceStoreName name of the store
+     * @param topic           topic to associate with the store
+     * @throws TopologyBuilderException if the store already has a topic recorded
+     */
+    public final void connectSourceStoreAndTopic(final String sourceStoreName,
+                                                  final String topic) {
+        final boolean alreadyConnected = storeToChangelogTopic.containsKey(sourceStoreName);
+        if (alreadyConnected) {
+            throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
+        }
+        storeToChangelogTopic.put(sourceStoreName, topic);
+    }
+
+    /**
+     * Puts the given processors into the same node group.
+     *
+     * @param processorNames names of at least two already added nodes
+     * @throws TopologyBuilderException if fewer than two names are given or a name is unknown
+     */
+    public final void connectProcessors(final String... processorNames) {
+        final int count = processorNames.length;
+        if (count < 2) {
+            throw new TopologyBuilderException("At least two processors need to participate in the connection.");
+        }
+
+        for (int i = 0; i < count; i++) {
+            final String candidate = processorNames[i];
+            if (!nodeFactories.containsKey(candidate)) {
+                throw new TopologyBuilderException("Processor " + candidate + " is not added yet.");
+            }
+        }
+
+        // unite the first processor with all remaining ones
+        final String first = processorNames[0];
+        final String[] others = Arrays.copyOfRange(processorNames, 1, count);
+        nodeGrouper.unite(first, others);
+    }
+
+    /**
+     * Marks a topic name as internal.
+     *
+     * @param topicName name to record; must not be null
+     * @throws NullPointerException if {@code topicName} is null
+     */
+    public final void addInternalTopic(final String topicName) {
+        internalTopicNames.add(Objects.requireNonNull(topicName, "topicName can't be null"));
+    }
+
+    /**
+     * Records a group of source nodes as one copartition group.
+     *
+     * @param sourceNodes names of the source nodes; copied, so later changes to the argument
+     *                    do not affect the recorded group
+     */
+    public final void copartitionSources(final Collection<String> sourceNodes) {
+        final Set<String> snapshot = new HashSet<>(sourceNodes);
+        copartitionSourceGroups.add(Collections.unmodifiableSet(snapshot));
+    }
+
+    /**
+     * Connects a single processor node to a single, already registered state store.
+     * <p>
+     * All validation, including the check that the node is a processor rather than a source or
+     * sink, runs before any builder state is changed, so a rejected call leaves the node grouping
+     * and the store's user list untouched.
+     *
+     * @param processorName  name of an already added processor node
+     * @param stateStoreName name of an already added state store
+     * @throws TopologyBuilderException if the store or the node is unknown, or if the node is a
+     *                                  source or sink node
+     */
+    private void connectProcessorAndStateStore(final String processorName,
+                                               final String stateStoreName) {
+        if (!stateFactories.containsKey(stateStoreName)) {
+            throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
+        }
+        if (!nodeFactories.containsKey(processorName)) {
+            throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+        }
+
+        final NodeFactory nodeFactory = nodeFactories.get(processorName);
+        if (!(nodeFactory instanceof ProcessorNodeFactory)) {
+            throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+        }
+        final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+
+        // every user of a store belongs to one node group: unite with an existing user, if any
+        final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+        final Iterator<String> iter = stateStoreFactory.users.iterator();
+        if (iter.hasNext()) {
+            final String user = iter.next();
+            nodeGrouper.unite(user, processorName);
+        }
+        stateStoreFactory.users.add(processorName);
+
+        processorNodeFactory.addStateStore(stateStoreName);
+        connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
+    }
+
+    /**
+     * Collects the source node factories reachable by walking up from the given predecessors.
+     * Source predecessors are collected directly; processor predecessors are followed
+     * recursively; any other kind of predecessor is ignored.
+     *
+     * @param predecessors names of the nodes to start from
+     * @return the source node factories found
+     */
+    private Set<SourceNodeFactory> findSourcesForProcessorPredecessors(final String[] predecessors) {
+        final Set<SourceNodeFactory> collected = new HashSet<>();
+        for (final String predecessorName : predecessors) {
+            final NodeFactory candidate = nodeFactories.get(predecessorName);
+            if (candidate instanceof SourceNodeFactory) {
+                collected.add((SourceNodeFactory) candidate);
+                continue;
+            }
+            if (candidate instanceof ProcessorNodeFactory) {
+                final String[] upstream = ((ProcessorNodeFactory) candidate).predecessors;
+                collected.addAll(findSourcesForProcessorPredecessors(upstream));
+            }
+        }
+        return collected;
+    }
+
+    private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
+                                                              final ProcessorNodeFactory processorNodeFactory) {
+        // we should never update the mapping from state store names to source topics if the store name already exists
+        // in the map; this scenario is possible, for example, that a state store underlying a source KTable is
+        // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
+
+        if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
+            return;
+        }
+
+        final Set<String> sourceTopics = new HashSet<>();
+        final Set<Pattern> sourcePatterns = new HashSet<>();
+        final Set<SourceNodeFactory> sourceNodesForPredecessor = findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
+
+        for (final SourceNodeFactory sourceNodeFactory : sourceNodesForPredecessor) {
+            if (sourceNodeFactory.pattern != null) {
+                sourcePatterns.add(sourceNodeFactory.pattern);
+            } else {
+                sourceTopics.addAll(sourceNodeFactory.topics);
+            }
+        }
+
+        if (!sourceTopics.isEmpty()) {
+            stateStoreNameToSourceTopics.put(stateStoreName,
+                    Collections.unmodifiableSet(sourceTopics));
+        }
+
+        if (!sourcePatterns.isEmpty()) {
+            stateStoreNameToSourceRegex.put(stateStoreName,
+                    Collections.unmodifiableSet(sourcePatterns));
+        }
+
+    }
+
+    /**
+     * Files an item under the collection that matches its offset reset policy.
+     *
+     * @param earliestResets collection receiving items with policy {@code EARLIEST}
+     * @param latestResets   collection receiving items with policy {@code LATEST}
+     * @param offsetReset    the policy; {@code null} means nothing is recorded
+     * @param item           the topic name or pattern to file
+     * @throws TopologyBuilderException if the policy is neither {@code EARLIEST} nor {@code LATEST}
+     */
+    private <T> void maybeAddToResetList(final Collection<T> earliestResets,
+                                         final Collection<T> latestResets,
+                                         final TopologyBuilder.AutoOffsetReset offsetReset,
+                                         final T item) {
+        if (offsetReset == null) {
+            return;
+        }
+
+        if (offsetReset == TopologyBuilder.AutoOffsetReset.EARLIEST) {
+            earliestResets.add(item);
+        } else if (offsetReset == TopologyBuilder.AutoOffsetReset.LATEST) {
+            latestResets.add(item);
+        } else {
+            throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset));
+        }
+    }
+
+    /**
+     * Returns the node groups keyed by group id, computing them on first use and returning the
+     * cached result afterwards.
+     *
+     * @return map from group id to the names of the nodes in that group
+     */
+    public synchronized Map<Integer, Set<String>> nodeGroups() {
+        if (nodeGroups != null) {
+            return nodeGroups;
+        }
+        nodeGroups = makeNodeGroups();
+        return nodeGroups;
+    }
+
+    /**
+     * Partitions all nodes into groups of connected nodes and numbers the groups from 0.
+     * Source nodes are visited first, in sorted order, which makes the group id assignment easy
+     * to predict in tests; the remaining nodes follow, also in sorted order.
+     *
+     * @return insertion-ordered map from group id to the names of the nodes in that group
+     */
+    private Map<Integer, Set<String>> makeNodeGroups() {
+        final Map<Integer, Set<String>> groupsById = new LinkedHashMap<>();
+        final Map<String, Set<String>> groupsByRoot = new HashMap<>();
+
+        // topic-based and pattern-based source nodes together
+        final Set<String> sourceNodeNames = new HashSet<>(nodeToSourceTopics.keySet());
+        sourceNodeNames.addAll(nodeToSourcePatterns.keySet());
+
+        for (final String sourceNodeName : Utils.sorted(sourceNodeNames)) {
+            final String root = nodeGrouper.root(sourceNodeName);
+            Set<String> members = groupsByRoot.get(root);
+            if (members == null) {
+                members = new HashSet<>();
+                groupsByRoot.put(root, members);
+                // ids are handed out consecutively from 0, so the current size is the next free id
+                groupsById.put(groupsById.size(), members);
+            }
+            members.add(sourceNodeName);
+        }
+
+        // everything that is not a topic-based source node
+        for (final String nodeName : Utils.sorted(nodeFactories.keySet())) {
+            if (nodeToSourceTopics.containsKey(nodeName)) {
+                continue;
+            }
+            final String root = nodeGrouper.root(nodeName);
+            Set<String> members = groupsByRoot.get(root);
+            if (members == null) {
+                members = new HashSet<>();
+                groupsByRoot.put(root, members);
+                groupsById.put(groupsById.size(), members);
+            }
+            members.add(nodeName);
+        }
+
+        return groupsById;
+    }
+
+    /**
+     * Builds the processor topology for one node group, or for all non-global nodes.
+     *
+     * @param topicGroupId id of the node group to build; {@code null} builds the full topology
+     *                     minus the global groups
+     * @return the built topology
+     */
+    public synchronized ProcessorTopology build(final Integer topicGroupId) {
+        if (topicGroupId != null) {
+            final Set<String> requestedGroup = nodeGroups().get(topicGroupId);
+            return build(requestedGroup);
+        }
+
+        // no group id: take every node of every group, then drop the nodes of the global groups
+        final Set<String> globalNodes = globalNodeGroups();
+        final Set<String> nonGlobalNodes = new HashSet<>();
+        for (final Set<String> group : nodeGroups().values()) {
+            nonGlobalNodes.addAll(group);
+        }
+        nonGlobalNodes.removeAll(globalNodes);
+        return build(nonGlobalNodes);
+    }
+
+    /**
+     * Builds the topology made of the node groups that contain a global source.
+     *
+     * @return the global-state {@link ProcessorTopology}, or {@code null} if there are no such groups
+     */
+    public synchronized ProcessorTopology buildGlobalStateTopology() {
+        final Set<String> globalNodes = globalNodeGroups();
+        return globalNodes.isEmpty() ? null : build(globalNodes);
+    }
+
+    /**
+     * Collects the names of all nodes that share a node group with at least one global source.
+     *
+     * @return union of the members of every node group containing a global source
+     */
+    private Set<String> globalNodeGroups() {
+        final Set<String> globalNodes = new HashSet<>();
+        for (final Set<String> members : nodeGroups().values()) {
+            for (final String member : members) {
+                if (isGlobalSource(member)) {
+                    // one global source pulls its entire group in
+                    globalNodes.addAll(members);
+                }
+            }
+        }
+        return globalNodes;
+    }
+
+    /**
+     * Instantiates the nodes and state stores for a set of node names and wires them into a
+     * {@link ProcessorTopology}.
+     *
+     * @param nodeGroup names of the nodes to include; {@code null} includes every registered node
+     * @return the topology holding the built nodes, the topic-to-source and topic-to-sink maps, the
+     *         state stores used by the built processors, the store-to-changelog-topic map and all
+     *         global state stores
+     * @throws TopologyBuilderException if a node factory is of an unknown type
+     */
+    private ProcessorTopology build(final Set<String> nodeGroup) {
+        final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
+        final Map<String, ProcessorNode> processorMap = new HashMap<>();
+        final Map<String, SourceNode> topicSourceMap = new HashMap<>();
+        final Map<String, SinkNode> topicSinkMap = new HashMap<>();
+        final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
+
+        // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
+        for (final NodeFactory factory : nodeFactories.values()) {
+            if (nodeGroup == null || nodeGroup.contains(factory.name)) {
+                final ProcessorNode node = factory.build();
+                processorNodes.add(node);
+                processorMap.put(node.name(), node);
+
+                if (factory instanceof ProcessorNodeFactory) {
+                    // link to the parents; they are looked up in processorMap, so they must have been built earlier in this loop
+                    for (final String predecessor : ((ProcessorNodeFactory) factory).predecessors) {
+                        final ProcessorNode<?, ?> predecessorNode = processorMap.get(predecessor);
+                        predecessorNode.addChild(node);
+                    }
+                    // materialize each state store this processor uses, at most once per store name
+                    for (final String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
+                        if (!stateStoreMap.containsKey(stateStoreName)) {
+                            final StateStore stateStore;
+
+                            if (stateFactories.containsKey(stateStoreName)) {
+                                final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
+                                stateStore = supplier.get();
+
+                                // remember the changelog topic if this state store is change-logging enabled
+                                if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
+                                    final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName);
+                                    storeToChangelogTopic.put(stateStoreName, changelogTopic);
+                                }
+                            } else {
+                                // not a regular store, so fall back to the global store registered under this name
+                                stateStore = globalStateStores.get(stateStoreName);
+                            }
+
+                            stateStoreMap.put(stateStoreName, stateStore);
+                        }
+                    }
+                } else if (factory instanceof SourceNodeFactory) {
+                    final SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
+                    // pattern sources resolve their topics from the current subscription updates; others use their fixed list
+                    final List<String> topics = (sourceNodeFactory.pattern != null) ?
+                            sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
+                            sourceNodeFactory.topics;
+
+                    for (final String topic : topics) {
+                        if (internalTopicNames.contains(topic)) {
+                            // prefix the internal topic name with the application id
+                            topicSourceMap.put(decorateTopic(topic), (SourceNode) node);
+                        } else {
+                            topicSourceMap.put(topic, (SourceNode) node);
+                        }
+                    }
+                } else if (factory instanceof SinkNodeFactory) {
+                    final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory;
+
+                    // note: the sink is registered in topicSinkMap inside this loop, i.e. once per predecessor
+                    for (final String predecessor : sinkNodeFactory.predecessors) {
+                        processorMap.get(predecessor).addChild(node);
+                        if (internalTopicNames.contains(sinkNodeFactory.topic)) {
+                            // prefix the internal topic name with the application id
+                            topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node);
+                        } else {
+                            topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
+                        }
+                    }
+                } else {
+                    throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
+                }
+            }
+        }
+
+        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
+    }
+
+    /**
+     * Returns the global {@link StateStore}s of the topology, keyed by store name.
+     *
+     * @return read-only view of the global {@link StateStore}s
+     */
+    public Map<String, StateStore> globalStateStores() {
+        final Map<String, StateStore> readOnlyView = Collections.unmodifiableMap(globalStateStores);
+        return readOnlyView;
+    }
+
+    /**
+     * Returns the map of topic groups keyed by the group id.
+     * A topic group is a group of topics in the same task.
+     * <p>
+     * For every node group this collects the sink topics, the source topics (global topics are
+     * skipped), the internal source topics and the changelog topics of logging-enabled state stores
+     * used by the group's nodes. Internal topic names are prefixed with the application id. Groups
+     * that end up without any source topic are left out of the result.
+     *
+     * @return groups of topic names
+     */
+    public synchronized Map<Integer, TopologyBuilder.TopicsInfo> topicGroups() {
+        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = new LinkedHashMap<>();
+
+        // compute the node groups on first use
+        if (nodeGroups == null) {
+            nodeGroups = makeNodeGroups();
+        }
+
+        for (final Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
+            final Set<String> sinkTopics = new HashSet<>();
+            final Set<String> sourceTopics = new HashSet<>();
+            final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
+            final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
+            for (final String node : entry.getValue()) {
+                // if the node is a source node, add to the source topics
+                final List<String> topics = nodeToSourceTopics.get(node);
+                if (topics != null) {
+                    // if some of the topics are internal, add them to the internal topics
+                    for (final String topic : topics) {
+                        // skip global topic as they don't need partition assignment
+                        if (globalTopics.contains(topic)) {
+                            continue;
+                        }
+                        if (internalTopicNames.contains(topic)) {
+                            // prefix the internal topic name with the application id
+                            final String internalTopic = decorateTopic(topic);
+                            internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic,
+                                                                                            Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                                                                            Collections.<String, String>emptyMap()));
+                            sourceTopics.add(internalTopic);
+                        } else {
+                            sourceTopics.add(topic);
+                        }
+                    }
+                }
+
+                // if the node is a sink node, add to the sink topics
+                final String topic = nodeToSinkTopic.get(node);
+                if (topic != null) {
+                    if (internalTopicNames.contains(topic)) {
+                        // prefix the change log topic name with the application id
+                        sinkTopics.add(decorateTopic(topic));
+                    } else {
+                        sinkTopics.add(topic);
+                    }
+                }
+
+                // if the node is connected to a state, add to the state topics
+                for (final StateStoreFactory stateFactory : stateFactories.values()) {
+                    final StateStoreSupplier supplier = stateFactory.supplier;
+                    // only logging-enabled stores that list this node as a user get a changelog topic
+                    if (supplier.loggingEnabled() && stateFactory.users.contains(node)) {
+                        final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name());
+                        final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name);
+                        stateChangelogTopics.put(name, internalTopicConfig);
+                    }
+                }
+            }
+            // a group without source topics (e.g. one reading only global topics) is not reported
+            if (!sourceTopics.isEmpty()) {
+                topicGroups.put(entry.getKey(), new TopologyBuilder.TopicsInfo(
+                        Collections.unmodifiableSet(sinkTopics),
+                        Collections.unmodifiableSet(sourceTopics),
+                        Collections.unmodifiableMap(internalSourceTopics),
+                        Collections.unmodifiableMap(stateChangelogTopics)));
+            }
+        }
+
+        return Collections.unmodifiableMap(topicGroups);
+    }
+
+    /**
+     * For every pattern-based source node, stores the topics its factory resolves from the current
+     * subscription updates in {@code nodeToSourceTopics}. Does nothing when there are no updates.
+     */
+    private void setRegexMatchedTopicsToSourceNodes() {
+        if (!subscriptionUpdates.hasUpdates()) {
+            return;
+        }
+
+        for (final Map.Entry<String, Pattern> patternSource : nodeToSourcePatterns.entrySet()) {
+            final String nodeName = patternSource.getKey();
+            final SourceNodeFactory factory = (SourceNodeFactory) nodeFactories.get(nodeName);
+            // replace the node's topic list with the topics matched from its regex
+            nodeToSourceTopics.put(nodeName, factory.getTopics(subscriptionUpdates.getUpdates()));
+            log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
+        }
+    }
+
+    /**
+     * For every state store fed by source patterns, adds the subscribed topics that match one of
+     * those patterns to the store's source topics, keeping any topics already recorded for the
+     * store. Does nothing when there are no subscription updates.
+     */
+    private void setRegexMatchedTopicToStateStore() {
+        if (!subscriptionUpdates.hasUpdates()) {
+            return;
+        }
+
+        for (final Map.Entry<String, Set<Pattern>> storeEntry : stateStoreNameToSourceRegex.entrySet()) {
+            final String storeName = storeEntry.getKey();
+            final Set<String> matchedTopics = new HashSet<>();
+
+            for (final String updatedTopic : subscriptionUpdates.getUpdates()) {
+                for (final Pattern storePattern : storeEntry.getValue()) {
+                    if (storePattern.matcher(updatedTopic).matches()) {
+                        matchedTopics.add(updatedTopic);
+                    }
+                }
+            }
+
+            if (matchedTopics.isEmpty()) {
+                continue;
+            }
+
+            // merge with what is already known for the store before replacing the entry
+            final Collection<String> knownTopics = stateStoreNameToSourceTopics.get(storeName);
+            if (knownTopics != null) {
+                matchedTopics.addAll(knownTopics);
+            }
+            stateStoreNameToSourceTopics.put(storeName, Collections.unmodifiableSet(matchedTopics));
+        }
+    }
+    
+    /**
+     * Creates the changelog topic configuration for a state store. Window stores get both the
+     * {@code compact} and {@code delete} cleanup policies plus a retention time taken from the
+     * supplier; all other stores get {@code compact} only.
+     *
+     * @param supplier supplier of the store the changelog belongs to
+     * @param name     name of the changelog topic
+     * @return the topic configuration
+     */
+    private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier,
+                                                          final String name) {
+        if (supplier instanceof WindowStoreSupplier) {
+            final InternalTopicConfig windowedConfig = new InternalTopicConfig(
+                name,
+                Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, InternalTopicConfig.CleanupPolicy.delete),
+                supplier.logConfig());
+            windowedConfig.setRetentionMs(((WindowStoreSupplier) supplier).retentionPeriod());
+            return windowedConfig;
+        }
+
+        return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
+    }
+
+    /**
+     * Builds one pattern covering all topics and source patterns registered with the
+     * {@code EARLIEST} reset policy.
+     *
+     * @return the combined pattern
+     * @throws TopologyBuilderException if the pattern overlaps with anything registered for {@code LATEST}
+     */
+    public synchronized Pattern earliestResetTopicsPattern() {
+        final Pattern earliest = buildPatternForOffsetResetTopics(
+            maybeDecorateInternalSourceTopics(earliestResetTopics),
+            earliestResetPatterns);
+
+        ensureNoRegexOverlap(earliest, latestResetPatterns, latestResetTopics);
+        return earliest;
+    }
+
+    /**
+     * Builds one pattern covering all topics and source patterns registered with the
+     * {@code LATEST} reset policy.
+     *
+     * @return the combined pattern
+     * @throws TopologyBuilderException if the pattern overlaps with anything registered for {@code EARLIEST}
+     */
+    public synchronized Pattern latestResetTopicsPattern() {
+        final Pattern latest = buildPatternForOffsetResetTopics(
+            maybeDecorateInternalSourceTopics(latestResetTopics),
+            latestResetPatterns);
+
+        ensureNoRegexOverlap(latest, earliestResetPatterns, earliestResetTopics);
+        return latest;
+    }
+
+    private void ensureNoRegexOverlap(final Pattern builtPattern,
+                                      final Set<Pattern> otherPatterns,
+                                      final Set<String> otherTopics) {
+        for (final Pattern otherPattern : otherPatterns) {
+            if (builtPattern.pattern().contains(otherPattern.pattern())) {
+                throw new TopologyBuilderException(
+                    String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets",
+                        otherPattern.pattern(),
+                        builtPattern.pattern()));
+            }
+        }
+
+        for (final String otherTopic : otherTopics) {
+            if (builtPattern.matcher(otherTopic).matches()) {
+                throw new TopologyBuilderException(
+                    String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets",
+                        builtPattern.pattern(),
+                        otherTopic));
+            }
+        }
+    }
+
+    /**
+     * Joins topic names and the regex text of source patterns into one alternation
+     * ({@code a|b|c}) and compiles it. Topic names are inserted as-is.
+     *
+     * @param sourceTopics   topic names to include
+     * @param sourcePatterns patterns whose regex text is included
+     * @return the compiled alternation, or {@code EMPTY_ZERO_LENGTH_PATTERN} if nothing was appended
+     */
+    private static Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics,
+                                                            final Collection<Pattern> sourcePatterns) {
+        final StringBuilder alternation = new StringBuilder();
+
+        for (final String topicName : sourceTopics) {
+            alternation.append(topicName);
+            alternation.append("|");
+        }
+        for (final Pattern pattern : sourcePatterns) {
+            alternation.append(pattern.pattern());
+            alternation.append("|");
+        }
+
+        if (alternation.length() == 0) {
+            return EMPTY_ZERO_LENGTH_PATTERN;
+        }
+
+        // drop the trailing separator
+        alternation.deleteCharAt(alternation.length() - 1);
+        return Pattern.compile(alternation.toString());
+    }
+
+    /**
+     * Returns, for every state store with recorded source topics, the list of those topics with
+     * internal topic names prefixed by the application id.
+     *
+     * @return new map from store name to its (possibly decorated) source topics
+     */
+    public Map<String, List<String>> stateStoreNameToSourceTopics() {
+        final Map<String, List<String>> decoratedByStore = new HashMap<>();
+        for (final Map.Entry<String, Set<String>> storeEntry : stateStoreNameToSourceTopics.entrySet()) {
+            final String storeName = storeEntry.getKey();
+            final List<String> decorated = maybeDecorateInternalSourceTopics(storeEntry.getValue());
+            decoratedByStore.put(storeName, decorated);
+        }
+        return decoratedByStore;
+    }
+
+    /**
+     * Translates the recorded copartition groups from source node names to topic names, with
+     * internal topic names prefixed by the application id. Nodes without recorded source topics
+     * contribute nothing.
+     *
+     * @return unmodifiable collection of unmodifiable topic-name sets, one per copartition group
+     */
+    public synchronized Collection<Set<String>> copartitionGroups() {
+        final List<Set<String>> topicGroups = new ArrayList<>(copartitionSourceGroups.size());
+        for (final Set<String> sourceNodeNames : copartitionSourceGroups) {
+            final Set<String> groupTopics = new HashSet<>();
+            for (final String sourceNodeName : sourceNodeNames) {
+                final List<String> nodeTopics = nodeToSourceTopics.get(sourceNodeName);
+                if (nodeTopics == null) {
+                    continue;
+                }
+                groupTopics.addAll(maybeDecorateInternalSourceTopics(nodeTopics));
+            }
+            topicGroups.add(Collections.unmodifiableSet(groupTopics));
+        }
+        return Collections.unmodifiableList(topicGroups);
+    }
+
+    private List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics) {
+        final List<String> decoratedTopics = new ArrayList<>();
+        for (final String topic : sourceTopics) {
+            if (internalTopicNames.contains(topic)) {
+                decoratedTopics.add(decorateTopic(topic));
+            } else {
+                decoratedTopics.add(topic);
+            }
+        }
+        return decoratedTopics;
+    }
+
+    private String decorateTopic(final String topic) {
+        if (applicationId == null) {
+            throw new TopologyBuilderException("there are internal topics and "
+                    + "applicationId hasn't been set. Call "
+                    + "setApplicationId first");
+        }
+
+        return applicationId + "-" + topic;
+    }
+
+    public SubscriptionUpdates subscriptionUpdates() {
+        return subscriptionUpdates;
+    }
+
+    public synchronized Pattern sourceTopicPattern() {
+        if (topicPattern == null) {
+            final List<String> allSourceTopics = new ArrayList<>();
+            if (!nodeToSourceTopics.isEmpty()) {
+                for (final List<String> topics : nodeToSourceTopics.values()) {
+                    allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics));
+                }
+            }
+            Collections.sort(allSourceTopics);
+
+            topicPattern = buildPatternForOffsetResetTopics(allSourceTopics, nodeToSourcePatterns.values());
+        }
+
+        return topicPattern;
+    }
+
+    public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
+                                                 final String threadId) {
+        log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)",
+            threadId, subscriptionUpdates);
+        this.subscriptionUpdates = subscriptionUpdates;
+        setRegexMatchedTopicsToSourceNodes();
+        setRegexMatchedTopicToStateStore();
+    }
+
+    private boolean isGlobalSource(final String nodeName) {
+        final NodeFactory nodeFactory = nodeFactories.get(nodeName);
+
+        if (nodeFactory instanceof SourceNodeFactory) {
+            final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
+            if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    public TopologyDescription describe() {
+        final TopologyDescription description = new TopologyDescription();
+
+        describeSubtopologies(description);
+        describeGlobalStores(description);
+
+        return description;
+    }
+
+    private void describeSubtopologies(final TopologyDescription description) {
+        for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
+
+            final Set<String> allNodesOfGroups = nodeGroup.getValue();
+            final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(allNodesOfGroups);
+
+            if (!isNodeGroupOfGlobalStores) {
+                describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
+            }
+        }
+    }
+
+    private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) {
+        for (final String node : allNodesOfGroups) {
+            if (isGlobalSource(node)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void describeSubtopology(final TopologyDescription description,
+                                     final Integer subtopologyId,
+                                     final Set<String> nodeNames) {
+
+        final HashMap<String, AbstractNode> nodesByName = new HashMap<>();
+
+        // add all nodes
+        for (final String nodeName : nodeNames) {
+            nodesByName.put(nodeName, nodeFactories.get(nodeName).describe());
+        }
+
+        // connect each node to its predecessors and successors
+        for (final AbstractNode node : nodesByName.values()) {
+            for (final String predecessorName : nodeFactories.get(node.name()).predecessors) {
+                final AbstractNode predecessor = nodesByName.get(predecessorName);
+                node.addPredecessor(predecessor);
+                predecessor.addSuccessor(node);
+            }
+        }
+
+        description.addSubtopology(new Subtopology(
+            subtopologyId,
+            new HashSet<TopologyDescription.Node>(nodesByName.values())));
+    }
+
+    private void describeGlobalStores(final TopologyDescription description) {
+        for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
+            final Set<String> nodes = nodeGroup.getValue();
+
+            final Iterator<String> it = nodes.iterator();
+            while (it.hasNext()) {
+                final String node = it.next();
+
+                if (isGlobalSource(node)) {
+                    // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode}
+                    it.remove(); // remove sourceNode from group
+                    final String processorNode = nodes.iterator().next(); // get remaining processorNode
+
+                    description.addGlobalStore(new GlobalStore(
+                        node,
+                        processorNode,
+                        ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+                        nodeToSourceTopics.get(node).get(0)
+                    ));
+                    break;
+                }
+            }
+        }
+    }
+
+    public final static class GlobalStore implements TopologyDescription.GlobalStore {
+        private final Source source;
+        private final Processor processor;
+
+        public GlobalStore(final String sourceName,
+                           final String processorName,
+                           final String storeName,
+                           final String topicName) {
+            source = new Source(sourceName, topicName);
+            processor = new Processor(processorName, Collections.singleton(storeName));
+            source.successors.add(processor);
+            processor.predecessors.add(source);
+        }
+
+        @Override
+        public TopologyDescription.Source source() {
+            return source;
+        }
+
+        @Override
+        public TopologyDescription.Processor processor() {
+            return processor;
+        }
+
+        @Override
+        public String toString() {
+            return "GlobalStore: " + source.name + "(topic: " + source.topics + ") -> "
+                + processor.name + "(store: " + processor.stores.iterator().next() + ")\n";
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final GlobalStore that = (GlobalStore) o;
+            return source.equals(that.source)
+                && processor.equals(that.processor);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(source, processor);
+        }
+    }
+
+    public abstract static class AbstractNode implements TopologyDescription.Node {
+        final String name;
+        final Set<TopologyDescription.Node> predecessors = new HashSet<>();
+        final Set<TopologyDescription.Node> successors = new HashSet<>();
+
+        AbstractNode(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public Set<TopologyDescription.Node> predecessors() {
+            return Collections.unmodifiableSet(predecessors);
+        }
+
+        @Override
+        public Set<TopologyDescription.Node> successors() {
+            return Collections.unmodifiableSet(successors);
+        }
+
+        public void addPredecessor(final TopologyDescription.Node predecessor) {
+            predecessors.add(predecessor);
+        }
+
+        public void addSuccessor(final TopologyDescription.Node successor) {
+            successors.add(successor);
+        }
+    }
+
+    public final static class Source extends AbstractNode implements TopologyDescription.Source {
+        private final String topics;
+
+        public Source(final String name,
+                      final String topics) {
+            super(name);
+            this.topics = topics;
+        }
+
+        @Override
+        public String topics() {
+            return topics;
+        }
+
+        @Override
+        public void addPredecessor(final TopologyDescription.Node predecessor) {
+            throw new UnsupportedOperationException("Sources don't have predecessors.");
+        }
+
+        @Override
+        public String toString() {
+            return "Source: " + name + "(topics: " + topics + ") --> " + nodeNames(successors);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final Source source = (Source) o;
+            // omit successor to avoid infinite loops
+            return name.equals(source.name)
+                && topics.equals(source.topics);
+        }
+
+        @Override
+        public int hashCode() {
+            // omit successor as it might change and alter the hash code
+            return Objects.hash(name, topics);
+        }
+    }
+
+    public final static class Processor extends AbstractNode implements TopologyDescription.Processor {
+        private final Set<String> stores;
+
+        public Processor(final String name,
+                         final Set<String> stores) {
+            super(name);
+            this.stores = stores;
+        }
+
+        @Override
+        public Set<String> stores() {
+            return Collections.unmodifiableSet(stores);
+        }
+
+        @Override
+        public String toString() {
+            return "Processor: " + name + "(stores: " + stores + ") --> " + nodeNames(successors) + " <-- " + nodeNames(predecessors);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final Processor processor = (Processor) o;
+            // omit successor to avoid infinite loops
+            return name.equals(processor.name)
+                && stores.equals(processor.stores)
+                && predecessors.equals(processor.predecessors);
+        }
+
+        @Override
+        public int hashCode() {
+            // omit successor as it might change and alter the hash code
+            return Objects.hash(name, stores);
+        }
+    }
+
+    public final static class Sink extends AbstractNode implements TopologyDescription.Sink {
+        private final String topic;
+
+        public Sink(final String name,
+                    final String topic) {
+            super(name);
+            this.topic = topic;
+        }
+
+        @Override
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        public void addSuccessor(final TopologyDescription.Node successor) {
+            throw new UnsupportedOperationException("Sinks don't have successors.");
+        }
+
+        @Override
+        public String toString() {
+            return "Sink: " + name + "(topic: " + topic + ") <-- " + nodeNames(predecessors);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final Sink sink = (Sink) o;
+            return name.equals(sink.name)
+                && topic.equals(sink.topic)
+                && predecessors.equals(sink.predecessors);
+        }
+
+        @Override
+        public int hashCode() {
+            // omit predecessors as it might change and alter the hash code
+            return Objects.hash(name, topic);
+        }
+    }
+
+    public final static class Subtopology implements org.apache.kafka.streams.TopologyDescription.Subtopology {
+        private final int id;
+        private final Set<org.apache.kafka.streams.TopologyDescription.Node> nodes;
+
+        public Subtopology(final int id,
+                    final Set<org.apache.kafka.streams.TopologyDescription.Node> nodes) {
+            this.id = id;
+            this.nodes = nodes;
+        }
+
+        @Override
+        public int id() {
+            return id;
+        }
+
+        @Override
+        public Set<org.apache.kafka.streams.TopologyDescription.Node> nodes() {
+            return Collections.unmodifiableSet(nodes);
+        }
+
+        @Override
+        public String toString() {
+            return "Sub-topology: " + id + "\n" + nodesAsString();
+        }
+
+        private String nodesAsString() {
+            final StringBuilder sb = new StringBuilder();
+            for (final org.apache.kafka.streams.TopologyDescription.Node node : nodes) {
+                sb.append("    ");
+                sb.append(node);
+                sb.append('\n');
+            }
+            return sb.toString();
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final Subtopology that = (Subtopology) o;
+            return id == that.id
+                && nodes.equals(that.nodes);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id, nodes);
+        }
+    }
+
+    public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
+        private final Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies = new HashSet<>();
+        private final Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores = new HashSet<>();
+
+        public void addSubtopology(final org.apache.kafka.streams.TopologyDescription.Subtopology subtopology) {
+            subtopologies.add(subtopology);
+        }
+
+        public void addGlobalStore(final org.apache.kafka.streams.TopologyDescription.GlobalStore globalStore) {
+            globalStores.add(globalStore);
+        }
+
+        @Override
+        public Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies() {
+            return Collections.unmodifiableSet(subtopologies);
+        }
+
+        @Override
+        public Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores() {
+            return Collections.unmodifiableSet(globalStores);
+        }
+
+        @Override
+        public String toString() {
+            return subtopologiesAsString() + globalStoresAsString();
+        }
+
+        private String subtopologiesAsString() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append("Sub-topologies: \n");
+            if (subtopologies.isEmpty()) {
+                sb.append("  none\n");
+            } else {
+                for (final org.apache.kafka.streams.TopologyDescription.Subtopology st : subtopologies) {
+                    sb.append("  ");
+                    sb.append(st);
+                }
+            }
+            return sb.toString();
+        }
+
+        private String globalStoresAsString() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append("Global Stores:\n");
+            if (globalStores.isEmpty()) {
+                sb.append("  none\n");
+            } else {
+                for (final org.apache.kafka.streams.TopologyDescription.GlobalStore gs : globalStores) {
+                    sb.append("  ");
+                    sb.append(gs);
+                }
+            }
+            return sb.toString();
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final TopologyDescription that = (TopologyDescription) o;
+            return subtopologies.equals(that.subtopologies)
+                && globalStores.equals(that.globalStores);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(subtopologies, globalStores);
+        }
+
+    }
+
+    private static String nodeNames(final Set<TopologyDescription.Node> nodes) {
+        final StringBuilder sb = new StringBuilder();
+        if (!nodes.isEmpty()) {
+            for (final TopologyDescription.Node n : nodes) {
+                sb.append(n.name());
+                sb.append(", ");
+            }
+            sb.deleteCharAt(sb.length() - 1);
+            sb.deleteCharAt(sb.length() - 1);
+        } else {
+            return "none";
+        }
+        return sb.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 91856b0..e8b6a1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -356,8 +356,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                             if (otherSinkTopics.contains(topicName)) {
                                 // if this topic is one of the sink topics of this topology,
                                 // use the maximum of all its source topic partitions as the number of partitions
-                                for (String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate;
+                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
+                                    final Integer numPartitionsCandidate;
                                     // It is possible the sourceTopic is another internal topic, i.e,
                                     // map().join().join(map())
                                     if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
@@ -377,10 +377,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                         }
                         // if we still have not find the right number of partitions,
                         // another iteration is needed
-                        if (numPartitions == UNKNOWN)
+                        if (numPartitions == UNKNOWN) {
                             numPartitionsNeeded = true;
-                        else
+                        } else {
                             repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eb75b14..f10bf41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -34,9 +34,9 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
@@ -45,7 +45,6 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
@@ -398,7 +397,7 @@ public class StreamThread extends Thread {
     public final UUID processId;
 
     protected final StreamsConfig config;
-    protected final TopologyBuilder builder;
+    protected final InternalTopologyBuilder builder;
     Producer<byte[], byte[]> threadProducer;
     private final KafkaClientSupplier clientSupplier;
     protected final Consumer<byte[], byte[]> consumer;
@@ -441,7 +440,7 @@ public class StreamThread extends Thread {
     final ConsumerRebalanceListener rebalanceListener;
     private final static int UNLIMITED_RECORDS = -1;
 
-    public StreamThread(final TopologyBuilder builder,
+    public StreamThread(final InternalTopologyBuilder builder,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier,
                         final String applicationId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index bb74b48..3fd1613 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.StreamsMetadata;
 
@@ -44,14 +43,14 @@ import java.util.Set;
  */
 public class StreamsMetadataState {
     public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1);
-    private final TopologyBuilder builder;
+    private final InternalTopologyBuilder builder;
     private final List<StreamsMetadata> allMetadata = new ArrayList<>();
     private final Set<String> globalStores;
     private final HostInfo thisHost;
     private Cluster clusterMetadata;
     private StreamsMetadata myMetadata;
 
-    public StreamsMetadataState(final TopologyBuilder builder, final HostInfo thisHost) {
+    public StreamsMetadataState(final InternalTopologyBuilder builder, final HostInfo thisHost) {
         this.builder = builder;
         this.globalStores = builder.globalStateStores().keySet();
         this.thisHost = thisHost;


Mime
View raw message