kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Refactor TopologyBuilder with ApplicationID Prefix
Date Tue, 23 Aug 2016 04:32:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 24fd025d4 -> a2bac70a6


MINOR: Refactor TopologyBuilder with ApplicationID Prefix

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1736 from guozhangwang/Kminor-topology-applicationID


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a2bac70a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a2bac70a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a2bac70a

Branch: refs/heads/trunk
Commit: a2bac70a6634e9a78734d23158fb7e45f290ea26
Parents: 24fd025
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Aug 22 21:33:47 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Aug 22 21:33:47 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   |   1 -
 .../streams/processor/TopologyBuilder.java      | 393 ++++++++++---------
 .../processor/internals/StreamThread.java       |   4 +-
 .../kstream/internals/KStreamBranchTest.java    |   1 +
 .../kstream/internals/KStreamImplTest.java      |   2 +-
 .../streams/processor/TopologyBuilderTest.java  |  29 +-
 .../internals/ProcessorTopologyTest.java        |   4 +-
 .../processor/internals/StreamThreadTest.java   |  14 +-
 .../StreamThreadStateStoreProviderTest.java     |   3 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   3 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 +-
 11 files changed, 232 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 08e9842..f9544cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -92,7 +92,6 @@ public class KStreamBuilder extends TopologyBuilder {
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
-
     /**
      * Create a {@link KStream} instance from the specified Pattern.
      * <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 7b79236..bcdb54a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -60,23 +60,45 @@ public class TopologyBuilder {
     // state factories
     private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
 
+    // all topics subscribed from source processors (without application-id prefix for internal
topics)
     private final Set<String> sourceTopicNames = new HashSet<>();
+
+    // all internal topics auto-created by the topology builder and used in source / sink
processors
     private final Set<String> internalTopicNames = new HashSet<>();
-    private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
+
+    // groups of source processors that need to be copartitioned
     private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
+
+    // map from source processor names to subscribed topics (without application-id prefix
for internal topics)
     private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>();
+
+    // map from source processor names to regex subscription patterns
     private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
-    private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
+
+    // map from sink processor names to subscribed topic (without application-id prefix for
internal topics)
     private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+
+    // map from topics to their matched regex patterns, this is to ensure one topic is passed
through on source node
+    // even if it can be matched by multiple regex patterns
+    private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
+
+    // map from state store names to all the topics subscribed from source processors that
+    // are connected to these state stores
     private final Map<String, Set<String>> stateStoreNameToSourceTopics = new
HashMap<>();
+
+    // map from state store names that are directly associated with source processors to
their subscribed topics,
+    // this is used in the extended KStreamBuilder.
     private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>();
+
+    private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
+
     private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
-    private String applicationId;
 
-    private Map<Integer, Set<String>> nodeGroups = null;
-    private Pattern topicPattern;
+    private String applicationId = null;
 
+    private Pattern topicPattern = null;
 
+    private Map<Integer, Set<String>> nodeGroups = null;
 
     private static class StateStoreFactory {
         public final Set<String> users;
@@ -98,7 +120,7 @@ public class TopologyBuilder {
             this.name = name;
         }
 
-        public abstract ProcessorNode build(String applicationId);
+        public abstract ProcessorNode build();
     }
 
     private static class ProcessorNodeFactory extends NodeFactory {
@@ -118,7 +140,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String applicationId) {
+        public ProcessorNode build() {
             return new ProcessorNode(name, supplier.get(), stateStoreNames);
         }
     }
@@ -146,9 +168,12 @@ public class TopologyBuilder {
             for (String update : subscribedTopics) {
                 if (this.pattern == topicToPatterns.get(update)) {
                     matchedTopics.add(update);
-                    //not same pattern instance,but still matches not allowed
                 } else if (topicToPatterns.containsKey(update) && isMatch(update))
{
-                    throw new TopologyBuilderException("Topic " + update + " already matched
check for overlapping regex patterns");
+                    // the same topic cannot be matched to more than one pattern
+                    // TODO: we should lift this requirement in the future
+                    throw new TopologyBuilderException("Topic " + update +
+                            " is already matched for another regex pattern " + topicToPatterns.get(update)
+
+                            " and hence cannot be matched to this regex pattern " + pattern
+ " any more.");
                 } else if (isMatch(update)) {
                     topicToPatterns.put(update, this.pattern);
                     matchedTopics.add(update);
@@ -159,7 +184,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String applicationId) {
+        public ProcessorNode build() {
             return new SourceNode(name, nodeToSourceTopics.get(name), keyDeserializer, valDeserializer);
         }
 
@@ -186,10 +211,10 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String applicationId) {
+        public ProcessorNode build() {
             if (internalTopicNames.contains(topic)) {
                 // prefix the internal topic name with the application id
-                return new SinkNode(name, applicationId + "-" + topic, keySerializer, valSerializer,
partitioner);
+                return new SinkNode(name, decorateTopic(topic), keySerializer, valSerializer,
partitioner);
             } else {
                 return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
             }
@@ -232,6 +257,22 @@ public class TopologyBuilder {
     public TopologyBuilder() {}
 
     /**
+     * Set the applicationId to be used for auto-generated internal topics.
+     *
+     * This is required before calling {@link #sourceTopics}, {@link #topicGroups},
+     * {@link #copartitionSources}, {@link #stateStoreNameToSourceTopics} and {@link #build(Integer)}.
+     *
+     * @param applicationId the streams applicationId. Should be the same as set by
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
+     */
+    public synchronized final TopologyBuilder setApplicationId(String applicationId) {
+        Objects.requireNonNull(applicationId, "applicationId can't be null");
+        this.applicationId = applicationId;
+
+        return this;
+    }
+
+    /**
      * Add a new source that consumes the named topics and forward the records to child processor
and/or sink nodes.
      * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG
default key deserializer} and
      * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
deserializer} specified in the
@@ -342,8 +383,8 @@ public class TopologyBuilder {
             }
         }
 
-        nodeToSourcePatterns.put(name, topicPattern);
         nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, keyDeserializer,
valDeserializer));
+        nodeToSourcePatterns.put(name, topicPattern);
         nodeGrouper.add(name);
 
         return this;
@@ -549,14 +590,10 @@ public class TopologyBuilder {
     }
 
     protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName,
String topic) {
-        if (sourceStoreToSourceTopic != null) {
-            if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) {
-                throw new TopologyBuilderException("Source store " + sourceStoreName + "
is already added.");
-            }
-            sourceStoreToSourceTopic.put(sourceStoreName, topic);
-        } else {
-            throw new TopologyBuilderException("sourceStoreToSourceTopic is null");
+        if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) {
+            throw new TopologyBuilderException("Source store " + sourceStoreName + " is already
added.");
         }
+        sourceStoreToSourceTopic.put(sourceStoreName, topic);
 
         return this;
     }
@@ -601,6 +638,17 @@ public class TopologyBuilder {
         return this;
     }
 
+    /**
+     * Asserts that the streams of the specified source nodes must be copartitioned.
+     *
+     * @param sourceNodes a set of source node names
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public synchronized final TopologyBuilder copartitionSources(Collection<String>
sourceNodes) {
+        copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+        return this;
+    }
+
     private void connectProcessorAndStateStore(String processorName, String stateStoreName)
{
         if (!stateFactories.containsKey(stateStoreName))
             throw new TopologyBuilderException("StateStore " + stateStoreName + " is not
added yet.");
@@ -625,7 +673,6 @@ public class TopologyBuilder {
         }
     }
 
-
     private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
         final Set<String> sourceTopics = new HashSet<>();
         for (String parent : parents) {
@@ -652,85 +699,6 @@ public class TopologyBuilder {
     }
 
     /**
-     * Returns the map of topic groups keyed by the group id.
-     * A topic group is a group of topics in the same task.
-     *
-     * @return groups of topic names
-     */
-    public synchronized Map<Integer, TopicsInfo> topicGroups() {
-        Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
-
-
-        if (subscriptionUpdates.hasUpdates()) {
-            for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet())
{
-                SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
-                //need to update nodeToSourceTopics with topics matched from given regex
-                nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
-            }
-        }
-
-        if (nodeGroups == null)
-            nodeGroups = makeNodeGroups();
-
-
-        for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
-            Set<String> sinkTopics = new HashSet<>();
-            Set<String> sourceTopics = new HashSet<>();
-            Set<String> internalSourceTopics = new HashSet<>();
-            Set<String> stateChangelogTopics = new HashSet<>();
-            for (String node : entry.getValue()) {
-                // if the node is a source node, add to the source topics
-                String[] topics = nodeToSourceTopics.get(node);
-                if (topics != null) {
-                    // if some of the topics are internal, add them to the internal topics
-                    for (String topic : topics) {
-                        if (this.internalTopicNames.contains(topic)) {
-                            if (applicationId == null) {
-                                throw new TopologyBuilderException("There are internal topics
and"
-                                                                   + " applicationId hasn't
been "
-                                                                   + "set. Call setApplicationId
"
-                                                                   + "first");
-                            }
-                            // prefix the internal topic name with the application id
-                            String internalTopic = applicationId + "-" + topic;
-                            internalSourceTopics.add(internalTopic);
-                            sourceTopics.add(internalTopic);
-                        } else {
-                            sourceTopics.add(topic);
-                        }
-                    }
-                }
-
-                // if the node is a sink node, add to the sink topics
-                String topic = nodeToSinkTopic.get(node);
-                if (topic != null) {
-                    if (internalTopicNames.contains(topic)) {
-                        // prefix the change log topic name with the application id
-                        sinkTopics.add(applicationId + "-" + topic);
-                    } else {
-                        sinkTopics.add(topic);
-                    }
-                }
-
-                // if the node is connected to a state, add to the state topics
-                for (StateStoreFactory stateFactory : stateFactories.values()) {
-                    if (stateFactory.isInternal && stateFactory.users.contains(node))
{
-                        // prefix the change log topic name with the application id
-                        stateChangelogTopics.add(applicationId + "-" + stateFactory.supplier.name()
+ ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
-                    }
-                }
-            }
-            topicGroups.put(entry.getKey(), new TopicsInfo(
-                    Collections.unmodifiableSet(sinkTopics),
-                    Collections.unmodifiableSet(sourceTopics),
-                    Collections.unmodifiableSet(internalSourceTopics),
-                    Collections.unmodifiableSet(stateChangelogTopics)));
-        }
-
-        return Collections.unmodifiableMap(topicGroups);
-    }
-
-    /**
      * Returns the map of node groups keyed by the topic group id.
      *
      * @return groups of node names
@@ -778,62 +746,12 @@ public class TopologyBuilder {
     }
 
     /**
-     * Asserts that the streams of the specified source nodes must be copartitioned.
-     *
-     * @param sourceNodes a set of source node names
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public synchronized final TopologyBuilder copartitionSources(Collection<String>
sourceNodes) {
-        copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
-        return this;
-    }
-
-    /**
-     * Returns the copartition groups.
-     * A copartition group is a group of source topics that are required to be copartitioned.
-     *
-     * @return groups of topic names
-     */
-    public synchronized Collection<Set<String>> copartitionGroups() {
-        List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
-        for (Set<String> nodeNames : copartitionSourceGroups) {
-            Set<String> copartitionGroup = new HashSet<>();
-            for (String node : nodeNames) {
-                String[] topics = nodeToSourceTopics.get(node);
-                if (topics != null)
-                    copartitionGroup.addAll(convertInternalTopicNames(topics));
-            }
-            list.add(Collections.unmodifiableSet(copartitionGroup));
-        }
-        return Collections.unmodifiableList(list);
-    }
-
-    private List<String> convertInternalTopicNames(String...topics) {
-        final List<String> topicNames = new ArrayList<>();
-        for (String topic : topics) {
-            if (internalTopicNames.contains(topic)) {
-                if (applicationId == null) {
-                    throw new TopologyBuilderException("there are internal topics "
-                                                       + "and applicationId hasn't been set.
Call "
-                                                       + "setApplicationId first");
-                }
-                topicNames.add(applicationId + "-" + topic);
-            } else {
-                topicNames.add(topic);
-            }
-        }
-        return topicNames;
-    }
-
-
-    /**
      * Build the topology for the specified topic group. This is called automatically when
passing this builder into the
      * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)}
constructor.
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
-    public synchronized ProcessorTopology build(String applicationId, Integer topicGroupId)
{
-        Objects.requireNonNull(applicationId, "applicationId can't be null");
+    public synchronized ProcessorTopology build(Integer topicGroupId) {
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -841,11 +759,11 @@ public class TopologyBuilder {
             // when nodeGroup is null, we build the full topology. this is used in some tests.
             nodeGroup = null;
         }
-        return build(applicationId, nodeGroup);
+        return build(nodeGroup);
     }
 
     @SuppressWarnings("unchecked")
-    private ProcessorTopology build(String applicationId, Set<String> nodeGroup) {
+    private ProcessorTopology build(Set<String> nodeGroup) {
         List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -855,7 +773,7 @@ public class TopologyBuilder {
         // create processor nodes in a topological order ("nodeFactories" is already topologically
sorted)
         for (NodeFactory factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                ProcessorNode node = factory.build(applicationId);
+                ProcessorNode node = factory.build();
                 processorNodes.add(node);
                 processorMap.put(node.name(), node);
 
@@ -870,11 +788,13 @@ public class TopologyBuilder {
                     }
                 } else if (factory instanceof SourceNodeFactory) {
                     SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
-                    String[] topics = (sourceNodeFactory.pattern != null) ? sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates())
: sourceNodeFactory.getTopics();
+                    String[] topics = (sourceNodeFactory.pattern != null) ?
+                            sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates())
:
+                            sourceNodeFactory.getTopics();
                     for (String topic : topics) {
                         if (internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the application id
-                            topicSourceMap.put(applicationId + "-" + topic, (SourceNode)
node);
+                            topicSourceMap.put(decorateTopic(topic), (SourceNode) node);
                         } else {
                             topicSourceMap.put(topic, (SourceNode) node);
                         }
@@ -885,7 +805,7 @@ public class TopologyBuilder {
                         processorMap.get(parent).addChild(node);
                         if (internalTopicNames.contains(sinkNodeFactory.topic)) {
                             // prefix the internal topic name with the application id
-                            topicSinkMap.put(applicationId + "-" + sinkNodeFactory.topic,
(SinkNode) node);
+                            topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode)
node);
                         } else {
                             topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
                         }
@@ -900,6 +820,78 @@ public class TopologyBuilder {
     }
 
     /**
+     * Returns the map of topic groups keyed by the group id.
+     * A topic group is a group of topics in the same task.
+     *
+     * @return groups of topic names
+     */
+    public synchronized Map<Integer, TopicsInfo> topicGroups() {
+        Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
+
+        if (subscriptionUpdates.hasUpdates()) {
+            for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet())
{
+                SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+                //need to update nodeToSourceTopics with topics matched from given regex
+                nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
+            }
+        }
+
+        if (nodeGroups == null)
+            nodeGroups = makeNodeGroups();
+
+        for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
+            Set<String> sinkTopics = new HashSet<>();
+            Set<String> sourceTopics = new HashSet<>();
+            Set<String> internalSourceTopics = new HashSet<>();
+            Set<String> stateChangelogTopics = new HashSet<>();
+            for (String node : entry.getValue()) {
+                // if the node is a source node, add to the source topics
+                String[] topics = nodeToSourceTopics.get(node);
+                if (topics != null) {
+                    // if some of the topics are internal, add them to the internal topics
+                    for (String topic : topics) {
+                        if (this.internalTopicNames.contains(topic)) {
+                            // prefix the internal topic name with the application id
+                            String internalTopic = decorateTopic(topic);
+                            internalSourceTopics.add(internalTopic);
+                            sourceTopics.add(internalTopic);
+                        } else {
+                            sourceTopics.add(topic);
+                        }
+                    }
+                }
+
+                // if the node is a sink node, add to the sink topics
+                String topic = nodeToSinkTopic.get(node);
+                if (topic != null) {
+                    if (internalTopicNames.contains(topic)) {
+                        // prefix the change log topic name with the application id
+                        sinkTopics.add(decorateTopic(topic));
+                    } else {
+                        sinkTopics.add(topic);
+                    }
+                }
+
+                // if the node is connected to a state, add to the state topics
+                for (StateStoreFactory stateFactory : stateFactories.values()) {
+                    if (stateFactory.isInternal && stateFactory.users.contains(node))
{
+                        // prefix the change log topic name with the application id
+                        stateChangelogTopics.add(ProcessorStateManager.storeChangelogTopic(applicationId,
stateFactory.supplier.name()));
+                    }
+                }
+            }
+            topicGroups.put(entry.getKey(), new TopicsInfo(
+                    Collections.unmodifiableSet(sinkTopics),
+                    Collections.unmodifiableSet(sourceTopics),
+                    Collections.unmodifiableSet(internalSourceTopics),
+                    Collections.unmodifiableSet(stateChangelogTopics)));
+        }
+
+        return Collections.unmodifiableMap(topicGroups);
+    }
+
+
+    /**
      * Get the names of topics that are to be consumed by the source nodes created by this
builder.
      * @return the unmodifiable set of topic names used by source nodes, which changes as
new sources are added; never null
      */
