kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-9739: 2.3 null child node fix (#8419)
Date Sat, 04 Apr 2020 15:46:28 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 477d43a  KAFKA-9739: 2.3 null child node fix (#8419)
477d43a is described below

commit 477d43a6be865bf5120970370568a37439dc8e33
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Sat Apr 4 11:45:55 2020 -0400

    KAFKA-9739: 2.3 null child node fix (#8419)
    
    A port of #8400 for 2.3. The process of sorting source and sink nodes changed in 2.4,
    so we can't cherry-pick the PR directly as we need to update the expected topology to what
    it would be in the 2.3 version.
    
    Reviewers: John Roesler <john@confluent.io>, Andrew Choi <a24choi@edu.uwaterloo.ca>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  17 +-
 .../kstream/internals/graph/StreamsGraphTest.java  | 186 +++++++++++++++++++++
 2 files changed, 198 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index e48a3fc..ab9d230 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -382,11 +382,13 @@ public class InternalStreamsBuilder implements InternalNameProvider
{
         final Set<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new HashSet<>();
         for (final StreamsGraphNode mergeNode : mergeNodes) {
             mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
-            final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
-            for (final StreamsGraphNode key : keys) {
-                final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode,
node -> node.parentNodes().contains(key));
-                if (maybeParentKey != null) {
-                    mergeNodesToKeyChangers.get(mergeNode).add(key);
+            final Set<Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>>
entrySet = keyChangingOperationsToOptimizableRepartitionNodes.entrySet();
+            for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>
entry : entrySet) {
+                if (mergeNodeHasRepartitionChildren(mergeNode, entry.getValue())) {
+                    final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode,
node -> node.parentNodes().contains(entry.getKey()));
+                    if (maybeParentKey != null) {
+                        mergeNodesToKeyChangers.get(mergeNode).add(entry.getKey());
+                    }
                 }
             }
         }
@@ -407,6 +409,11 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         }
     }
 
+    private boolean mergeNodeHasRepartitionChildren(final StreamsGraphNode mergeNode,
+                                                    final LinkedHashSet<OptimizableRepartitionNode>
repartitionNodes) {
+        return repartitionNodes.stream().allMatch(n -> findParentNodeMatching(n, gn ->
gn.parentNodes().contains(mergeNode)) != null);
+    }
+
     @SuppressWarnings("unchecked")
     private OptimizableRepartitionNode createRepartitionNode(final String repartitionTopicName,
                                                              final Serde keySerde,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index e2006e6..84578ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -22,14 +22,22 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -47,6 +55,8 @@ import static org.junit.Assert.assertEquals;
 public class StreamsGraphTest {
 
     private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+    private Initializer<String> initializer;
+    private Aggregator<String, String, String> aggregator;
 
     // Test builds topology in successive manner but only graph node not yet processed written
to topology
 
@@ -102,6 +112,76 @@ public class StreamsGraphTest {
     }
 
     @Test
+    @SuppressWarnings("unchecked")
+    public void shouldNotThrowNPEWithMergeNodes() {
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        initializer = () -> "";
+        aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
+        final TransformerSupplier<String, String, KeyValue<String, String>> transformSupplier
= () -> new Transformer<String, String, KeyValue<String, String>>() {
+            @Override
+            public void init(final ProcessorContext context) {
+
+            }
+
+            @Override
+            public KeyValue<String, String> transform(final String key, final String
value) {
+                return KeyValue.pair(key, value);
+            }
+
+            @Override
+            public void close() {
+
+            }
+        };
+
+        final KStream<String, String> retryStream = builder.stream("retryTopic", Consumed.with(Serdes.String(),
Serdes.String()))
+                .transform(transformSupplier)
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .aggregate(initializer,
+                        aggregator,
+                        Materialized.with(Serdes.String(), Serdes.String()))
+                .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(500), Suppressed.BufferConfig.maxBytes(64_000_000)))
+                .toStream()
+                .flatMap((k, v) -> new ArrayList<>());
+
+        final KTable<String, String> idTable = builder.stream("id-table-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+                .flatMap((k, v) -> new ArrayList<KeyValue<String, String>>())
+                .peek((subscriptionId, recipientId) -> System.out.println("data " + subscriptionId
+ " " + recipientId))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .aggregate(initializer,
+                        aggregator,
+                        Materialized.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> joinStream = builder.stream("internal-topic-command",
Consumed.with(Serdes.String(), Serdes.String()))
+                .peek((subscriptionId, command) -> System.out.println("stdoutput"))
+                .mapValues((k, v) -> v)
+                .merge(retryStream)
+                .leftJoin(idTable, (v1, v2) -> v1 + v2,
+                        Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
+
+        final KStream<String, String>[] branches = joinStream.branch((k, v) -> v.equals("some-value"),
(k, v) -> true);
+
+        branches[0].map(KeyValue::pair)
+                .peek((recipientId, command) -> System.out.println("printing out"))
+                .to("external-command", Produced.with(Serdes.String(), Serdes.String()));
+
+        branches[1].filter((k, v) -> v != null)
+                .peek((subscriptionId, wrapper) -> System.out.println("Printing output"))
+                .mapValues((k, v) -> v)
+                .to("dlq-topic", Produced.with(Serdes.String(), Serdes.String()));
+
+        branches[1].map(KeyValue::pair).to("retryTopic", Produced.with(Serdes.String(), Serdes.String()));
+
+        final Topology topology = builder.build(properties);
+        assertEquals(expectedComplexMergeOptimizeTopology, topology.describe().toString());
+    }
+
+    @Test
     public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
 
         final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);
@@ -291,4 +371,110 @@ public class StreamsGraphTest {
         "    Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n" +
         "      <-- KSTREAM-MERGE-0000000006\n\n";
 
+
+    private final String expectedComplexMergeOptimizeTopology = "Topologies:\n"
+        + "   Sub-topology: 0\n"
+        + "    Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n"
+        + "      --> KSTREAM-TRANSFORM-0000000001\n"
+        + "    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n"
+        + "      --> KSTREAM-FILTER-0000000040\n"
+        + "      <-- KSTREAM-SOURCE-0000000000\n"
+        + "    Processor: KSTREAM-FILTER-0000000040 (stores: [])\n"
+        + "      --> KSTREAM-SINK-0000000039\n"
+        + "      <-- KSTREAM-TRANSFORM-0000000001\n"
+        + "    Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n"
+        + "      <-- KSTREAM-FILTER-0000000040\n"
+        + "\n"
+        + "  Sub-topology: 1\n"
+        + "    Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n"
+        + "      --> KSTREAM-FLATMAP-0000000012\n"
+        + "    Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n"
+        + "      --> KSTREAM-FILTER-0000000043\n"
+        + "      <-- KSTREAM-SOURCE-0000000011\n"
+        + "    Processor: KSTREAM-FILTER-0000000043 (stores: [])\n"
+        + "      --> KSTREAM-SINK-0000000042\n"
+        + "      <-- KSTREAM-FLATMAP-0000000012\n"
+        + "    Sink: KSTREAM-SINK-0000000042 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n"
+        + "      <-- KSTREAM-FILTER-0000000043\n"
+        + "\n"
+        + "  Sub-topology: 2\n"
+        + "    Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n"
+        + "      --> KSTREAM-AGGREGATE-0000000003\n"
+        + "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
+        + "      --> KTABLE-SUPPRESS-0000000007\n"
+        + "      <-- KSTREAM-SOURCE-0000000041\n"
+        + "    Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n"
+        + "      --> KSTREAM-PEEK-0000000020\n"
+        + "    Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n"
+        + "      --> KTABLE-TOSTREAM-0000000009\n"
+        + "      <-- KSTREAM-AGGREGATE-0000000003\n"
+        + "    Processor: KSTREAM-PEEK-0000000020 (stores: [])\n"
+        + "      --> KSTREAM-MAPVALUES-0000000021\n"
+        + "      <-- KSTREAM-SOURCE-0000000019\n"
+        + "    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n"
+        + "      --> KSTREAM-FLATMAP-0000000010\n"
+        + "      <-- KTABLE-SUPPRESS-0000000007\n"
+        + "    Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n"
+        + "      --> KSTREAM-MERGE-0000000022\n"
+        + "      <-- KTABLE-TOSTREAM-0000000009\n"
+        + "    Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n"
+        + "      --> KSTREAM-MERGE-0000000022\n"
+        + "      <-- KSTREAM-PEEK-0000000020\n"
+        + "    Processor: KSTREAM-MERGE-0000000022 (stores: [])\n"
+        + "      --> KSTREAM-FILTER-0000000024\n"
+        + "      <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n"
+        + "    Processor: KSTREAM-FILTER-0000000024 (stores: [])\n"
+        + "      --> KSTREAM-SINK-0000000023\n"
+        + "      <-- KSTREAM-MERGE-0000000022\n"
+        + "    Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n"
+        + "      <-- KSTREAM-FILTER-0000000024\n"
+        + "\n"
+        + "  Sub-topology: 3\n"
+        + "    Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n"
+        + "      --> KSTREAM-LEFTJOIN-0000000026\n"
+        + "    Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n"
+        + "      --> KSTREAM-BRANCH-0000000027\n"
+        + "      <-- KSTREAM-SOURCE-0000000025\n"
+        + "    Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n"
+        + "      --> KSTREAM-BRANCHCHILD-0000000029, KSTREAM-BRANCHCHILD-0000000028\n"
+        + "      <-- KSTREAM-LEFTJOIN-0000000026\n"
+        + "    Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n"
+        + "      --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n"
+        + "      <-- KSTREAM-BRANCH-0000000027\n"
+        + "    Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n"
+        + "      --> KSTREAM-MAP-0000000030\n"
+        + "      <-- KSTREAM-BRANCH-0000000027\n"
+        + "    Processor: KSTREAM-FILTER-0000000033 (stores: [])\n"
+        + "      --> KSTREAM-PEEK-0000000034\n"
+        + "      <-- KSTREAM-BRANCHCHILD-0000000029\n"
+        + "    Processor: KSTREAM-MAP-0000000030 (stores: [])\n"
+        + "      --> KSTREAM-PEEK-0000000031\n"
+        + "      <-- KSTREAM-BRANCHCHILD-0000000028\n"
+        + "    Processor: KSTREAM-PEEK-0000000034 (stores: [])\n"
+        + "      --> KSTREAM-MAPVALUES-0000000035\n"
+        + "      <-- KSTREAM-FILTER-0000000033\n"
+        + "    Source: KSTREAM-SOURCE-0000000044 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n"
+        + "      --> KSTREAM-PEEK-0000000013\n"
+        + "    Processor: KSTREAM-MAP-0000000037 (stores: [])\n"
+        + "      --> KSTREAM-SINK-0000000038\n"
+        + "      <-- KSTREAM-BRANCHCHILD-0000000029\n"
+        + "    Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n"
+        + "      --> KSTREAM-SINK-0000000036\n"
+        + "      <-- KSTREAM-PEEK-0000000034\n"
+        + "    Processor: KSTREAM-PEEK-0000000013 (stores: [])\n"
+        + "      --> KSTREAM-AGGREGATE-0000000015\n"
+        + "      <-- KSTREAM-SOURCE-0000000044\n"
+        + "    Processor: KSTREAM-PEEK-0000000031 (stores: [])\n"
+        + "      --> KSTREAM-SINK-0000000032\n"
+        + "      <-- KSTREAM-MAP-0000000030\n"
+        + "    Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n"
+        + "      --> none\n"
+        + "      <-- KSTREAM-PEEK-0000000013\n"
+        + "    Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n"
+        + "      <-- KSTREAM-PEEK-0000000031\n"
+        + "    Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n"
+        + "      <-- KSTREAM-MAPVALUES-0000000035\n"
+        + "    Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n"
+        + "      <-- KSTREAM-MAP-0000000037\n\n";
+
 }


Mime
View raw message