kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8705: Remove parent node after leaving loop to prevent NPE (#7117)
Date Mon, 16 Dec 2019 17:39:25 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new ed12616  KAFKA-8705: Remove parent node after leaving loop to prevent NPE (#7117)
ed12616 is described below

commit ed12616c58970920fcb4917045070a0ad163b9e2
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Thu Dec 12 16:24:01 2019 -0500

    KAFKA-8705: Remove parent node after leaving loop to prevent NPE (#7117)
    
    Fixes the case where merging multiple children of a common key-changing node causes an NPE.
    
    Reviewers:  Matthias J. Sax <mjsax@apache.org>, Boyang Chen <boyang@confluent.io>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  9 +++-
 .../kstream/internals/graph/StreamsGraphTest.java  | 49 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index ca43c56..9509431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -42,6 +42,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -392,6 +393,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
     private void maybeUpdateKeyChangingRepartitionNodeMap() {
         final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers
= new HashMap<>();
+        final Set<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new HashSet<>();
         for (final StreamsGraphNode mergeNode : mergeNodes) {
             mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
             final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
@@ -409,11 +411,14 @@ public class InternalStreamsBuilder implements InternalNameProvider
{
             final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new
LinkedHashSet<>();
             for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
                 repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
-                keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
+                mergeNodeKeyChangingParentsToRemove.add(keyChangingParent);
             }
-
             keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes);
         }
+
+        for (final StreamsGraphNode mergeNodeKeyChangingParent : mergeNodeKeyChangingParentsToRemove)
{
+            keyChangingOperationsToOptimizableRepartitionNodes.remove(mergeNodeKeyChangingParent);
+        }
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 0fecaa2..e2006e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -124,6 +125,28 @@ public class StreamsGraphTest {
 
     }
 
+    @Test
+    public void shouldOptimizeSeveralMergeNodesWithCommonKeyChangingParent() {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<Integer, Integer> parentStream = streamsBuilder.stream("input_topic",
Consumed.with(Serdes.Integer(), Serdes.Integer()))
+            .selectKey(Integer::sum);
+
+        final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v ->
v + 1);
+        final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v ->
v + 2);
+        final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v ->
v + 3);
+
+        childStream1
+            .merge(childStream2)
+            .merge(childStream3)
+            .to("output_topic");
+
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        final Topology topology = streamsBuilder.build(properties);
+
+        assertEquals(expectedMergeOptimizedTopology, topology.describe().toString());
+    }
+
     private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig)
{
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -242,4 +265,30 @@ public class StreamsGraphTest {
                                           + "    Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n"
                                           + "      <-- KSTREAM-MAPVALUES-0000000008\n\n";
 
+
+    private String expectedMergeOptimizedTopology = "Topologies:\n" +
+        "   Sub-topology: 0\n" +
+        "    Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])\n" +
+        "      --> KSTREAM-KEY-SELECT-0000000001\n" +
+        "    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" +
+        "      --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004\n"
+
+        "      <-- KSTREAM-SOURCE-0000000000\n" +
+        "    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" +
+        "      --> KSTREAM-MERGE-0000000005\n" +
+        "      <-- KSTREAM-KEY-SELECT-0000000001\n" +
+        "    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" +
+        "      --> KSTREAM-MERGE-0000000005\n" +
+        "      <-- KSTREAM-KEY-SELECT-0000000001\n" +
+        "    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])\n" +
+        "      --> KSTREAM-MERGE-0000000006\n" +
+        "      <-- KSTREAM-KEY-SELECT-0000000001\n" +
+        "    Processor: KSTREAM-MERGE-0000000005 (stores: [])\n" +
+        "      --> KSTREAM-MERGE-0000000006\n" +
+        "      <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003\n" +
+        "    Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
+        "      --> KSTREAM-SINK-0000000007\n" +
+        "      <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004\n" +
+        "    Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n" +
+        "      <-- KSTREAM-MERGE-0000000006\n\n";
+
 }


Mime
View raw message