@@ -908,21 +900,62 @@ public class TopologyBuilder {
         return Collections.unmodifiableSet(topics);
     }
 
-    private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopicNames)
{
-        Set<String> topics = new HashSet<>();
-        for (String topic : sourceTopicNames) {
+    /**
+     * @return a mapping from state store name to a Set of source Topics.
+     */
+    public Map<String, Set<String>> stateStoreNameToSourceTopics() {
+        final Map<String, Set<String>> results = new HashMap<>();
+        for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet())
{
+            results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue()));
+
+        }
+        return results;
+    }
+
+    /**
+     * Returns the copartition groups.
+     * A copartition group is a group of source topics that are required to be copartitioned.
+     *
+     * @return groups of topic names
+     */
+    public synchronized Collection<Set<String>> copartitionGroups() {
+        List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
+        for (Set<String> nodeNames : copartitionSourceGroups) {
+            Set<String> copartitionGroup = new HashSet<>();
+            for (String node : nodeNames) {
+                String[] topics = nodeToSourceTopics.get(node);
+                if (topics != null)
+                    copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
+            }
+            list.add(Collections.unmodifiableSet(copartitionGroup));
+        }
+        return Collections.unmodifiableList(list);
+    }
+
+    private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopics)
{
+        return maybeDecorateInternalSourceTopics(sourceTopics.toArray(new String[sourceTopics.size()]));
+    }
+
+    private Set<String> maybeDecorateInternalSourceTopics(String ... sourceTopics)
{
+        final Set<String> decoratedTopics = new HashSet<>();
+        for (String topic : sourceTopics) {
             if (internalTopicNames.contains(topic)) {
-                if (applicationId == null) {
-                    throw new TopologyBuilderException("there are internal topics and "
-                                                       + "applicationId is null. Call "
-                                                       + "setApplicationId first");
-                }
-                topics.add(applicationId + "-" + topic);
+                decoratedTopics.add(decorateTopic(topic));
             } else {
-                topics.add(topic);
+                decoratedTopics.add(topic);
             }
         }
-        return topics;
+        return decoratedTopics;
+    }
+
+    private String decorateTopic(String topic) {
+        if (applicationId == null) {
+            throw new TopologyBuilderException("there are internal topics and "
+                    + "applicationId hasn't been set. Call "
+                    + "setApplicationId first");
+        }
+
+        return applicationId + "-" + topic;
     }
 
     public synchronized Pattern sourceTopicPattern() {
@@ -948,28 +981,4 @@ public class TopologyBuilder {
     public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates)
{
         this.subscriptionUpdates = subscriptionUpdates;
     }
-
-    /**
-     * Set the applicationId. This is required before calling
-     * {@link #sourceTopics}, {@link #topicGroups}, {@link #copartitionSources}, and
-     * {@link #stateStoreNameToSourceTopics}
-     * @param applicationId   the streams applicationId. Should be the same as set by
-     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
-     */
-    public synchronized void setApplicationId(String applicationId) {
-        Objects.requireNonNull(applicationId, "applicationId can't be null");
-        this.applicationId = applicationId;
-    }
-
-    /**
-     * @return a mapping from state store name to a Set of source Topics.
-     */
-    public Map<String, Set<String>> stateStoreNameToSourceTopics() {
-        final Map<String, Set<String>> results = new HashMap<>();
-        for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet())
{
-            results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue()));
-
-        }
-        return results;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 50d77c3..c0e54b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -542,7 +542,7 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions)
{
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
+        ProcessorTopology topology = builder.build(id.topicGroupId);
 
         return new StreamTask(id, applicationId, partitions, topology, consumer, producer,
restoreConsumer, config, sensors, stateDirectory);
     }
