kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4791: unable to add state store with regex matched topics
Date Thu, 30 Mar 2017 22:44:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4e92fd5f7 -> 15e0234a5


KAFKA-4791: unable to add state store with regex matched topics

Fix for adding state stores with regex defined sources

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang

Closes #2618 from bbejeck/KAFKA-4791_unable_to_add_statestore_regex_topics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/15e0234a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/15e0234a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/15e0234a

Branch: refs/heads/trunk
Commit: 15e0234a5f4976facd4cfe61b91cfcdec6f6083c
Parents: 4e92fd5
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Thu Mar 30 15:44:56 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Mar 30 15:44:56 2017 -0700

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilder.java      | 185 ++++++++++++-------
 .../integration/RegexSourceIntegrationTest.java |  30 +++
 .../streams/processor/TopologyBuilderTest.java  |  34 ++++
 3 files changed, 178 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/15e0234a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 99f5d65..7c2ec4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -99,6 +99,10 @@ public class TopologyBuilder {
     // are connected to these state stores
     private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
 
+    // map from state store names to all the regex subscribed topics from source processors that
+    // are connected to these state stores
+    private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>();
+
     // map from state store names to this state store's corresponding changelog topic if possible,
     // this is used in the extended KStreamBuilder.
     private final Map<String, String> storeToChangelogTopic = new HashMap<>();
@@ -174,7 +178,7 @@ public class TopologyBuilder {
 
         private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer<?>
keyDeserializer, Deserializer<?> valDeserializer) {
             super(name);
-            this.topics = topics != null ? Arrays.asList(topics) : null;
+            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
             this.pattern = pattern;
             this.keyDeserializer = keyDeserializer;
             this.valDeserializer = valDeserializer;
@@ -311,7 +315,7 @@ public class TopologyBuilder {
      * @param applicationId the streams applicationId. Should be the same as set by
      * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
      */
-    public synchronized final TopologyBuilder setApplicationId(String applicationId) {
+    public synchronized final TopologyBuilder setApplicationId(final String applicationId)
{
         Objects.requireNonNull(applicationId, "applicationId can't be null");
         this.applicationId = applicationId;
 
@@ -329,7 +333,7 @@ public class TopologyBuilder {
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(String name, String... topics) {
+    public synchronized final TopologyBuilder addSource(final String name, final String...
topics) {
         return addSource(null, name, null, null, topics);
     }
 
@@ -345,7 +349,7 @@ public class TopologyBuilder {
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String
name,  String... topics) {
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,  final String... topics) {
         return addSource(offsetReset, name, null, null, topics);
     }
 
@@ -362,7 +366,7 @@ public class TopologyBuilder {
      * @param topicPattern regular expression pattern to match Kafka topics that this source
is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(String name, Pattern topicPattern)
{
+    public synchronized final TopologyBuilder addSource(final String name, final Pattern
topicPattern) {
         return addSource(null, name, null, null, topicPattern);
     }
 
@@ -379,7 +383,7 @@ public class TopologyBuilder {
      * @param topicPattern regular expression pattern to match Kafka topics that this source
is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String
name,  Pattern topicPattern) {
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,  final Pattern topicPattern) {
         return addSource(offsetReset, name, null, null, topicPattern);
     }
 
@@ -400,7 +404,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already
been registered by another source
      */
-    public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer,
Deserializer valDeserializer, String... topics) {
+    public synchronized final TopologyBuilder addSource(final String name, final Deserializer
keyDeserializer, final Deserializer valDeserializer, final String... topics) {
         return addSource(null, name, keyDeserializer, valDeserializer, topics);
 
     }
@@ -422,7 +426,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already
been registered by another source
      */
-    public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String
name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer,
final String... topics) {
         if (topics.length == 0) {
             throw new TopologyBuilderException("You must provide at least one topic");
         }
@@ -540,7 +544,7 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if processor is already added or if topics have already
been registered by name
      */
 
-    public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer,
Deserializer valDeserializer, Pattern topicPattern) {
+    public synchronized final TopologyBuilder addSource(final String name, final Deserializer
keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) {
         return addSource(null, name,  keyDeserializer, valDeserializer, topicPattern);
 
     }
@@ -566,7 +570,7 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if processor is already added or if topics have already
been registered by name
      */
 
-    public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String
name,  Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,  final Deserializer keyDeserializer, final Deserializer valDeserializer,
final Pattern topicPattern) {
         Objects.requireNonNull(topicPattern, "topicPattern can't be null");
         Objects.requireNonNull(name, "name can't be null");
 
@@ -604,7 +608,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public synchronized final TopologyBuilder addSink(String name, String topic, String...
parentNames) {
+    public synchronized final TopologyBuilder addSink(final String name, final String topic,
final String... parentNames) {
         return addSink(name, topic, null, null, parentNames);
     }
 
@@ -631,7 +635,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public synchronized final TopologyBuilder addSink(String name, String topic, StreamPartitioner
partitioner, String... parentNames) {
+    public synchronized final TopologyBuilder addSink(final String name, final String topic,
final StreamPartitioner partitioner, final String... parentNames) {
         return addSink(name, topic, null, null, partitioner, parentNames);
     }
 
@@ -654,7 +658,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public synchronized final TopologyBuilder addSink(String name, String topic, Serializer
keySerializer, Serializer valSerializer, String... parentNames) {
+    public synchronized final TopologyBuilder addSink(final String name, final String topic,
final Serializer keySerializer, final Serializer valSerializer, final String... parentNames)
{
         return addSink(name, topic, keySerializer, valSerializer, null, parentNames);
     }
 
@@ -679,7 +683,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @throws TopologyBuilderException if parent processor is not added yet, or if this
processor's name is equal to the parent's name
      */
-    public synchronized final <K, V> TopologyBuilder addSink(String name, String topic,
Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<?
super K, ? super V> partitioner, String... parentNames) {
+    public synchronized final <K, V> TopologyBuilder addSink(final String name, final
String topic, final Serializer<K> keySerializer, final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames)
{
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(topic, "topic must not be null");
         if (nodeFactories.containsKey(name))
@@ -713,7 +717,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if parent processor is not added yet, or if this
processor's name is equal to the parent's name
      */
-    public synchronized final TopologyBuilder addProcessor(String name, ProcessorSupplier
supplier, String... parentNames) {
+    public synchronized final TopologyBuilder addProcessor(final String name, final ProcessorSupplier
supplier, final String... parentNames) {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(supplier, "supplier must not be null");
         if (nodeFactories.containsKey(name))
@@ -742,7 +746,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if state store supplier is already added
      */
-    public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier,
String... processorNames) {
+    public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
final String... processorNames) {
         Objects.requireNonNull(supplier, "supplier can't be null");
         if (stateFactories.containsKey(supplier.name())) {
             throw new TopologyBuilderException("StateStore " + supplier.name() + " is already
added.");
@@ -766,7 +770,7 @@ public class TopologyBuilder {
      * @param stateStoreNames the names of state stores that the processor uses
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder connectProcessorAndStateStores(String processorName,
String... stateStoreNames) {
+    public synchronized final TopologyBuilder connectProcessorAndStateStores(final String
processorName, final String... stateStoreNames) {
         Objects.requireNonNull(processorName, "processorName can't be null");
         if (stateStoreNames != null) {
             for (String stateStoreName : stateStoreNames) {
@@ -781,7 +785,7 @@ public class TopologyBuilder {
      * This is used only for KStreamBuilder: when adding a KTable from a source topic,
      * we need to add the topic as the KTable's materialized state store's changelog.
      */
-    protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName,
String topic) {
+    protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String
sourceStoreName, final String topic) {
         if (storeToChangelogTopic.containsKey(sourceStoreName)) {
             throw new TopologyBuilderException("Source store " + sourceStoreName + " is already
added.");
         }
@@ -799,7 +803,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if less than two processors are specified, or if
one of the processors is not added yet
      */
-    public synchronized final TopologyBuilder connectProcessors(String... processorNames)
{
+    public synchronized final TopologyBuilder connectProcessors(final String... processorNames)
{
         if (processorNames.length < 2)
             throw new TopologyBuilderException("At least two processors need to participate
in the connection.");
 
@@ -822,7 +826,7 @@ public class TopologyBuilder {
      * @param topicName the name of the topic
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addInternalTopic(String topicName) {
+    public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
         Objects.requireNonNull(topicName, "topicName can't be null");
         this.internalTopicNames.add(topicName);
 
@@ -835,69 +839,85 @@ public class TopologyBuilder {
      * @param sourceNodes a set of source node names
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder copartitionSources(Collection<String>
sourceNodes) {
+    public synchronized final TopologyBuilder copartitionSources(final Collection<String>
sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
         return this;
     }
 
-    private void connectProcessorAndStateStore(String processorName, String stateStoreName)
{
+    private void connectProcessorAndStateStore(final String processorName, final String stateStoreName)
{
         if (!stateFactories.containsKey(stateStoreName))
             throw new TopologyBuilderException("StateStore " + stateStoreName + " is not
added yet.");
         if (!nodeFactories.containsKey(processorName))
             throw new TopologyBuilderException("Processor " + processorName + " is not added
yet.");
 
-        StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
-        Iterator<String> iter = stateStoreFactory.users.iterator();
+        final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+        final Iterator<String> iter = stateStoreFactory.users.iterator();
         if (iter.hasNext()) {
-            String user = iter.next();
+            final String user = iter.next();
             nodeGrouper.unite(user, processorName);
         }
         stateStoreFactory.users.add(processorName);
 
         NodeFactory nodeFactory = nodeFactories.get(processorName);
         if (nodeFactory instanceof ProcessorNodeFactory) {
-            ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+            final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
             processorNodeFactory.addStateStore(stateStoreName);
-            connectStateStoreNameToSourceTopics(stateStoreName, processorNodeFactory);
+            connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
         } else {
             throw new TopologyBuilderException("cannot connect a state store " + stateStoreName
+ " to a source node or a sink node.");
         }
     }
 
-    private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
-        final Set<String> sourceTopics = new HashSet<>();
+    private Set<SourceNodeFactory> findSourcesForProcessorParents(final String[] parents) {
+        final Set<SourceNodeFactory> sourceNodes = new HashSet<>();
         for (String parent : parents) {
-            NodeFactory nodeFactory = nodeFactories.get(parent);
+            final NodeFactory nodeFactory = nodeFactories.get(parent);
             if (nodeFactory instanceof SourceNodeFactory) {
-                sourceTopics.addAll(((SourceNodeFactory) nodeFactory).topics);
+                sourceNodes.add((SourceNodeFactory) nodeFactory);
             } else if (nodeFactory instanceof ProcessorNodeFactory) {
-                sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
nodeFactory).parents));
+                sourceNodes.addAll(findSourcesForProcessorParents(((ProcessorNodeFactory)
nodeFactory).parents));
             }
         }
-        return sourceTopics;
+        return sourceNodes;
     }
 
-    private void connectStateStoreNameToSourceTopics(final String stateStoreName,
-                                                     final ProcessorNodeFactory processorNodeFactory)
{
+    private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
+                                                              final ProcessorNodeFactory processorNodeFactory) {
 
         // we should never update the mapping from state store names to source topics if the store name already exists
         // in the map; this scenario is possible, for example, that a state store underlying a source KTable is
         // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
-        if (stateStoreNameToSourceTopics.containsKey(stateStoreName)) {
+
+        if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
             return;
         }
 
-        final Set<String> sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents);
-        if (sourceTopics.isEmpty()) {
-            throw new TopologyBuilderException("can't find source topic for state store " +
-                    stateStoreName);
+        final Set<String> sourceTopics = new HashSet<>();
+        final Set<Pattern> sourcePatterns = new HashSet<>();
+        final Set<SourceNodeFactory> sourceNodesForParent = findSourcesForProcessorParents(processorNodeFactory.parents);
+
+        for (SourceNodeFactory sourceNodeFactory : sourceNodesForParent) {
+            if (sourceNodeFactory.pattern != null) {
+                sourcePatterns.add(sourceNodeFactory.pattern);
+            } else {
+                sourceTopics.addAll(sourceNodeFactory.topics);
+            }
+        }
+        
+        if (!sourceTopics.isEmpty()) {
+            stateStoreNameToSourceTopics.put(stateStoreName,
+                    Collections.unmodifiableSet(sourceTopics));
+        }
+
+        if (!sourcePatterns.isEmpty()) {
+            stateStoreNameToSourceRegex.put(stateStoreName,
+                    Collections.unmodifiableSet(sourcePatterns));
         }
-        stateStoreNameToSourceTopics.put(stateStoreName,
-                Collections.unmodifiableSet(sourceTopics));
+
     }
 
 
-    private <T> void maybeAddToResetList(Collection<T> earliestResets, Collection<T>
latestResets, AutoOffsetReset offsetReset, T item) {
+    private <T> void maybeAddToResetList(final Collection<T> earliestResets,
final Collection<T> latestResets, final AutoOffsetReset offsetReset, final T item) {
         if (offsetReset != null) {
             switch (offsetReset) {
                 case EARLIEST:
@@ -925,8 +945,8 @@ public class TopologyBuilder {
     }
 
     private Map<Integer, Set<String>> makeNodeGroups() {
-        HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
-        HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
+        final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
+        final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
 
         int nodeGroupId = 0;
 
@@ -935,7 +955,7 @@ public class TopologyBuilder {
         allSourceNodes.addAll(nodeToSourcePatterns.keySet());
 
         for (String nodeName : Utils.sorted(allSourceNodes)) {
-            String root = nodeGrouper.root(nodeName);
+            final String root = nodeGrouper.root(nodeName);
             Set<String> nodeGroup = rootToNodeGroup.get(root);
             if (nodeGroup == null) {
                 nodeGroup = new HashSet<>();
@@ -948,7 +968,7 @@ public class TopologyBuilder {
         // Go through non-source nodes
         for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
             if (!nodeToSourceTopics.containsKey(nodeName)) {
-                String root = nodeGrouper.root(nodeName);
+                final String root = nodeGrouper.root(nodeName);
                 Set<String> nodeGroup = rootToNodeGroup.get(root);
                 if (nodeGroup == null) {
                     nodeGroup = new HashSet<>();
@@ -968,7 +988,7 @@ public class TopologyBuilder {
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
-    public synchronized ProcessorTopology build(Integer topicGroupId) {
+    public synchronized ProcessorTopology build(final Integer topicGroupId) {
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -1016,12 +1036,12 @@ public class TopologyBuilder {
         return globalGroups;
     }
 
-    private ProcessorTopology build(Set<String> nodeGroup) {
-        List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
-        Map<String, ProcessorNode> processorMap = new HashMap<>();
-        Map<String, SourceNode> topicSourceMap = new HashMap<>();
-        Map<String, SinkNode> topicSinkMap = new HashMap<>();
-        Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
+    private ProcessorTopology build(final Set<String> nodeGroup) {
+        final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
+        final Map<String, ProcessorNode> processorMap = new HashMap<>();
+        final Map<String, SourceNode> topicSourceMap = new HashMap<>();
+        final Map<String, SinkNode> topicSinkMap = new HashMap<>();
+        final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
 
         // create processor nodes in a topological order ("nodeFactories" is already topologically
sorted)
         for (NodeFactory factory : nodeFactories.values()) {
@@ -1032,7 +1052,7 @@ public class TopologyBuilder {
 
                 if (factory instanceof ProcessorNodeFactory) {
                     for (String parent : ((ProcessorNodeFactory) factory).parents) {
-                        ProcessorNode<?, ?> parentNode = processorMap.get(parent);
+                        final ProcessorNode<?, ?> parentNode = processorMap.get(parent);
                         parentNode.addChild(node);
                     }
                     for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames)
{
@@ -1106,19 +1126,19 @@ public class TopologyBuilder {
      * @return groups of topic names
      */
     public synchronized Map<Integer, TopicsInfo> topicGroups() {
-        Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
+        final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
 
         if (nodeGroups == null)
             nodeGroups = makeNodeGroups();
 
         for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
-            Set<String> sinkTopics = new HashSet<>();
-            Set<String> sourceTopics = new HashSet<>();
-            Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
-            Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
+            final Set<String> sinkTopics = new HashSet<>();
+            final Set<String> sourceTopics = new HashSet<>();
+            final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
+            final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
             for (String node : entry.getValue()) {
                 // if the node is a source node, add to the source topics
-                List<String> topics = nodeToSourceTopics.get(node);
+                final List<String> topics = nodeToSourceTopics.get(node);
                 if (topics != null) {
                     // if some of the topics are internal, add them to the internal topics
                     for (String topic : topics) {
@@ -1128,7 +1148,7 @@ public class TopologyBuilder {
                         }
                         if (this.internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the application id
-                            String internalTopic = decorateTopic(topic);
+                            final String internalTopic = decorateTopic(topic);
                             internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic,
                                                                                         
   Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
                                                                                         
   Collections.<String, String>emptyMap()));
@@ -1140,7 +1160,7 @@ public class TopologyBuilder {
                 }
 
                 // if the node is a sink node, add to the sink topics
-                String topic = nodeToSinkTopic.get(node);
+                final String topic = nodeToSinkTopic.get(node);
                 if (topic != null) {
                     if (internalTopicNames.contains(topic)) {
                         // prefix the change log topic name with the application id
@@ -1175,7 +1195,7 @@ public class TopologyBuilder {
     private void setRegexMatchedTopicsToSourceNodes() {
         if (subscriptionUpdates.hasUpdates()) {
             for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet())
{
-                SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+                final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
                 //need to update nodeToSourceTopics with topics matched from given regex
                 nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
                 log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
@@ -1183,6 +1203,28 @@ public class TopologyBuilder {
         }
     }
 
+    private void setRegexMatchedTopicToStateStore() {
+        if (subscriptionUpdates.hasUpdates()) {
+            for (Map.Entry<String, Set<Pattern>> storePattern : stateStoreNameToSourceRegex.entrySet()) {
+                final Set<String> updatedTopicsForStateStore = new HashSet<>();
+                for (String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) {
+                    for (Pattern pattern : storePattern.getValue()) {
+                        if (pattern.matcher(subscriptionUpdateTopic).matches()) {
+                            updatedTopicsForStateStore.add(subscriptionUpdateTopic);
+                        }
+                    }
+                }
+                if (!updatedTopicsForStateStore.isEmpty()) {
+                    Collection<String> storeTopics = stateStoreNameToSourceTopics.get(storePattern.getKey());
+                    if (storeTopics != null) {
+                        updatedTopicsForStateStore.addAll(storeTopics);
+                    }
+                    stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore));
+                }
+            }
+        }
+    }
+    
     private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?>
supplier, final String name) {
         if (!(supplier instanceof WindowStoreSupplier)) {
             return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
supplier.logConfig());
@@ -1223,7 +1265,7 @@ public class TopologyBuilder {
         return  latestPattern;
     }
 
-    private void ensureNoRegexOverlap(Pattern builtPattern, Set<Pattern> otherPatterns,
Set<String> otherTopics) {
+    private void ensureNoRegexOverlap(final Pattern builtPattern, final Set<Pattern>
otherPatterns, final Set<String> otherTopics) {
 
         for (Pattern otherPattern : otherPatterns) {
             if (builtPattern.pattern().contains(otherPattern.pattern())) {
@@ -1246,8 +1288,8 @@ public class TopologyBuilder {
      * @param sourcePatterns Patterns for matching source topics to add to a composite pattern
      * @return a Pattern that is composed of the literal source topic names and any Patterns
for matching source topics
      */
-    private static synchronized Pattern buildPatternForOffsetResetTopics(Collection<String>
sourceTopics, Collection<Pattern> sourcePatterns) {
-        StringBuilder builder = new StringBuilder();
+    private static synchronized Pattern buildPatternForOffsetResetTopics(final Collection<String>
sourceTopics, final Collection<Pattern> sourcePatterns) {
+        final StringBuilder builder = new StringBuilder();
 
         for (String topic : sourceTopics) {
             builder.append(topic).append("|");
@@ -1283,7 +1325,7 @@ public class TopologyBuilder {
      * @return groups of topic names
      */
     public synchronized Collection<Set<String>> copartitionGroups() {
-        List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
+        final List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
         for (Set<String> nodeNames : copartitionSourceGroups) {
             Set<String> copartitionGroup = new HashSet<>();
             for (String node : nodeNames) {
@@ -1308,7 +1350,7 @@ public class TopologyBuilder {
         return decoratedTopics;
     }
 
-    private String decorateTopic(String topic) {
+    private String decorateTopic(final String topic) {
         if (applicationId == null) {
             throw new TopologyBuilderException("there are internal topics and "
                     + "applicationId hasn't been set. Call "
@@ -1320,7 +1362,7 @@ public class TopologyBuilder {
 
     public synchronized Pattern sourceTopicPattern() {
         if (this.topicPattern == null) {
-            List<String> allSourceTopics = new ArrayList<>();
+            final List<String> allSourceTopics = new ArrayList<>();
             if (!nodeToSourceTopics.isEmpty()) {
                 for (List<String> topics : nodeToSourceTopics.values()) {
                     allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics));
@@ -1334,9 +1376,10 @@ public class TopologyBuilder {
         return this.topicPattern;
     }
 
-    public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String threadId) {
+    public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates, final String threadId) {
         log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates);
         this.subscriptionUpdates = subscriptionUpdates;
         setRegexMatchedTopicsToSourceNodes();
+        setRegexMatchedTopicToStateStore();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/15e0234a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index a84a208..b671c4e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -32,12 +32,15 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
@@ -55,11 +58,13 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -230,6 +235,31 @@ public class RegexSourceIntegrationTest {
         streams.close();
     }
 
+    @Test
+    public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
+
+        ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
+
+        TopologyBuilder builder = new TopologyBuilder()
+                .addSource("ingest", Pattern.compile("topic-\\d+"))
+                .addProcessor("my-processor", processorSupplier, "ingest")
+                .addStateStore(stateStoreSupplier, "my-processor");
+
+
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("message for test"), producerConfig, mockTime);
+        streams.close();
+
+        Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
+
+        assertThat(stateStoreToSourceTopic.get("testStateStore").get(0), is("topic-1"));
+    }
+
 
     @Test
     public void testShouldReadFromRegexAndNamedTopics() throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/15e0234a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 88a420a..7c8b15f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -51,6 +51,7 @@ import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 public class TopologyBuilderTest {
@@ -697,4 +698,37 @@ public class TopologyBuilderTest {
         assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
 
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
+
+        final TopologyBuilder topologyBuilder = new TopologyBuilder()
+                .addSource("ingest", Pattern.compile("topic-\\d+"))
+                .addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
+                .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
+
+        final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+        final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+        updatedTopicsField.setAccessible(true);
+
+        final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+
+        updatedTopics.add("topic-2");
+        updatedTopics.add("topic-3");
+        updatedTopics.add("topic-A");
+
+        topologyBuilder.updateSubscriptions(subscriptionUpdates, "test-thread");
+        topologyBuilder.setApplicationId("test-app");
+
+        Map<String, List<String>> stateStoreAndTopics = topologyBuilder.stateStoreNameToSourceTopics();
+        List<String> topics = stateStoreAndTopics.get("testStateStore");
+
+        assertTrue("Expected to contain two topics", topics.size() == 2);
+
+        assertTrue(topics.contains("topic-2"));
+        assertTrue(topics.contains("topic-3"));
+        assertFalse(topics.contains("topic-A"));
+    }
+
 }


Mime
View raw message