kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9020: Streams sub-topologies should be sorted by sink -> source relationship (#7495)
Date Mon, 14 Oct 2019 20:27:22 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 68d6d99  KAFKA-9020: Streams sub-topologies should be sorted by sink -> source relationship (#7495)
68d6d99 is described below

commit 68d6d99f62541ea77855e153258b5158afdf2dee
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Mon Oct 14 13:15:03 2019 -0700

    KAFKA-9020: Streams sub-topologies should be sorted by sink -> source relationship (#7495)
    
    Subtopologies are currently ordered alphabetically by source node name, which prior to KIP-307 happened to always result in the "correct" (i.e. topological) order. Now that users may name their nodes anything they want, we must explicitly order them so that upstream node groups/subtopologies come first and the downstream ones come after.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
---
 .../internals/InternalTopologyBuilder.java         |  22 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |   8 +-
 .../kstream/RepartitionTopicNamingTest.java        |   2 +-
 .../internals/RepartitionOptimizingTest.java       | 347 +++++++++++----------
 .../RepartitionWithMergeOptimizingTest.java        | 145 +++++----
 .../apache/kafka/streams/TopologyTestDriver.java   |   4 +-
 6 files changed, 259 insertions(+), 269 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index d34ff00..5754b4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -85,7 +84,7 @@ public class InternalTopologyBuilder {
     // map from source processor names to regex subscription patterns
     private final Map<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
 
-    // map from sink processor names to subscribed topic (without application-id prefix for internal topics)
+    // map from sink processor names to sink topic (without application-id prefix for internal topics)
     private final Map<String, String> nodeToSinkTopic = new HashMap<>();
 
     // map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node
@@ -749,27 +748,18 @@ public class InternalTopologyBuilder {
         return nodeGroups;
     }
 
+    // Order node groups by their position in the actual topology, ie upstream subtopologies come before downstream
     private Map<Integer, Set<String>> makeNodeGroups() {
         final Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
         final Map<String, Set<String>> rootToNodeGroup = new HashMap<>();
 
         int nodeGroupId = 0;
 
-        // Go through source nodes first. This makes the group id assignment easy to predict in tests
-        final Set<String> allSourceNodes = new HashSet<>(nodeToSourceTopics.keySet());
-        allSourceNodes.addAll(nodeToSourcePatterns.keySet());
-
-        for (final String nodeName : Utils.sorted(allSourceNodes)) {
+        // Traverse in topological order
+        for (final String nodeName : nodeFactories.keySet()) {
             nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup);
         }
 
-        // Go through non-source nodes
-        for (final String nodeName : Utils.sorted(nodeFactories.keySet())) {
-            if (!nodeToSourceTopics.containsKey(nodeName)) {
-                nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup);
-            }
-        }
-
         return nodeGroups;
     }
 
@@ -1905,11 +1895,11 @@ public class InternalTopologyBuilder {
 
     // following functions are for test only
 
-    public synchronized Set<String> getSourceTopicNames() {
+    public synchronized Set<String> sourceTopicNames() {
         return sourceTopicNames;
     }
 
-    public synchronized Map<String, StateStoreFactory> getStateStores() {
+    public synchronized Map<String, StateStoreFactory> stateStores() {
         return stateFactories;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 4118297..0cce24f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -385,10 +385,10 @@ public class StreamsBuilderTest {
             internalTopologyBuilder.build().storeToChangelogTopic(),
             equalTo(Collections.singletonMap("store", "topic")));
         assertThat(
-            internalTopologyBuilder.getStateStores().keySet(),
+            internalTopologyBuilder.stateStores().keySet(),
             equalTo(Collections.singleton("store")));
         assertThat(
-            internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
+            internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
             equalTo(false));
         assertThat(
             internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
@@ -407,10 +407,10 @@ public class StreamsBuilderTest {
             internalTopologyBuilder.build().storeToChangelogTopic(),
             equalTo(Collections.singletonMap("store", "appId-store-changelog")));
         assertThat(
-            internalTopologyBuilder.getStateStores().keySet(),
+            internalTopologyBuilder.stateStores().keySet(),
             equalTo(Collections.singleton("store")));
         assertThat(
-            internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
+            internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
             equalTo(true));
         assertThat(
             internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 0f3e33d..cf8d6b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -603,7 +603,7 @@ public class RepartitionTopicNamingTest {
             "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
             "      --> KSTREAM-MAP-0000000001\n" +
             "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n" +
-            "      --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n" +
+            "      --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n" +
             "      <-- KSTREAM-SOURCE-0000000000\n" +
             "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" +
             "      --> KSTREAM-PEEK-0000000021\n" +
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index 8425ad7..cf46af6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -279,182 +279,183 @@ public class RepartitionOptimizingTest {
     }
 
     private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
-                                                              + "   Sub-topology: 0\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000036 (topics: [count-groupByKey-repartition])\n"
-                                                              + "      --> aggregate, count, join-filter, reduce-filter\n"
-                                                              + "    Processor: count (stores: [count-store])\n"
-                                                              + "      --> count-toStream\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000036\n"
-                                                              + "    Processor: count-toStream (stores: [])\n"
-                                                              + "      --> join-other-windowed, count-to\n"
-                                                              + "      <-- count\n"
-                                                              + "    Processor: join-filter (stores: [])\n"
-                                                              + "      --> join-this-windowed\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000036\n"
-                                                              + "    Processor: reduce-filter (stores: [])\n"
-                                                              + "      --> reduce-peek\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000036\n"
-                                                              + "    Processor: join-other-windowed (stores: [other-join-store])\n"
-                                                              + "      --> join-other-join\n"
-                                                              + "      <-- count-toStream\n"
-                                                              + "    Processor: join-this-windowed (stores: [join-store])\n"
-                                                              + "      --> join-this-join\n"
-                                                              + "      <-- join-filter\n"
-                                                              + "    Processor: reduce-peek (stores: [])\n"
-                                                              + "      --> reducer\n"
-                                                              + "      <-- reduce-filter\n"
-                                                              + "    Processor: aggregate (stores: [aggregate-store])\n"
-                                                              + "      --> aggregate-toStream\n"
-                                                              + "      <-- KSTREAM-SOURCE-0000000036\n"
-                                                              + "    Processor: join-other-join (stores: [join-store])\n"
-                                                              + "      --> join-merge\n"
-                                                              + "      <-- join-other-windowed\n"
-                                                              + "    Processor: join-this-join (stores: [other-join-store])\n"
-                                                              + "      --> join-merge\n"
-                                                              + "      <-- join-this-windowed\n"
-                                                              + "    Processor: reducer (stores: [reduce-store])\n"
-                                                              + "      --> reduce-toStream\n"
-                                                              + "      <-- reduce-peek\n"
-                                                              + "    Processor: aggregate-toStream (stores: [])\n"
-                                                              + "      --> reduce-to\n"
-                                                              + "      <-- aggregate\n"
-                                                              + "    Processor: join-merge (stores: [])\n"
-                                                              + "      --> join-to\n"
-                                                              + "      <-- join-this-join, join-other-join\n"
-                                                              + "    Processor: reduce-toStream (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000023\n"
-                                                              + "      <-- reducer\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
-                                                              + "      <-- reduce-toStream\n"
-                                                              + "    Sink: count-to (topic: outputTopic_0)\n"
-                                                              + "      <-- count-toStream\n"
-                                                              + "    Sink: join-to (topic: joinedOutputTopic)\n"
-                                                              + "      <-- join-merge\n"
-                                                              + "    Sink: reduce-to (topic: outputTopic_1)\n"
-                                                              + "      <-- aggregate-toStream\n"
-                                                              + "\n"
-                                                              + "  Sub-topology: 1\n"
-                                                              + "    Source: sourceStream (topics: [input])\n"
-                                                              + "      --> source-map\n"
-                                                              + "    Processor: source-map (stores: [])\n"
-                                                              + "      --> process-filter, KSTREAM-FILTER-0000000035\n"
-                                                              + "      <-- sourceStream\n"
-                                                              + "    Processor: process-filter (stores: [])\n"
-                                                              + "      --> process-mapValues\n"
-                                                              + "      <-- source-map\n"
-                                                              + "    Processor: KSTREAM-FILTER-0000000035 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000034\n"
-                                                              + "      <-- source-map\n"
-                                                              + "    Processor: process-mapValues (stores: [])\n"
-                                                              + "      --> process\n"
-                                                              + "      <-- process-filter\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000034 (topic: count-groupByKey-repartition)\n"
-                                                              + "      <-- KSTREAM-FILTER-0000000035\n"
-                                                              + "    Processor: process (stores: [])\n"
-                                                              + "      --> none\n"
-                                                              + "      <-- process-mapValues\n\n";
+                                                                  + "   Sub-topology: 0\n"
+                                                                  + "    Source: sourceStream (topics: [input])\n"
+                                                                  + "      --> source-map\n"
+                                                                  + "    Processor: source-map (stores: [])\n"
+                                                                  + "      --> process-filter, KSTREAM-FILTER-0000000035\n"
+                                                                  + "      <-- sourceStream\n"
+                                                                  + "    Processor: process-filter (stores: [])\n"
+                                                                  + "      --> process-mapValues\n"
+                                                                  + "      <-- source-map\n"
+                                                                  + "    Processor: KSTREAM-FILTER-0000000035 (stores: [])\n"
+                                                                  + "      --> KSTREAM-SINK-0000000034\n"
+                                                                  + "      <-- source-map\n"
+                                                                  + "    Processor: process-mapValues (stores: [])\n"
+                                                                  + "      --> process\n"
+                                                                  + "      <-- process-filter\n"
+                                                                  + "    Sink: KSTREAM-SINK-0000000034 (topic: count-groupByKey-repartition)\n"
+                                                                  + "      <-- KSTREAM-FILTER-0000000035\n"
+                                                                  + "    Processor: process (stores: [])\n"
+                                                                  + "      --> none\n"
+                                                                  + "      <-- process-mapValues\n"
+                                                                  + "\n"
+                                                                  + "  Sub-topology: 1\n"
+                                                                  + "    Source: KSTREAM-SOURCE-0000000036 (topics: [count-groupByKey-repartition])\n"
+                                                                  + "      --> aggregate, count, join-filter, reduce-filter\n"
+                                                                  + "    Processor: count (stores: [count-store])\n"
+                                                                  + "      --> count-toStream\n"
+                                                                  + "      <-- KSTREAM-SOURCE-0000000036\n"
+                                                                  + "    Processor: count-toStream (stores: [])\n"
+                                                                  + "      --> join-other-windowed, count-to\n"
+                                                                  + "      <-- count\n"
+                                                                  + "    Processor: join-filter (stores: [])\n"
+                                                                  + "      --> join-this-windowed\n"
+                                                                  + "      <-- KSTREAM-SOURCE-0000000036\n"
+                                                                  + "    Processor: reduce-filter (stores: [])\n"
+                                                                  + "      --> reduce-peek\n"
+                                                                  + "      <-- KSTREAM-SOURCE-0000000036\n"
+                                                                  + "    Processor: join-other-windowed (stores: [other-join-store])\n"
+                                                                  + "      --> join-other-join\n"
+                                                                  + "      <-- count-toStream\n"
+                                                                  + "    Processor: join-this-windowed (stores: [join-store])\n"
+                                                                  + "      --> join-this-join\n"
+                                                                  + "      <-- join-filter\n"
+                                                                  + "    Processor: reduce-peek (stores: [])\n"
+                                                                  + "      --> reducer\n"
+                                                                  + "      <-- reduce-filter\n"
+                                                                  + "    Processor: aggregate (stores: [aggregate-store])\n"
+                                                                  + "      --> aggregate-toStream\n"
+                                                                  + "      <-- KSTREAM-SOURCE-0000000036\n"
+                                                                  + "    Processor: join-other-join (stores: [join-store])\n"
+                                                                  + "      --> join-merge\n"
+                                                                  + "      <-- join-other-windowed\n"
+                                                                  + "    Processor: join-this-join (stores: [other-join-store])\n"
+                                                                  + "      --> join-merge\n"
+                                                                  + "      <-- join-this-windowed\n"
+                                                                  + "    Processor: reducer (stores: [reduce-store])\n"
+                                                                  + "      --> reduce-toStream\n"
+                                                                  + "      <-- reduce-peek\n"
+                                                                  + "    Processor: aggregate-toStream (stores: [])\n"
+                                                                  + "      --> reduce-to\n"
+                                                                  + "      <-- aggregate\n"
+                                                                  + "    Processor: join-merge (stores: [])\n"
+                                                                  + "      --> join-to\n"
+                                                                  + "      <-- join-this-join, join-other-join\n"
+                                                                  + "    Processor: reduce-toStream (stores: [])\n"
+                                                                  + "      --> KSTREAM-SINK-0000000023\n"
+                                                                  + "      <-- reducer\n"
+                                                                  + "    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
+                                                                  + "      <-- reduce-toStream\n"
+                                                                  + "    Sink: count-to (topic: outputTopic_0)\n"
+                                                                  + "      <-- count-toStream\n"
+                                                                  + "    Sink: join-to (topic: joinedOutputTopic)\n"
+                                                                  + "      <-- join-merge\n"
+                                                                  + "    Sink: reduce-to (topic: outputTopic_1)\n"
+                                                                  + "      <-- aggregate-toStream\n\n";
+
 
 
 
     private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
-                                                                + "   Sub-topology: 0\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000007 (topics: [count-groupByKey-repartition])\n"
-                                                                + "      --> count\n"
-                                                                + "    Processor: count (stores: [count-store])\n"
-                                                                + "      --> count-toStream\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000007\n"
-                                                                + "    Processor: count-toStream (stores: [])\n"
-                                                                + "      --> join-other-windowed, count-to\n"
-                                                                + "      <-- count\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000027 (topics: [join-left-repartition])\n"
-                                                                + "      --> join-this-windowed\n"
-                                                                + "    Processor: join-other-windowed (stores: [other-join-store])\n"
-                                                                + "      --> join-other-join\n"
-                                                                + "      <-- count-toStream\n"
-                                                                + "    Processor: join-this-windowed (stores: [join-store])\n"
-                                                                + "      --> join-this-join\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000027\n"
-                                                                + "    Processor: join-other-join (stores: [join-store])\n"
-                                                                + "      --> join-merge\n"
-                                                                + "      <-- join-other-windowed\n"
-                                                                + "    Processor: join-this-join (stores: [other-join-store])\n"
-                                                                + "      --> join-merge\n"
-                                                                + "      <-- join-this-windowed\n"
-                                                                + "    Processor: join-merge (stores: [])\n"
-                                                                + "      --> join-to\n"
-                                                                + "      <-- join-this-join, join-other-join\n"
-                                                                + "    Sink: count-to (topic: outputTopic_0)\n"
-                                                                + "      <-- count-toStream\n"
-                                                                + "    Sink: join-to (topic: joinedOutputTopic)\n"
-                                                                + "      <-- join-merge\n"
-                                                                + "\n"
-                                                                + "  Sub-topology: 1\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000013 (topics: [aggregate-groupByKey-repartition])\n"
-                                                                + "      --> aggregate\n"
-                                                                + "    Processor: aggregate (stores: [aggregate-store])\n"
-                                                                + "      --> aggregate-toStream\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000013\n"
-                                                                + "    Processor: aggregate-toStream (stores: [])\n"
-                                                                + "      --> reduce-to\n"
-                                                                + "      <-- aggregate\n"
-                                                                + "    Sink: reduce-to (topic: outputTopic_1)\n"
-                                                                + "      <-- aggregate-toStream\n"
-                                                                + "\n"
-                                                                + "  Sub-topology: 2\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000021 (topics: [reduce-groupByKey-repartition])\n"
-                                                                + "      --> reducer\n"
-                                                                + "    Processor: reducer (stores: [reduce-store])\n"
-                                                                + "      --> reduce-toStream\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000021\n"
-                                                                + "    Processor: reduce-toStream (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000023\n"
-                                                                + "      <-- reducer\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
-                                                                + "      <-- reduce-toStream\n"
-                                                                + "\n"
-                                                                + "  Sub-topology: 3\n"
-                                                                + "    Source: sourceStream (topics: [input])\n"
-                                                                + "      --> source-map\n"
-                                                                + "    Processor: source-map (stores: [])\n"
-                                                                + "      --> reduce-filter, process-filter, KSTREAM-FILTER-0000000006, join-filter, KSTREAM-FILTER-0000000012\n"
-                                                                + "      <-- sourceStream\n"
-                                                                + "    Processor: reduce-filter (stores: [])\n"
-                                                                + "      --> reduce-peek\n"
-                                                                + "      <-- source-map\n"
-                                                                + "    Processor: join-filter (stores: [])\n"
-                                                                + "      --> KSTREAM-FILTER-0000000026\n"
-                                                                + "      <-- source-map\n"
-                                                                + "    Processor: process-filter (stores: [])\n"
-                                                                + "      --> process-mapValues\n"
-                                                                + "      <-- source-map\n"
-                                                                + "    Processor: reduce-peek (stores: [])\n"
-                                                                + "      --> KSTREAM-FILTER-0000000020\n"
-                                                                + "      <-- reduce-filter\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000006 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000005\n"
-                                                                + "      <-- source-map\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000012 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000011\n"
-                                                                + "      <-- source-map\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000019\n"
-                                                                + "      <-- reduce-peek\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000026 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000025\n"
-                                                                + "      <-- join-filter\n"
-                                                                + "    Processor: process-mapValues (stores: [])\n"
-                                                                + "      --> process\n"
-                                                                + "      <-- process-filter\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000005 (topic: count-groupByKey-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000006\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000011 (topic: aggregate-groupByKey-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000012\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000019 (topic: reduce-groupByKey-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000020\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000025 (topic: join-left-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000026\n"
-                                                                + "    Processor: process (stores: [])\n"
-                                                                + "      --> none\n"
-                                                                + "      <-- process-mapValues\n\n";
+                                                                    + "   Sub-topology: 0\n"
+                                                                    + "    Source: sourceStream (topics: [input])\n"
+                                                                    + "      --> source-map\n"
+                                                                    + "    Processor: source-map (stores: [])\n"
+                                                                    + "      --> reduce-filter, process-filter, KSTREAM-FILTER-0000000006, join-filter, KSTREAM-FILTER-0000000012\n"
+                                                                    + "      <-- sourceStream\n"
+                                                                    + "    Processor: reduce-filter (stores: [])\n"
+                                                                    + "      --> reduce-peek\n"
+                                                                    + "      <-- source-map\n"
+                                                                    + "    Processor: join-filter (stores: [])\n"
+                                                                    + "      --> KSTREAM-FILTER-0000000026\n"
+                                                                    + "      <-- source-map\n"
+                                                                    + "    Processor: process-filter (stores: [])\n"
+                                                                    + "      --> process-mapValues\n"
+                                                                    + "      <-- source-map\n"
+                                                                    + "    Processor: reduce-peek (stores: [])\n"
+                                                                    + "      --> KSTREAM-FILTER-0000000020\n"
+                                                                    + "      <-- reduce-filter\n"
+                                                                    + "    Processor: KSTREAM-FILTER-0000000006 (stores: [])\n"
+                                                                    + "      --> KSTREAM-SINK-0000000005\n"
+                                                                    + "      <-- source-map\n"
+                                                                    + "    Processor: KSTREAM-FILTER-0000000012 (stores: [])\n"
+                                                                    + "      --> KSTREAM-SINK-0000000011\n"
+                                                                    + "      <-- source-map\n"
+                                                                    + "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
+                                                                    + "      --> KSTREAM-SINK-0000000019\n"
+                                                                    + "      <-- reduce-peek\n"
+                                                                    + "    Processor: KSTREAM-FILTER-0000000026 (stores: [])\n"
+                                                                    + "      --> KSTREAM-SINK-0000000025\n"
+                                                                    + "      <-- join-filter\n"
+                                                                    + "    Processor: process-mapValues (stores: [])\n"
+                                                                    + "      --> process\n"
+                                                                    + "      <-- process-filter\n"
+                                                                    + "    Sink: KSTREAM-SINK-0000000005 (topic: count-groupByKey-repartition)\n"
+                                                                    + "      <-- KSTREAM-FILTER-0000000006\n"
+                                                                    + "    Sink: KSTREAM-SINK-0000000011 (topic: aggregate-groupByKey-repartition)\n"
+                                                                    + "      <-- KSTREAM-FILTER-0000000012\n"
+                                                                    + "    Sink: KSTREAM-SINK-0000000019 (topic: reduce-groupByKey-repartition)\n"
+                                                                    + "      <-- KSTREAM-FILTER-0000000020\n"
+                                                                    + "    Sink: KSTREAM-SINK-0000000025 (topic: join-left-repartition)\n"
+                                                                    + "      <-- KSTREAM-FILTER-0000000026\n"
+                                                                    + "    Processor: process (stores: [])\n"
+                                                                    + "      --> none\n"
+                                                                    + "      <-- process-mapValues\n"
+                                                                    + "\n"
+                                                                    + "  Sub-topology: 1\n"
+                                                                    + "    Source: KSTREAM-SOURCE-0000000007 (topics: [count-groupByKey-repartition])\n"
+                                                                    + "      --> count\n"
+                                                                    + "    Processor: count (stores: [count-store])\n"
+                                                                    + "      --> count-toStream\n"
+                                                                    + "      <-- KSTREAM-SOURCE-0000000007\n"
+                                                                    + "    Processor: count-toStream (stores: [])\n"
+                                                                    + "      --> join-other-windowed, count-to\n"
+                                                                    + "      <-- count\n"
+                                                                    + "    Source: KSTREAM-SOURCE-0000000027 (topics: [join-left-repartition])\n"
+                                                                    + "      --> join-this-windowed\n"
+                                                                    + "    Processor: join-other-windowed (stores: [other-join-store])\n"
+                                                                    + "      --> join-other-join\n"
+                                                                    + "      <-- count-toStream\n"
+                                                                    + "    Processor: join-this-windowed (stores: [join-store])\n"
+                                                                    + "      --> join-this-join\n"
+                                                                    + "      <-- KSTREAM-SOURCE-0000000027\n"
+                                                                    + "    Processor: join-other-join (stores: [join-store])\n"
+                                                                    + "      --> join-merge\n"
+                                                                    + "      <-- join-other-windowed\n"
+                                                                    + "    Processor: join-this-join (stores: [other-join-store])\n"
+                                                                    + "      --> join-merge\n"
+                                                                    + "      <-- join-this-windowed\n"
+                                                                    + "    Processor: join-merge (stores: [])\n"
+                                                                    + "      --> join-to\n"
+                                                                    + "      <-- join-this-join, join-other-join\n"
+                                                                    + "    Sink: count-to (topic: outputTopic_0)\n"
+                                                                    + "      <-- count-toStream\n"
+                                                                    + "    Sink: join-to (topic: joinedOutputTopic)\n"
+                                                                    + "      <-- join-merge\n"
+                                                                    + "\n"
+                                                                    + "  Sub-topology: 2\n"
+                                                                    + "    Source: KSTREAM-SOURCE-0000000013 (topics: [aggregate-groupByKey-repartition])\n"
+                                                                    + "      --> aggregate\n"
+                                                                    + "    Processor: aggregate (stores: [aggregate-store])\n"
+                                                                    + "      --> aggregate-toStream\n"
+                                                                    + "      <-- KSTREAM-SOURCE-0000000013\n"
+                                                                    + "    Processor: aggregate-toStream (stores: [])\n"
+                                                                    + "      --> reduce-to\n"
+                                                                    + "      <-- aggregate\n"
+                                                                    + "    Sink: reduce-to (topic: outputTopic_1)\n"
+                                                                    + "      <-- aggregate-toStream\n"
+                                                                    + "\n"
+                                                                    + "  Sub-topology: 3\n"
+                                                                    + "    Source: KSTREAM-SOURCE-0000000021 (topics: [reduce-groupByKey-repartition])\n"
+                                                                    + "      --> reducer\n"
+                                                                    + "    Processor: reducer (stores: [reduce-store])\n"
+                                                                    + "      --> reduce-toStream\n"
+                                                                    + "      <-- KSTREAM-SOURCE-0000000021\n"
+                                                                    + "    Processor: reduce-toStream (stores: [])\n"
+                                                                    + "      --> KSTREAM-SINK-0000000023\n"
+                                                                    + "      <-- reducer\n"
+                                                                    + "    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n"
+                                                                    + "      <-- reduce-toStream\n\n";
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
index 0d081f7..7e12abe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java
@@ -211,6 +211,26 @@ public class RepartitionWithMergeOptimizingTest {
 
     private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
                                                               + "   Sub-topology: 0\n"
+                                                              + "    Source: sourceAStream (topics: [inputA])\n"
+                                                              + "      --> mappedAStream\n"
+                                                              + "    Source: sourceBStream (topics: [inputB])\n"
+                                                              + "      --> mappedBStream\n"
+                                                              + "    Processor: mappedAStream (stores: [])\n"
+                                                              + "      --> mergedStream\n"
+                                                              + "      <-- sourceAStream\n"
+                                                              + "    Processor: mappedBStream (stores: [])\n"
+                                                              + "      --> mergedStream\n"
+                                                              + "      <-- sourceBStream\n"
+                                                              + "    Processor: mergedStream (stores: [])\n"
+                                                              + "      --> KSTREAM-FILTER-0000000019\n"
+                                                              + "      <-- mappedAStream, mappedBStream\n"
+                                                              + "    Processor: KSTREAM-FILTER-0000000019 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000018\n"
+                                                              + "      <-- mergedStream\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000018 (topic: long-groupByKey-repartition)\n"
+                                                              + "      <-- KSTREAM-FILTER-0000000019\n"
+                                                              + "\n"
+                                                              + "  Sub-topology: 1\n"
                                                               + "    Source: KSTREAM-SOURCE-0000000020 (topics: [long-groupByKey-repartition])\n"
                                                               + "      --> long-count, string-count\n"
                                                               + "    Processor: string-count (stores: [string-store])\n"
@@ -231,81 +251,60 @@ public class RepartitionWithMergeOptimizingTest {
                                                               + "    Sink: long-to (topic: outputTopic_0)\n"
                                                               + "      <-- long-toStream\n"
                                                               + "    Sink: string-to (topic: outputTopic_1)\n"
-                                                              + "      <-- string-mapValues\n"
-                                                              + "\n"
-                                                              + "  Sub-topology: 1\n"
-                                                              + "    Source: sourceAStream (topics: [inputA])\n"
-                                                              + "      --> mappedAStream\n"
-                                                              + "    Source: sourceBStream (topics: [inputB])\n"
-                                                              + "      --> mappedBStream\n"
-                                                              + "    Processor: mappedAStream (stores: [])\n"
-                                                              + "      --> mergedStream\n"
-                                                              + "      <-- sourceAStream\n"
-                                                              + "    Processor: mappedBStream (stores: [])\n"
-                                                              + "      --> mergedStream\n"
-                                                              + "      <-- sourceBStream\n"
-                                                              + "    Processor: mergedStream (stores: [])\n"
-                                                              + "      --> KSTREAM-FILTER-0000000019\n"
-                                                              + "      <-- mappedAStream, mappedBStream\n"
-                                                              + "    Processor: KSTREAM-FILTER-0000000019 (stores: [])\n"
-                                                              + "      --> KSTREAM-SINK-0000000018\n"
-                                                              + "      <-- mergedStream\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000018 (topic: long-groupByKey-repartition)\n"
-                                                              + "      <-- KSTREAM-FILTER-0000000019\n\n";
+                                                              + "      <-- string-mapValues\n\n";
 
 
     private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
-                                                                + "   Sub-topology: 0\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000008 (topics: [long-groupByKey-repartition])\n"
-                                                                + "      --> long-count\n"
-                                                                + "    Processor: long-count (stores: [long-store])\n"
-                                                                + "      --> long-toStream\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000008\n"
-                                                                + "    Processor: long-toStream (stores: [])\n"
-                                                                + "      --> long-to\n"
-                                                                + "      <-- long-count\n"
-                                                                + "    Sink: long-to (topic: outputTopic_0)\n"
-                                                                + "      <-- long-toStream\n"
-                                                                + "\n"
-                                                                + "  Sub-topology: 1\n"
-                                                                + "    Source: KSTREAM-SOURCE-0000000014 (topics: [string-groupByKey-repartition])\n"
-                                                                + "      --> string-count\n"
-                                                                + "    Processor: string-count (stores: [string-store])\n"
-                                                                + "      --> string-toStream\n"
-                                                                + "      <-- KSTREAM-SOURCE-0000000014\n"
-                                                                + "    Processor: string-toStream (stores: [])\n"
-                                                                + "      --> string-mapValues\n"
-                                                                + "      <-- string-count\n"
-                                                                + "    Processor: string-mapValues (stores: [])\n"
-                                                                + "      --> string-to\n"
-                                                                + "      <-- string-toStream\n"
-                                                                + "    Sink: string-to (topic: outputTopic_1)\n"
-                                                                + "      <-- string-mapValues\n"
-                                                                + "\n"
-                                                                + "  Sub-topology: 2\n"
-                                                                + "    Source: sourceAStream (topics: [inputA])\n"
-                                                                + "      --> mappedAStream\n"
-                                                                + "    Source: sourceBStream (topics: [inputB])\n"
-                                                                + "      --> mappedBStream\n"
-                                                                + "    Processor: mappedAStream (stores: [])\n"
-                                                                + "      --> mergedStream\n"
-                                                                + "      <-- sourceAStream\n"
-                                                                + "    Processor: mappedBStream (stores: [])\n"
-                                                                + "      --> mergedStream\n"
-                                                                + "      <-- sourceBStream\n"
-                                                                + "    Processor: mergedStream (stores: [])\n"
-                                                                + "      --> KSTREAM-FILTER-0000000007, KSTREAM-FILTER-0000000013\n"
-                                                                + "      <-- mappedAStream, mappedBStream\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000006\n"
-                                                                + "      <-- mergedStream\n"
-                                                                + "    Processor: KSTREAM-FILTER-0000000013 (stores: [])\n"
-                                                                + "      --> KSTREAM-SINK-0000000012\n"
-                                                                + "      <-- mergedStream\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000006 (topic: long-groupByKey-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000007\n"
-                                                                + "    Sink: KSTREAM-SINK-0000000012 (topic: string-groupByKey-repartition)\n"
-                                                                + "      <-- KSTREAM-FILTER-0000000013\n\n";
-
+                                                                    + "   Sub-topology: 0\n"
+                                                                    + "    Source: sourceAStream (topics: [inputA])\n"
+                                                                    + "      --> mappedAStream\n"
+                                                                    + "    Source: sourceBStream (topics: [inputB])\n"
+                                                                    + "      --> mappedBStream\n"
+                                                                    + "    Processor: mappedAStream (stores: [])\n"
+                                                                    + "      --> mergedStream\n"
+                                                                    + "      <-- sourceAStream\n"
+                                                                    + "    Processor: mappedBStream (stores: [])\n"
+                                                                    + "      --> mergedStream\n"
+                                                                    + "      <-- sourceBStream\n"
+                                                                    + "    Processor: mergedStream (stores: [])\n"
+                                                                    + "      --> KSTREAM-FILTER-0000000007, KSTREAM-FILTER-0000000013\n"
+                                                                    + "      <-- mappedAStream, mappedBStream\n"
+                                                                    + "    Processor: KSTREAM-FILTER-0000000007 (stores: [])\n"
+                                                                    + "      --> KSTREAM-SINK-0000000006\n"
+                                                                    + "      <-- mergedStream\n"
+                                                                    + "    Processor: KSTREAM-FILTER-0000000013 (stores: [])\n"
+                                                                    + "      --> KSTREAM-SINK-0000000012\n"
+                                                                    + "      <-- mergedStream\n"
+                                                                    + "    Sink: KSTREAM-SINK-0000000006 (topic: long-groupByKey-repartition)\n"
+                                                                    + "      <-- KSTREAM-FILTER-0000000007\n"
+                                                                    + "    Sink: KSTREAM-SINK-0000000012 (topic: string-groupByKey-repartition)\n"
+                                                                    + "      <-- KSTREAM-FILTER-0000000013\n"
+                                                                    + "\n"
+                                                                    + "  Sub-topology: 1\n"
+                                                                    + "    Source: KSTREAM-SOURCE-0000000008 (topics: [long-groupByKey-repartition])\n"
+                                                                    + "      --> long-count\n"
+                                                                    + "    Processor: long-count (stores: [long-store])\n"
+                                                                    + "      --> long-toStream\n"
+                                                                    + "      <-- KSTREAM-SOURCE-0000000008\n"
+                                                                    + "    Processor: long-toStream (stores: [])\n"
+                                                                    + "      --> long-to\n"
+                                                                    + "      <-- long-count\n"
+                                                                    + "    Sink: long-to (topic: outputTopic_0)\n"
+                                                                    + "      <-- long-toStream\n"
+                                                                    + "\n"
+                                                                    + "  Sub-topology: 2\n"
+                                                                    + "    Source: KSTREAM-SOURCE-0000000014 (topics: [string-groupByKey-repartition])\n"
+                                                                    + "      --> string-count\n"
+                                                                    + "    Processor: string-count (stores: [string-store])\n"
+                                                                    + "      --> string-toStream\n"
+                                                                    + "      <-- KSTREAM-SOURCE-0000000014\n"
+                                                                    + "    Processor: string-toStream (stores: [])\n"
+                                                                    + "      --> string-mapValues\n"
+                                                                    + "      <-- string-count\n"
+                                                                    + "    Processor: string-mapValues (stores: [])\n"
+                                                                    + "      --> string-to\n"
+                                                                    + "      <-- string-toStream\n"
+                                                                    + "    Sink: string-to (topic: outputTopic_1)\n"
+                                                                    + "      <-- string-mapValues\n\n";
 
 }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 43324ee..da1ba77 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -447,7 +447,7 @@ public class TopologyTestDriver implements Closeable {
     private void pipeRecord(final String topic, final Long timestamp, final byte[] key, final byte[] value, final Headers headers) {
         final String topicName = topic;
 
-        if (!internalTopologyBuilder.getSourceTopicNames().isEmpty()) {
+        if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
             validateSourceTopicNameRegexPattern(topic);
         }
         final TopicPartition topicPartition = getTopicPartition(topicName);
@@ -495,7 +495,7 @@ public class TopologyTestDriver implements Closeable {
 
 
     private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
-        for (final String sourceTopicName : internalTopologyBuilder.getSourceTopicNames()) {
+        for (final String sourceTopicName : internalTopologyBuilder.sourceTopicNames()) {
             if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) {
                 throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName +
                         " cannot contain regex pattern for input record topic: " + inputRecordTopic +


Mime
View raw message