Repository: kafka
Updated Branches:
refs/heads/0.10.2 2397269c4 -> 4b8c73615
KAFKA-4791: unable to add state store with regex matched topics
Fix for adding state stores with regex-defined sources
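For reference, the topology shape this change enables looks roughly as follows. This is a
sketch only: the topic pattern, processor name, and store name are illustrative, and the
store is built with the public Stores factory rather than the test-only
MockStateStoreSupplier used in the new tests below.

    import java.util.regex.Pattern;

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class RegexStateStoreExample {
        public static void main(final String[] args) {
            // In-memory key-value store created through the public Stores factory.
            final StateStoreSupplier store = Stores.create("testStateStore")
                    .withStringKeys()
                    .withStringValues()
                    .inMemory()
                    .build();

            // No-op processor; it exists only so a state store can be connected to it.
            final ProcessorSupplier<String, String> supplier = new ProcessorSupplier<String, String>() {
                @Override
                public Processor<String, String> get() {
                    return new AbstractProcessor<String, String>() {
                        @Override
                        public void process(final String key, final String value) { }
                    };
                }
            };

            // Before this fix, connecting the store failed at addStateStore() time,
            // because the old code could not resolve source topics for a
            // pattern-defined source node.
            new TopologyBuilder()
                    .addSource("ingest", Pattern.compile("topic-\\d+"))
                    .addProcessor("my-processor", supplier, "ingest")
                    .addStateStore(store, "my-processor");
        }
    }

The regex-subscribed sources are now recorded per state store, and the store-to-topic
mapping is filled in once the matched topics are known from the consumer's subscription
(see setRegexMatchedTopicToStateStore() in the diff).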
Author: bbejeck <bbejeck@gmail.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes #2618 from bbejeck/KAFKA-4791_unable_to_add_statestore_regex_topics
(cherry picked from commit 15e0234a5f4976facd4cfe61b91cfcdec6f6083c)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4b8c7361
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4b8c7361
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4b8c7361
Branch: refs/heads/0.10.2
Commit: 4b8c73615e778c244af8394dfbd5befac005c419
Parents: 2397269
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Thu Mar 30 15:44:56 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Mar 30 15:48:07 2017 -0700
----------------------------------------------------------------------
.../streams/processor/TopologyBuilder.java | 185 ++++++++++++-------
.../integration/RegexSourceIntegrationTest.java | 30 +++
.../streams/processor/TopologyBuilderTest.java | 34 ++++
3 files changed, 178 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8c7361/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f4302..92cfda7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -98,6 +98,10 @@ public class TopologyBuilder {
// are connected to these state stores
private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
+ // map from state store names to all the regex subscribed topics from source processors that
+ // are connected to these state stores
+ private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>();
+
// map from state store names to this state store's corresponding changelog topic if possible,
// this is used in the extended KStreamBuilder.
private final Map<String, String> storeToChangelogTopic = new HashMap<>();
@@ -175,7 +179,7 @@ public class TopologyBuilder {
private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) {
super(name);
- this.topics = topics != null ? Arrays.asList(topics) : null;
+ this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
this.pattern = pattern;
this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
@@ -312,7 +316,7 @@ public class TopologyBuilder {
* @param applicationId the streams applicationId. Should be the same as set by
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
*/
- public synchronized final TopologyBuilder setApplicationId(String applicationId) {
+ public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
Objects.requireNonNull(applicationId, "applicationId can't be null");
this.applicationId = applicationId;
@@ -330,7 +334,7 @@ public class TopologyBuilder {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(String name, String... topics) {
+ public synchronized final TopologyBuilder addSource(final String name, final String... topics) {
return addSource(null, name, null, null, topics);
}
@@ -346,7 +350,7 @@ public class TopologyBuilder {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, String... topics) {
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) {
return addSource(offsetReset, name, null, null, topics);
}
@@ -363,7 +367,7 @@ public class TopologyBuilder {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(String name, Pattern topicPattern) {
+ public synchronized final TopologyBuilder addSource(final String name, final Pattern topicPattern) {
return addSource(null, name, null, null, topicPattern);
}
@@ -380,7 +384,7 @@ public class TopologyBuilder {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern) {
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) {
return addSource(offsetReset, name, null, null, topicPattern);
}
@@ -401,7 +405,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
- public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+ public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
return addSource(null, name, keyDeserializer, valDeserializer, topics);
}
@@ -423,7 +427,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
- public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
if (topics.length == 0) {
throw new TopologyBuilderException("You must provide at least one topic");
}
@@ -541,7 +545,7 @@ public class TopologyBuilder {
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
- public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
+ public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) {
return addSource(null, name, keyDeserializer, valDeserializer, topicPattern);
}
@@ -567,7 +571,7 @@ public class TopologyBuilder {
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
*/
- public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
+ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) {
Objects.requireNonNull(topicPattern, "topicPattern can't be null");
Objects.requireNonNull(name, "name can't be null");
@@ -605,7 +609,7 @@ public class TopologyBuilder {
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized final TopologyBuilder addSink(String name, String topic, String... parentNames) {
+ public synchronized final TopologyBuilder addSink(final String name, final String topic, final String... parentNames) {
return addSink(name, topic, null, null, parentNames);
}
@@ -632,7 +636,7 @@ public class TopologyBuilder {
* @see #addSink(String, String, Serializer, Serializer, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
+ public synchronized final TopologyBuilder addSink(final String name, final String topic, final StreamPartitioner partitioner, final String... parentNames) {
return addSink(name, topic, null, null, partitioner, parentNames);
}
@@ -655,7 +659,7 @@ public class TopologyBuilder {
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
*/
- public synchronized final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
+ public synchronized final TopologyBuilder addSink(final String name, final String topic, final Serializer keySerializer, final Serializer valSerializer, final String... parentNames) {
return addSink(name, topic, keySerializer, valSerializer, null, parentNames);
}
@@ -680,7 +684,7 @@ public class TopologyBuilder {
* @see #addSink(String, String, Serializer, Serializer, String...)
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
- public synchronized final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
+ public synchronized final <K, V> TopologyBuilder addSink(final String name, final String topic, final Serializer<K> keySerializer, final Serializer<V> valSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(topic, "topic must not be null");
if (nodeFactories.containsKey(name))
@@ -714,7 +718,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
- public synchronized final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
+ public synchronized final TopologyBuilder addProcessor(final String name, final ProcessorSupplier supplier, final String... parentNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(supplier, "supplier must not be null");
if (nodeFactories.containsKey(name))
@@ -743,7 +747,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if state store supplier is already added
*/
- public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
+ public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier, final String... processorNames) {
Objects.requireNonNull(supplier, "supplier can't be null");
if (stateFactories.containsKey(supplier.name())) {
throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
@@ -767,7 +771,7 @@ public class TopologyBuilder {
* @param stateStoreNames the names of state stores that the processor uses
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
+ public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName, final String... stateStoreNames) {
Objects.requireNonNull(processorName, "processorName can't be null");
if (stateStoreNames != null) {
for (String stateStoreName : stateStoreNames) {
@@ -782,7 +786,7 @@ public class TopologyBuilder {
* This is used only for KStreamBuilder: when adding a KTable from a source topic,
* we need to add the topic as the KTable's materialized state store's changelog.
*/
- protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) {
+ protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName, final String topic) {
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
}
@@ -800,7 +804,7 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
*/
- public synchronized final TopologyBuilder connectProcessors(String... processorNames) {
+ public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
if (processorNames.length < 2)
throw new TopologyBuilderException("At least two processors need to participate in the connection.");
@@ -823,7 +827,7 @@ public class TopologyBuilder {
* @param topicName the name of the topic
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder addInternalTopic(String topicName) {
+ public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
Objects.requireNonNull(topicName, "topicName can't be null");
this.internalTopicNames.add(topicName);
@@ -836,69 +840,85 @@ public class TopologyBuilder {
* @param sourceNodes a set of source node names
* @return this builder instance so methods can be chained together; never null
*/
- public synchronized final TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
+ public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
return this;
}
- private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
+ private void connectProcessorAndStateStore(final String processorName, final String stateStoreName) {
if (!stateFactories.containsKey(stateStoreName))
throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
if (!nodeFactories.containsKey(processorName))
throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
- StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
- Iterator<String> iter = stateStoreFactory.users.iterator();
+ final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+ final Iterator<String> iter = stateStoreFactory.users.iterator();
if (iter.hasNext()) {
- String user = iter.next();
+ final String user = iter.next();
nodeGrouper.unite(user, processorName);
}
stateStoreFactory.users.add(processorName);
NodeFactory nodeFactory = nodeFactories.get(processorName);
if (nodeFactory instanceof ProcessorNodeFactory) {
- ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+ final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
processorNodeFactory.addStateStore(stateStoreName);
- connectStateStoreNameToSourceTopics(stateStoreName, processorNodeFactory);
+ connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
} else {
throw new TopologyBuilderException("cannot connect a state store " + stateStoreName
+ " to a source node or a sink node.");
}
}
- private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
- final Set<String> sourceTopics = new HashSet<>();
+ private Set<SourceNodeFactory> findSourcesForProcessorParents(final String[] parents) {
+ final Set<SourceNodeFactory> sourceNodes = new HashSet<>();
for (String parent : parents) {
- NodeFactory nodeFactory = nodeFactories.get(parent);
+ final NodeFactory nodeFactory = nodeFactories.get(parent);
if (nodeFactory instanceof SourceNodeFactory) {
- sourceTopics.addAll(((SourceNodeFactory) nodeFactory).topics);
+ sourceNodes.add((SourceNodeFactory) nodeFactory);
} else if (nodeFactory instanceof ProcessorNodeFactory) {
- sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
+ sourceNodes.addAll(findSourcesForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
}
}
- return sourceTopics;
+ return sourceNodes;
}
- private void connectStateStoreNameToSourceTopics(final String stateStoreName,
- final ProcessorNodeFactory processorNodeFactory) {
+ private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName,
+ final ProcessorNodeFactory processorNodeFactory) {
// we should never update the mapping from state store names to source topics if the store name already exists
// in the map; this scenario is possible, for example, that a state store underlying a source KTable is
// connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
- if (stateStoreNameToSourceTopics.containsKey(stateStoreName)) {
+
+ if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
return;
}
- final Set<String> sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents);
- if (sourceTopics.isEmpty()) {
- throw new TopologyBuilderException("can't find source topic for state store "
+
- stateStoreName);
+ final Set<String> sourceTopics = new HashSet<>();
+ final Set<Pattern> sourcePatterns = new HashSet<>();
+ final Set<SourceNodeFactory> sourceNodesForParent = findSourcesForProcessorParents(processorNodeFactory.parents);
+
+ for (SourceNodeFactory sourceNodeFactory : sourceNodesForParent) {
+ if (sourceNodeFactory.pattern != null) {
+ sourcePatterns.add(sourceNodeFactory.pattern);
+ } else {
+ sourceTopics.addAll(sourceNodeFactory.topics);
+ }
+ }
+
+ if (!sourceTopics.isEmpty()) {
+ stateStoreNameToSourceTopics.put(stateStoreName,
+ Collections.unmodifiableSet(sourceTopics));
+ }
+
+ if (!sourcePatterns.isEmpty()) {
+ stateStoreNameToSourceRegex.put(stateStoreName,
+ Collections.unmodifiableSet(sourcePatterns));
}
- stateStoreNameToSourceTopics.put(stateStoreName,
- Collections.unmodifiableSet(sourceTopics));
+
}
- private <T> void maybeAddToResetList(Collection<T> earliestResets, Collection<T> latestResets, AutoOffsetReset offsetReset, T item) {
+ private <T> void maybeAddToResetList(final Collection<T> earliestResets, final Collection<T> latestResets, final AutoOffsetReset offsetReset, final T item) {
if (offsetReset != null) {
switch (offsetReset) {
case EARLIEST:
@@ -926,8 +946,8 @@ public class TopologyBuilder {
}
private Map<Integer, Set<String>> makeNodeGroups() {
- HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
- HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
+ final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
+ final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
int nodeGroupId = 0;
@@ -936,7 +956,7 @@ public class TopologyBuilder {
allSourceNodes.addAll(nodeToSourcePatterns.keySet());
for (String nodeName : Utils.sorted(allSourceNodes)) {
- String root = nodeGrouper.root(nodeName);
+ final String root = nodeGrouper.root(nodeName);
Set<String> nodeGroup = rootToNodeGroup.get(root);
if (nodeGroup == null) {
nodeGroup = new HashSet<>();
@@ -949,7 +969,7 @@ public class TopologyBuilder {
// Go through non-source nodes
for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
if (!nodeToSourceTopics.containsKey(nodeName)) {
- String root = nodeGrouper.root(nodeName);
+ final String root = nodeGrouper.root(nodeName);
Set<String> nodeGroup = rootToNodeGroup.get(root);
if (nodeGroup == null) {
nodeGroup = new HashSet<>();
@@ -969,7 +989,7 @@ public class TopologyBuilder {
*
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
*/
- public synchronized ProcessorTopology build(Integer topicGroupId) {
+ public synchronized ProcessorTopology build(final Integer topicGroupId) {
Set<String> nodeGroup;
if (topicGroupId != null) {
nodeGroup = nodeGroups().get(topicGroupId);
@@ -1017,12 +1037,12 @@ public class TopologyBuilder {
return globalGroups;
}
- private ProcessorTopology build(Set<String> nodeGroup) {
- List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
- Map<String, ProcessorNode> processorMap = new HashMap<>();
- Map<String, SourceNode> topicSourceMap = new HashMap<>();
- Map<String, SinkNode> topicSinkMap = new HashMap<>();
- Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
+ private ProcessorTopology build(final Set<String> nodeGroup) {
+ final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
+ final Map<String, ProcessorNode> processorMap = new HashMap<>();
+ final Map<String, SourceNode> topicSourceMap = new HashMap<>();
+ final Map<String, SinkNode> topicSinkMap = new HashMap<>();
+ final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
for (NodeFactory factory : nodeFactories.values()) {
@@ -1033,7 +1053,7 @@ public class TopologyBuilder {
if (factory instanceof ProcessorNodeFactory) {
for (String parent : ((ProcessorNodeFactory) factory).parents) {
- ProcessorNode<?, ?> parentNode = processorMap.get(parent);
+ final ProcessorNode<?, ?> parentNode = processorMap.get(parent);
parentNode.addChild(node);
}
for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
@@ -1107,19 +1127,19 @@ public class TopologyBuilder {
* @return groups of topic names
*/
public synchronized Map<Integer, TopicsInfo> topicGroups() {
- Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
+ final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
if (nodeGroups == null)
nodeGroups = makeNodeGroups();
for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
- Set<String> sinkTopics = new HashSet<>();
- Set<String> sourceTopics = new HashSet<>();
- Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
- Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
+ final Set<String> sinkTopics = new HashSet<>();
+ final Set<String> sourceTopics = new HashSet<>();
+ final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>();
+ final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>();
for (String node : entry.getValue()) {
// if the node is a source node, add to the source topics
- List<String> topics = nodeToSourceTopics.get(node);
+ final List<String> topics = nodeToSourceTopics.get(node);
if (topics != null) {
// if some of the topics are internal, add them to the internal topics
for (String topic : topics) {
@@ -1129,7 +1149,7 @@ public class TopologyBuilder {
}
if (this.internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
- String internalTopic = decorateTopic(topic);
+ final String internalTopic = decorateTopic(topic);
internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic,
Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
Collections.<String, String>emptyMap()));
@@ -1141,7 +1161,7 @@ public class TopologyBuilder {
}
// if the node is a sink node, add to the sink topics
- String topic = nodeToSinkTopic.get(node);
+ final String topic = nodeToSinkTopic.get(node);
if (topic != null) {
if (internalTopicNames.contains(topic)) {
// prefix the change log topic name with the application id
@@ -1176,7 +1196,7 @@ public class TopologyBuilder {
private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
- SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+ final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
//need to update nodeToSourceTopics with topics matched from given regex
nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
@@ -1184,6 +1204,28 @@ public class TopologyBuilder {
}
}
+ private void setRegexMatchedTopicToStateStore() {
+ if (subscriptionUpdates.hasUpdates()) {
+ for (Map.Entry<String, Set<Pattern>> storePattern : stateStoreNameToSourceRegex.entrySet()) {
+ final Set<String> updatedTopicsForStateStore = new HashSet<>();
+ for (String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) {
+ for (Pattern pattern : storePattern.getValue()) {
+ if (pattern.matcher(subscriptionUpdateTopic).matches()) {
+ updatedTopicsForStateStore.add(subscriptionUpdateTopic);
+ }
+ }
+ }
+ if (!updatedTopicsForStateStore.isEmpty()) {
+ Collection<String> storeTopics = stateStoreNameToSourceTopics.get(storePattern.getKey());
+ if (storeTopics != null) {
+ updatedTopicsForStateStore.addAll(storeTopics);
+ }
+ stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore));
+ }
+ }
+ }
+ }
+
private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier, final String name) {
if (!(supplier instanceof WindowStoreSupplier)) {
return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
@@ -1224,7 +1266,7 @@ public class TopologyBuilder {
return latestPattern;
}
- private void ensureNoRegexOverlap(Pattern builtPattern, Set<Pattern> otherPatterns, Set<String> otherTopics) {
+ private void ensureNoRegexOverlap(final Pattern builtPattern, final Set<Pattern> otherPatterns, final Set<String> otherTopics) {
for (Pattern otherPattern : otherPatterns) {
if (builtPattern.pattern().contains(otherPattern.pattern())) {
@@ -1247,8 +1289,8 @@ public class TopologyBuilder {
* @param sourcePatterns Patterns for matching source topics to add to a composite pattern
* @return a Pattern that is composed of the literal source topic names and any Patterns for matching source topics
*/
- private static synchronized Pattern buildPatternForOffsetResetTopics(Collection<String> sourceTopics, Collection<Pattern> sourcePatterns) {
- StringBuilder builder = new StringBuilder();
+ private static synchronized Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics, final Collection<Pattern> sourcePatterns) {
+ final StringBuilder builder = new StringBuilder();
+ final StringBuilder builder = new StringBuilder();
for (String topic : sourceTopics) {
builder.append(topic).append("|");
@@ -1284,7 +1326,7 @@ public class TopologyBuilder {
* @return groups of topic names
*/
public synchronized Collection<Set<String>> copartitionGroups() {
- List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
+ final List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
for (Set<String> nodeNames : copartitionSourceGroups) {
Set<String> copartitionGroup = new HashSet<>();
for (String node : nodeNames) {
@@ -1309,7 +1351,7 @@ public class TopologyBuilder {
return decoratedTopics;
}
- private String decorateTopic(String topic) {
+ private String decorateTopic(final String topic) {
if (applicationId == null) {
throw new TopologyBuilderException("there are internal topics and "
+ "applicationId hasn't been set. Call "
@@ -1321,7 +1363,7 @@ public class TopologyBuilder {
public synchronized Pattern sourceTopicPattern() {
if (this.topicPattern == null) {
- List<String> allSourceTopics = new ArrayList<>();
+ final List<String> allSourceTopics = new ArrayList<>();
if (!nodeToSourceTopics.isEmpty()) {
for (List<String> topics : nodeToSourceTopics.values()) {
allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics));
@@ -1335,9 +1377,10 @@ public class TopologyBuilder {
return this.topicPattern;
}
- public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String threadId) {
+ public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates, final String threadId) {
log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates);
this.subscriptionUpdates = subscriptionUpdates;
setRegexMatchedTopicsToSourceNodes();
+ setRegexMatchedTopicToStateStore();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8c7361/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 213fffe..0ea36ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -32,12 +32,15 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -53,11 +56,13 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -227,6 +232,31 @@ public class RegexSourceIntegrationTest {
streams.close();
}
+ @Test
+ public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
+
+ ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
+
+ TopologyBuilder builder = new TopologyBuilder()
+ .addSource("ingest", Pattern.compile("topic-\\d+"))
+ .addProcessor("my-processor", processorSupplier, "ingest")
+ .addStateStore(stateStoreSupplier, "my-processor");
+
+
+ final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ streams.start();
+
+ final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+
+ IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("message for test"), producerConfig, mockTime);
+ streams.close();
+
+ Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics();
+
+ assertThat(stateStoreToSourceTopic.get("testStateStore").get(0), is("topic-1"));
+ }
+
@Test
public void testShouldReadFromRegexAndNamedTopics() throws Exception {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8c7361/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 2f3a450..a4d94a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -50,6 +50,7 @@ import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
public class TopologyBuilderTest {
@@ -610,4 +611,37 @@ public class TopologyBuilderTest {
assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
+
+ final TopologyBuilder topologyBuilder = new TopologyBuilder()
+ .addSource("ingest", Pattern.compile("topic-\\d+"))
+ .addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
+ .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
+
+ final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+ final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+ updatedTopicsField.setAccessible(true);
+
+ final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+
+ updatedTopics.add("topic-2");
+ updatedTopics.add("topic-3");
+ updatedTopics.add("topic-A");
+
+ topologyBuilder.updateSubscriptions(subscriptionUpdates, "test-thread");
+ topologyBuilder.setApplicationId("test-app");
+
+ Map<String, List<String>> stateStoreAndTopics = topologyBuilder.stateStoreNameToSourceTopics();
+ List<String> topics = stateStoreAndTopics.get("testStateStore");
+
+ assertTrue("Expected to contain two topics", topics.size() == 2);
+
+ assertTrue(topics.contains("topic-2"));
+ assertTrue(topics.contains("topic-3"));
+ assertFalse(topics.contains("topic-A"));
+ }
+
}