kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3443: support for adding sources to KafkaStreams via Pattern.
Date Thu, 16 Jun 2016 02:20:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1ef7b494b -> fb42558e2


KAFKA-3443: support for adding sources to KafkaStreams via Pattern.

This PR is the follow on to the closed PR #1410.

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1477 from bbejeck/KAFKA-3443_streams_support_for_regex_sources


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb42558e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb42558e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb42558e

Branch: refs/heads/trunk
Commit: fb42558e2500835722a4e5028896ddae4f407d6f
Parents: 1ef7b49
Author: bbejeck <bbejeck@gmail.com>
Authored: Wed Jun 15 19:20:43 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jun 15 19:20:43 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java |  19 +-
 .../kafka/streams/kstream/KStreamBuilder.java   |  39 ++
 .../streams/processor/TopologyBuilder.java      | 150 +++++++-
 .../internals/StreamPartitionAssignor.java      |  37 +-
 .../processor/internals/StreamThread.java       |  10 +-
 .../integration/RegexSourceIntegrationTest.java | 365 +++++++++++++++++++
 .../utils/EmbeddedSingleNodeKafkaCluster.java   |   7 +-
 .../integration/utils/KafkaEmbedded.java        |  15 +
 .../streams/processor/TopologyBuilderTest.java  |  43 ++-
 9 files changed, 664 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 54b19a3..3934627 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -12,6 +12,13 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -22,13 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * A class encapsulating some of the logic around metadata.
  * <p>