@@ -612,7 +612,7 @@ public class StreamThread extends Thread {
     protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions)
{
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
+        ProcessorTopology topology = builder.build(id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
             return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer,
config, sensors, stateDirectory);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 0650b95..fb19c0f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -48,6 +48,7 @@ public class KStreamBranchTest {
     @Test
     public void testKStreamBranch() {
         KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId("X");
 
         Predicate<Integer, String> isEven = new Predicate<Integer, String>()
{
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 9cbc156..fb2afec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -143,7 +143,7 @@ public class KStreamImplTest {
             1 + // to
             2 + // through
             1, // process
-            builder.build("X", null).processors().size());
+            builder.setApplicationId("X").build(null).processors().size());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 6f047b0..fe66acb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -227,17 +227,18 @@ public class TopologyBuilderTest {
     @Test
     public void testAddStateStore() {
         final TopologyBuilder builder = new TopologyBuilder();
-        List<StateStoreSupplier> suppliers;
 
         StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
         builder.addStateStore(supplier);
-        suppliers = builder.build("X", null).stateStoreSuppliers();
-        assertEquals(0, suppliers.size());
+        builder.setApplicationId("X");
+
+        assertEquals(0, builder.build(null).stateStoreSuppliers().size());
 
         builder.addSource("source-1", "topic-1");
         builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
         builder.connectProcessorAndStateStores("processor-1", "store-1");
-        suppliers = builder.build("X", null).stateStoreSuppliers();
+
+        List<StateStoreSupplier> suppliers = builder.build(null).stateStoreSuppliers();
         assertEquals(1, suppliers.size());
         assertEquals(supplier.name(), suppliers.get(0).name());
     }
@@ -245,7 +246,8 @@ public class TopologyBuilderTest {
     @Test
     public void testTopicGroups() {
         final TopologyBuilder builder = new TopologyBuilder();
-
+        builder.setApplicationId("X");
+        builder.addInternalTopic("topic-1x");
         builder.addSource("source-1", "topic-1", "topic-1x");
         builder.addSource("source-2", "topic-2");
         builder.addSource("source-3", "topic-3");
@@ -262,7 +264,7 @@ public class TopologyBuilderTest {
         Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1",
"topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+        expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1",
"X-topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
         expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3",
"topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet()));
         expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"),
Collections.<String>emptySet(), Collections.<String>emptySet()));
 
@@ -271,7 +273,7 @@ public class TopologyBuilderTest {
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
-        assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
+        assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
     }
 
     @Test
@@ -322,9 +324,10 @@ public class TopologyBuilderTest {
         builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        ProcessorTopology topology0 = builder.build("X", 0);
-        ProcessorTopology topology1 = builder.build("X", 1);
-        ProcessorTopology topology2 = builder.build("X", 2);
+        builder.setApplicationId("X");
+        ProcessorTopology topology0 = builder.build(0);
+        ProcessorTopology topology1 = builder.build(1);
+        ProcessorTopology topology2 = builder.build(2);
 
         assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
         assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
@@ -379,12 +382,6 @@ public class TopologyBuilderTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullApplicationIdOnBuild() throws Exception {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.build(null, 1);
-    }
-
-    @Test(expected = NullPointerException.class)
     public void shouldNotSetApplicationIdToNull() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId(null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index d08780b..f7ef7f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -87,7 +87,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testTopologyMetadata() {
-        final TopologyBuilder builder = new TopologyBuilder();
+        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
 
         builder.addSource("source-1", "topic-1");
         builder.addSource("source-2", "topic-2", "topic-3");
@@ -96,7 +96,7 @@ public class ProcessorTopologyTest {
         builder.addSink("sink-1", "topic-3", "processor-1");
         builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
-        final ProcessorTopology topology = builder.build("X", null);
+        final ProcessorTopology topology = builder.build(null);
 
         assertEquals(6, topology.processors().size());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1a66d32..1da7592 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -153,7 +153,7 @@ public class StreamThreadTest {
     public void testPartitionAssignmentChange() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
 
-        TopologyBuilder builder = new TopologyBuilder();
+        TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addSource("source3", "topic3");
@@ -162,7 +162,7 @@ public class StreamThreadTest {
         StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(),
applicationId, clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder))
{
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition>
partitionsForTask) {
-                ProcessorTopology topology = builder.build("X", id.topicGroupId);
+                ProcessorTopology topology = builder.build(id.topicGroupId);
                 return new TestStreamTask(id, applicationId, partitionsForTask, topology,
consumer, producer, restoreConsumer, config, stateDirectory);
             }
         };
@@ -275,7 +275,7 @@ public class StreamThreadTest {
 
             MockTime mockTime = new MockTime();
 
-            TopologyBuilder builder = new TopologyBuilder();
+            TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
 
             StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(),
applicationId, clientId,  processId, new Metrics(), mockTime, new StreamsMetadataState(builder))
{
@@ -286,7 +286,7 @@ public class StreamThreadTest {
 
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition>
partitionsForTask) {
-                    ProcessorTopology topology = builder.build("X", id.topicGroupId);
+                    ProcessorTopology topology = builder.build(id.topicGroupId);
                     return new TestStreamTask(id, applicationId, partitionsForTask, topology,
consumer, producer, restoreConsumer, config, stateDirectory);
                 }
             };
@@ -394,7 +394,7 @@ public class StreamThreadTest {
 
             MockTime mockTime = new MockTime();
 
-            TopologyBuilder builder = new TopologyBuilder();
+            TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
 
             StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(),
applicationId, clientId,  processId, new Metrics(), mockTime, new StreamsMetadataState(builder))
{
@@ -405,7 +405,7 @@ public class StreamThreadTest {
 
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition>
partitionsForTask) {
-                    ProcessorTopology topology = builder.build("X", id.topicGroupId);
+                    ProcessorTopology topology = builder.build(id.topicGroupId);
                     return new TestStreamTask(id, applicationId, partitionsForTask, topology,
consumer, producer, restoreConsumer, config, stateDirectory);
                 }
             };
@@ -465,7 +465,7 @@ public class StreamThreadTest {
 
     @Test
     public void testInjectClients() {
-        TopologyBuilder builder = new TopologyBuilder();
+        TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
         StreamsConfig config = new StreamsConfig(configProps());
         MockClientSupplier clientSupplier = new MockClientSupplier();
         StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 1baedbb..795d7da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -92,7 +92,8 @@ public class StreamThreadStateStoreProviderTest {
         configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog");
         configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog");
 
-        final ProcessorTopology topology = builder.build("X", null);
+        builder.setApplicationId(applicationId);
+        final ProcessorTopology topology = builder.build(null);
         final Map<TaskId, StreamTask> tasks = new HashMap<>();
         stateDirectory = new StateDirectory(applicationId, stateConfigDir);
         taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 7316804..3901d3a 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -57,7 +57,8 @@ public class KStreamTestDriver {
                              File stateDir,
                              Serde<?> keySerde,
                              Serde<?> valSerde) {
-        this.topology = builder.build("X", null);
+        builder.setApplicationId("TestDriver");
+        this.topology = builder.build(null);
         this.stateDir = stateDir;
         this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector());
         this.context.setTime(0L);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2bac70a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index d2d9668..6b8d969 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -148,7 +148,7 @@ public class ProcessorTopologyTestDriver {
      */
     public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String...
storeNames) {
         id = new TaskId(0, 0);
-        topology = builder.build("X", null);
+        topology = builder.setApplicationId("ProcessorTopologyTestDriver").build(null);
 
         // Set up the consumer and producer ...
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);


Mime
View raw message