kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4269: Update topic subscription when regex pattern specified out of topicGroups method
Date Thu, 20 Oct 2016 04:04:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 17cb4fe52 -> 4c295a784


KAFKA-4269: Update topic subscription when regex pattern specified out of topicGroups method

Moved the update of regex-matched topic subscriptions out of the topicGroups method.
The topicGroups method is only called from StreamPartitionAssignor when the KafkaStreams
instance is the leader, but the update needs to be executed for all clients.

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #2005 from bbejeck/KAFKA-4269_multiple_kstream_instances_mult_consumers_npe


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c295a78
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c295a78
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c295a78

Branch: refs/heads/trunk
Commit: 4c295a78446be6eba24ca4a9b7e506657e55c875
Parents: 17cb4fe
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Wed Oct 19 21:04:28 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Oct 19 21:04:28 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilder.java      | 27 +++++++++++------
 .../internals/StreamPartitionAssignor.java      | 10 +++++--
 .../streams/processor/TopologyBuilderTest.java  | 31 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4c295a78/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index f5fd571..81f1f63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -101,6 +103,8 @@ public class TopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
+    private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
+
     private static class StateStoreFactory {
         public final Set<String> users;
 
@@ -831,14 +835,6 @@ public class TopologyBuilder {
     public synchronized Map<Integer, TopicsInfo> topicGroups() {
         Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
 
-        if (subscriptionUpdates.hasUpdates()) {
-            for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet())
{
-                SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
-                //need to update nodeToSourceTopics with topics matched from given regex
-                nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
-            }
-        }
-
         if (nodeGroups == null)
             nodeGroups = makeNodeGroups();
 
@@ -897,6 +893,17 @@ public class TopologyBuilder {
         return Collections.unmodifiableMap(topicGroups);
     }
 
+    private void setRegexMatchedTopicsToSourceNodes() {
+        if (subscriptionUpdates.hasUpdates()) {
+            for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet())
{
+                SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+                //need to update nodeToSourceTopics with topics matched from given regex
+                nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
+                log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
+            }
+        }
+    }
+
     private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier supplier,
final String name) {
         if (!(supplier instanceof RocksDBWindowStoreSupplier)) {
             return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
supplier.logConfig());
@@ -999,7 +1006,9 @@ public class TopologyBuilder {
         return this.topicPattern;
     }
 
-    public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates)
{
+    public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates,
String threadId) {
+        log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching
regex subscription(s)", threadId, subscriptionUpdates);
         this.subscriptionUpdates = subscriptionUpdates;
+        setRegexMatchedTopicsToSourceNodes();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c295a78/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 3be9c11..dcba543 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -178,10 +178,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         if (streamThread.builder.sourceTopicPattern() != null) {
             SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
-            log.debug("have {} topics matching regex", topics);
+            log.debug("stream-thread [{}] found {} topics possibly matching regex", streamThread.getName(),
topics);
             // update the topic groups with the returned subscription set for regex pattern
subscriptions
             subscriptionUpdates.updateTopics(topics);
-            streamThread.builder.updateSubscriptions(subscriptionUpdates);
+            streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
         }
 
         return new Subscription(new ArrayList<>(topics), data.encode());
@@ -669,6 +669,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             return !updatedTopicSubscriptions.isEmpty();
         }
 
+        @Override
+        public String toString() {
+            return "SubscriptionUpdates{" +
+                    "updatedTopicSubscriptions=" + updatedTopicSubscriptions +
+                    '}';
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c295a78/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 3f45967..d260937 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -27,12 +27,14 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.junit.Test;
 
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -521,6 +523,7 @@ public class TopologyBuilderTest {
         assertEquals(1, properties.size());
     }
 
+
     @Test(expected = TopologyBuilderException.class)
     public void shouldThroughOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
@@ -583,4 +586,32 @@ public class TopologyBuilderTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source-1", "topic-foo");
+        builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
+        builder.addSource("source-3", Pattern.compile("topic-\\d"));
+
+        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+        Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+        updatedTopicsField.setAccessible(true);
+
+        Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+
+        updatedTopics.add("topic-B");
+        updatedTopics.add("topic-3");
+        updatedTopics.add("topic-A");
+
+        builder.updateSubscriptions(subscriptionUpdates, null);
+        builder.setApplicationId("test-id");
+
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
+        assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
+        assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
+        assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
+
+    }
 }


Mime
View raw message