@@ -292,7 +292,10 @@ public final class Metadata {
             unauthorizedTopics.retainAll(this.topics.keySet());
 
             for (String topic : this.topics.keySet()) {
-                partitionInfos.addAll(cluster.partitionsForTopic(topic));
+                List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
+                if (partitionInfoList != null) {
+                    partitionInfos.addAll(partitionInfoList);
+                }
             }
             nodes = cluster.nodes();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 9d90ba0..53b2f4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL
@@ -55,6 +56,22 @@ public class KStreamBuilder extends TopologyBuilder {
         return stream(null, null, topics);
     }
 
+
+    /**
+     * Create a {@link KStream} instance from the specified Pattern.
+     * The default deserializers specified in the config are used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created stream will read data from all of them,
+     * and there is no ordering guarantee between records from different topics
+     *
+     * @param topicPattern    the Pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public <K, V> KStream<K, V> stream(Pattern topicPattern) {
+        return stream(null, null, topicPattern);
+    }
+
+
     /**
      * Create a {@link KStream} instance from the specified topics.
      * <p>
@@ -75,6 +92,28 @@ public class KStreamBuilder extends TopologyBuilder {
         return new KStreamImpl<>(this, name, Collections.singleton(name));
     }
 
+
+    /**
+     * Create a {@link KStream} instance from the specified Pattern.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created stream will read data from all of them,
+     * and there is no ordering guarantee between records from different topics.
+     *
+     * @param keySerde  key serde used to read this source {@link KStream},
+     *                  if not specified the default serde defined in the configs will be used
+     * @param valSerde  value serde used to read this source {@link KStream},
+     *                  if not specified the default serde defined in the configs will be used
+     * @param topicPattern    the Pattern to match for topic names
+     * @return a {@link KStream} for the specified topics
+     */
+    public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, Pattern topicPattern) {
+        String name = newName(KStreamImpl.SOURCE_NAME);
+
+        addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
+
+        return new KStreamImpl<>(this, name, Collections.singleton(name));
+    }
+
     /**
      * Create a {@link KTable} instance for the specified topic.
      * The default deserializers specified in the config are used.

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 5425149..1743baf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.QuickUnion;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -39,6 +40,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 /**
  * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
@@ -62,8 +64,14 @@ public class TopologyBuilder {
     private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
     private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
     private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>();
+    private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
+    private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
     private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+    private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
     private Map<Integer, Set<String>> nodeGroups = null;
+    private Pattern topicPattern;
+
+
 
     private static class StateStoreFactory {
         public final Set<String> users;
@@ -110,23 +118,49 @@ public class TopologyBuilder {
         }
     }
 
-    private static class SourceNodeFactory extends NodeFactory {
-        public final String[] topics;
+    private class SourceNodeFactory extends NodeFactory {
+        private final String[] topics;
+        public final Pattern pattern;
         private Deserializer keyDeserializer;
         private Deserializer valDeserializer;
 
-        private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
+        private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer keyDeserializer, Deserializer valDeserializer) {
             super(name);
-            this.topics = topics.clone();
+            this.topics = topics != null ? topics.clone() : null;
+            this.pattern = pattern;
             this.keyDeserializer = keyDeserializer;
             this.valDeserializer = valDeserializer;
         }
 
+        public String[] getTopics() {
+            return topics;
+        }
+
+        public String[] getTopics(Collection<String> subscribedTopics) {
+            List<String> matchedTopics = new ArrayList<>();
+            for (String update : subscribedTopics) {
+                if (this.pattern == topicToPatterns.get(update)) {
+                    matchedTopics.add(update);
+                    //not same pattern instance,but still matches not allowed
+                } else if (topicToPatterns.containsKey(update) && isMatch(update)) {
+                    throw new TopologyBuilderException("Topic " + update + " already matched check for overlapping regex patterns");
+                } else if (isMatch(update)) {
+                    topicToPatterns.put(update, this.pattern);
+                    matchedTopics.add(update);
+                }
+            }
+            return matchedTopics.toArray(new String[matchedTopics.size()]);
+        }
+
         @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build(String applicationId) {
             return new SourceNode(name, keyDeserializer, valDeserializer);
         }
+
+        private boolean isMatch(String topic) {
+            return this.pattern.matcher(topic).matches();
+        }
     }
 
     private class SinkNodeFactory extends NodeFactory {
@@ -193,7 +227,7 @@ public class TopologyBuilder {
     public TopologyBuilder() {}
 
     /**
-     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes.
      * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
      * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
@@ -207,6 +241,23 @@ public class TopologyBuilder {
         return addSource(name, (Deserializer) null, (Deserializer) null, topics);
     }
 
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forward the records to child processor and/or sink nodes.
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addSource(String name, Pattern topicPattern) {
+        return addSource(name, (Deserializer) null, (Deserializer) null, topicPattern);
+    }
+
     /**
      * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
@@ -231,10 +282,16 @@ public class TopologyBuilder {
             if (sourceTopicNames.contains(topic))
                 throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
 
+            for (Pattern pattern : nodeToSourcePatterns.values()) {
+                if (pattern.matcher(topic).matches()) {
+                    throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
+                }
+            }
+
             sourceTopicNames.add(topic);
         }
 
-        nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
+        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
         nodeToSourceTopics.put(name, topics.clone());
         nodeGrouper.add(name);
 
@@ -242,6 +299,49 @@ public class TopologyBuilder {
     }
 
     /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers. The provided
+     * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+     * topics that share the same key-value data format.
+     *
+     * @param name the unique name of the source used to reference this node when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
+     */
+
+    public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
+
+        if (topicPattern == null) {
+            throw new TopologyBuilderException("Pattern can't be null");
+        }
+
+        if (nodeFactories.containsKey(name)) {
+            throw new TopologyBuilderException("Processor " + name + " is already added.");
+        }
+
+        for (String sourceTopicName : sourceTopicNames) {
+            if (topicPattern.matcher(sourceTopicName).matches()) {
+                throw new TopologyBuilderException("Pattern  " + topicPattern + " will match a topic that has already been registered by another source.");
+            }
+        }
+
+        nodeToSourcePatterns.put(name, topicPattern);
+        nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, keyDeserializer, valDeserializer));
+        nodeGrouper.add(name);
+
+        return this;
+    }
+
+    /**
      * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
      * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
      * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
@@ -504,9 +604,19 @@ public class TopologyBuilder {
     public Map<Integer, TopicsInfo> topicGroups(String applicationId) {
         Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
 
+
+        if (subscriptionUpdates.hasUpdates()) {
+            for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
+                SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+                //need to update nodeToSourceTopics with topics matched from given regex
+                nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
+            }
+        }
+
         if (nodeGroups == null)
             nodeGroups = makeNodeGroups();
 
+
         for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
             Set<String> sinkTopics = new HashSet<>();
             Set<String> sourceTopics = new HashSet<>();
@@ -677,7 +787,9 @@ public class TopologyBuilder {
                         }
                     }
                 } else if (factory instanceof SourceNodeFactory) {
-                    for (String topic : ((SourceNodeFactory) factory).topics) {
+                    SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory;
+                    String[] topics = (sourceNodeFactory.pattern != null) ? sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : sourceNodeFactory.getTopics();
+                    for (String topic : topics) {
                         if (internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the application id
                             topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node);
@@ -713,4 +825,28 @@ public class TopologyBuilder {
         }
         return Collections.unmodifiableSet(topics);
     }
+
+    public Pattern sourceTopicPattern() {
+        if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) {
+            StringBuilder builder = new StringBuilder();
+            for (Pattern pattern : nodeToSourcePatterns.values()) {
+                builder.append(pattern.pattern()).append("|");
+            }
+            if (!nodeToSourceTopics.isEmpty()) {
+                for (String[] topics : nodeToSourceTopics.values()) {
+                    for (String topic : topics) {
+                        builder.append(topic).append("|");
+                    }
+                }
+            }
+
+            builder.setLength(builder.length() - 1);
+            this.topicPattern = Pattern.compile(builder.toString());
+        }
+        return this.topicPattern;
+    }
+
+    public void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) {
+        this.subscriptionUpdates = subscriptionUpdates;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 085ff94..adefab9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -118,8 +118,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.partitionAssignor(this);
 
-        this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
-
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
                     (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
@@ -228,12 +226,17 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
         Map<UUID, Set<String>> consumersByClient = new HashMap<>();
         Map<UUID, ClientState<TaskId>> states = new HashMap<>();
-
+        SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
         // decode subscription info
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
             Subscription subscription = entry.getValue();
 
+            if (streamThread.builder.sourceTopicPattern() != null) {
+               // update the topic groups with the returned subscription list for regex pattern subscriptions
+                subscriptionUpdates.updateTopics(subscription.topics());
+            }
+
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
 
             Set<String> consumers = consumersByClient.get(info.processId);
@@ -255,6 +258,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             state.capacity = state.capacity + 1d;
         }
 
+        streamThread.builder.updateSubscriptions(subscriptionUpdates);
+        this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
+
         // ensure the co-partitioning topics within the group have the same number of partitions,
         // and enforce the number of partitions for those internal topics.
         internalSourceTopicToTaskIds = new HashMap<>();
@@ -486,4 +492,29 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
         this.internalTopicManager = internalTopicManager;
     }
+
+    /**
+     * Used to capture subscribed topic via Patterns discovered during the
+     * partition assignment process.
+     */
+    public static  class SubscriptionUpdates {
+
+        private final Set<String> updatedTopicSubscriptions = new HashSet<>();
+
+
+        private  void updateTopics(Collection<String> topicNames) {
+            updatedTopicSubscriptions.clear();
+            updatedTopicSubscriptions.addAll(topicNames);
+        }
+
+        public Collection<String> getUpdates() {
+            return Collections.unmodifiableSet(new HashSet<>(updatedTopicSubscriptions));
+        }
+
+        public boolean hasUpdates() {
+            return !updatedTopicSubscriptions.isEmpty();
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72eeef5..64127a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -62,6 +62,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
 
@@ -78,6 +79,7 @@ public class StreamThread extends Thread {
     protected final StreamsConfig config;
     protected final TopologyBuilder builder;
     protected final Set<String> sourceTopics;
+    protected final Pattern topicPattern;
     protected final Producer<byte[], byte[]> producer;
     protected final Consumer<byte[], byte[]> consumer;
     protected final Consumer<byte[], byte[]> restoreConsumer;
@@ -160,6 +162,7 @@ public class StreamThread extends Thread {
         this.config = config;
         this.builder = builder;
         this.sourceTopics = builder.sourceTopics(applicationId);
+        this.topicPattern = builder.sourceTopicPattern();
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -283,7 +286,12 @@ public class StreamThread extends Thread {
         long lastPoll = 0L;
         boolean requiresPoll = true;
 
-        consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
+        if (topicPattern != null) {
+            consumer.subscribe(topicPattern, rebalanceListener);
+        } else {
+            consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener);
+        }
+
 
         while (stillRunning()) {
             // try to fetch some records if necessary

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
new file mode 100644
index 0000000..7e18cff
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test based on using regex and named topics for creating sources, using
+ * an embedded Kafka cluster.
+ */
+
+public class RegexSourceIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+
+    private static final String TOPIC_1 = "topic-1";
+    private static final String TOPIC_2 = "topic-2";
+    private static final String TOPIC_A = "topic-A";
+    private static final String TOPIC_C = "topic-C";
+    private static final String TOPIC_Y = "topic-Y";
+    private static final String TOPIC_Z = "topic-Z";
+    private static final String FA_TOPIC = "fa";
+    private static final String FOO_TOPIC = "foo";
+
+    private static final int FIRST_UPDATE = 0;
+    private static final int SECOND_UPDATE = 1;
+
+    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+    private Properties streamsConfiguration;
+
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(TOPIC_1);
+        CLUSTER.createTopic(TOPIC_2);
+        CLUSTER.createTopic(TOPIC_A);
+        CLUSTER.createTopic(TOPIC_C);
+        CLUSTER.createTopic(TOPIC_Y);
+        CLUSTER.createTopic(TOPIC_Z);
+        CLUSTER.createTopic(FA_TOPIC);
+        CLUSTER.createTopic(FOO_TOPIC);
+
+    }
+
+    @Before
+    public void setUp() {
+        streamsConfiguration = getStreamsConfig();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Test
+    public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+
+        final Serde<String> stringSerde = Serdes.String();
+
+        StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
+
+        CLUSTER.createTopic("TEST-TOPIC-1");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+
+        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+
+        Field streamThreadsField = streams.getClass().getDeclaredField("threads");
+        streamThreadsField.setAccessible(true);
+        StreamThread[] streamThreads =  (StreamThread[]) streamThreadsField.get(streams);
+        StreamThread originalThread = streamThreads[0];
+
+        TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
+                                           new DefaultKafkaClientSupplier(),
+                                           originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
+
+        streamThreads[0] = testStreamThread;
+        streams.start();
+        testStreamThread.waitUntilTasksUpdated();
+
+        CLUSTER.createTopic("TEST-TOPIC-2");
+
+        testStreamThread.waitUntilTasksUpdated();
+
+        streams.close();
+
+        List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
+        List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+
+        assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
+        assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
+    }
+
+    @Test
+    public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
+
+        final Serde<String> stringSerde = Serdes.String();
+
+        StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
+
+        CLUSTER.createTopic("TEST-TOPIC-A");
+        CLUSTER.createTopic("TEST-TOPIC-B");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
+
+        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+
+        Field streamThreadsField = streams.getClass().getDeclaredField("threads");
+        streamThreadsField.setAccessible(true);
+        StreamThread[] streamThreads =  (StreamThread[]) streamThreadsField.get(streams);
+        StreamThread originalThread = streamThreads[0];
+
+        TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
+                new DefaultKafkaClientSupplier(),
+                originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
+
+        streamThreads[0] = testStreamThread;
+        streams.start();
+
+        testStreamThread.waitUntilTasksUpdated();
+
+        CLUSTER.deleteTopic("TEST-TOPIC-A");
+
+        testStreamThread.waitUntilTasksUpdated();
+
+        streams.close();
+
+        List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
+        List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
+
+        assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
+        assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
+    }
+
+
+    @Test
+    public void testShouldReadFromRegexAndNamedTopics() throws Exception {
+
+        String topic1TestMessage = "topic-1 test";
+        String topic2TestMessage = "topic-2 test";
+        String topicATestMessage = "topic-A test";
+        String topicCTestMessage = "topic-C test";
+        String topicYTestMessage = "topic-Y test";
+        String topicZTestMessage = "topic-Z test";
+
+
+        final Serde<String> stringSerde = Serdes.String();
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
+        KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
+        KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+
+        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        Properties producerConfig = getProducerConfig();
+
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig);
+
+        Properties consumerConfig = getConsumerConfig();
+
+        List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
+        List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
+        List<String> actualValues = new ArrayList<>(6);
+
+        for (KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
+            actualValues.add(receivedKeyValue.value);
+        }
+
+        streams.close();
+        Collections.sort(actualValues);
+        Collections.sort(expectedReceivedValues);
+        assertThat(actualValues, equalTo(expectedReceivedValues));
+    }
+
+    //TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
+    @Test(expected = AssertionError.class)
+    public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
+
+        String fooMessage = "fooMessage";
+        String fMessage = "fMessage";
+
+
+        final Serde<String> stringSerde = Serdes.String();
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+
+        //  overlapping patterns here, no messages should be sent as TopologyBuilderException
+        //  will be thrown when the processor topology is built.
+
+        KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
+        KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
+
+
+        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        Properties producerConfig = getProducerConfig();
+
+        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig);
+
+        Properties consumerConfig = getConsumerConfig();
+
+        try {
+            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
+        } finally {
+            streams.close();
+        }
+
+    }
+
+    private Properties getProducerConfig() {
+        Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return producerConfig;
+    }
+
+    private Properties getStreamsConfig() {
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "regex-source-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+        return streamsConfiguration;
+    }
+
+    private Properties getConsumerConfig() {
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "regex-source-integration-consumer");
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        return consumerConfig;
+    }
+
+    private class TestStreamThread extends StreamThread {
+
+        public Map<Integer, List<String>> assignedTopicPartitions = new HashMap<>();
+        private int index =  0;
+        public volatile boolean streamTaskUpdated = false;
+
+        public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
+            super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time);
+        }
+
+        @Override
+        public StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
+            List<String> assignedTopics = new ArrayList<>();
+            for (TopicPartition partition : partitions) {
+                assignedTopics.add(partition.topic());
+            }
+            Collections.sort(assignedTopics);
+            streamTaskUpdated = true;
+            assignedTopicPartitions.put(index++, assignedTopics);
+            return super.createStreamTask(id, partitions);
+        }
+
+
+        void waitUntilTasksUpdated() {
+            long maxTimeMillis = 30000;
+            long startTime = System.currentTimeMillis();
+            while (!streamTaskUpdated && ((System.currentTimeMillis() - startTime) < maxTimeMillis)) {
+               //empty loop just waiting for update
+            }
+            streamTaskUpdated = false;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
index 34753ae..d3ba065 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
@@ -19,12 +19,12 @@ package org.apache.kafka.streams.integration.utils;
 
 import kafka.server.KafkaConfig$;
 import kafka.zk.EmbeddedZookeeper;
+import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Properties;
-import org.junit.rules.ExternalResource;
 
 /**
  * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
@@ -48,6 +48,7 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
         brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
         brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
+        brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
 
         log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
         broker = new KafkaEmbedded(brokerConfig);
@@ -125,4 +126,8 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
                             Properties topicConfig) {
         broker.createTopic(topic, partitions, replication, topicConfig);
     }
+
+    public void deleteTopic(String topic) {
+        broker.deleteTopic(topic);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 348b46b..43b82d6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -186,4 +186,19 @@ public class KafkaEmbedded {
         AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
         zkClient.close();
     }
+
+    public void deleteTopic(String topic) {
+        log.debug("Deleting topic { name: {} }", topic);
+
+        ZkClient zkClient = new ZkClient(
+                zookeeperConnect(),
+                DEFAULT_ZK_SESSION_TIMEOUT_MS,
+                DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+                ZKStringSerializer$.MODULE$);
+        boolean isSecure = false;
+        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
+        AdminUtils.deleteTopic(zkUtils, topic);
+        zkClient.close();
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 9af313a..28acf09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -18,10 +18,10 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -152,6 +153,46 @@ public class TopologyBuilderTest {
         assertEquals(expected, builder.sourceTopics("X"));
     }
 
+    @Test
+    public void testPatternSourceTopic() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        Pattern expectedPattern = Pattern.compile("topic-\\d");
+        builder.addSource("source-1", expectedPattern);
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+    }
+
+    @Test
+    public void testAddMoreThanOnePatternSourceNode() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        Pattern expectedPattern = Pattern.compile("topics[A-Z]|.*-\\d");
+        builder.addSource("source-1", Pattern.compile("topics[A-Z]"));
+        builder.addSource("source-2", Pattern.compile(".*-\\d"));
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+    }
+
+    @Test
+    public void testSubscribeTopicNameAndPattern() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        Pattern expectedPattern = Pattern.compile(".*-\\d|topic-foo|topic-bar");
+        builder.addSource("source-1", "topic-foo", "topic-bar");
+        builder.addSource("source-2", Pattern.compile(".*-\\d"));
+        assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void testPatternMatchesAlreadyProvidedTopicSource() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source-1", "foo");
+        builder.addSource("source-2", Pattern.compile("f.*"));
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void testNamedTopicMatchesAlreadyProvidedPattern() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source-1", Pattern.compile("f.*"));
+        builder.addSource("source-2", "foo");
+    }
+
     @Test(expected = TopologyBuilderException.class)
     public void testAddStateStoreWithNonExistingProcessor() {
         final TopologyBuilder builder = new TopologyBuilder();


Mime
View raw message