From commits-return-13270-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Mon Dec 16 17:39:29 2019 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id 0A2A819464 for ; Mon, 16 Dec 2019 17:39:28 +0000 (UTC) Received: (qmail 10153 invoked by uid 500); 16 Dec 2019 17:39:28 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 10116 invoked by uid 500); 16 Dec 2019 17:39:28 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 10107 invoked by uid 99); 16 Dec 2019 17:39:28 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Dec 2019 17:39:28 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 1E0E38D80D; Mon, 16 Dec 2019 17:39:28 +0000 (UTC) Date: Mon, 16 Dec 2019 17:39:25 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 2.4 updated: KAFKA-8705: Remove parent node after leaving loop to prevent NPE (#7117) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157651796287.20178.5474791803538770934@gitbox.apache.org> From: bbejeck@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/2.4 X-Git-Reftype: branch X-Git-Oldrev: 82d06d4553ad2f6f8447e3432ca2dc42efead930 X-Git-Newrev: ed12616c58970920fcb4917045070a0ad163b9e2 X-Git-Rev: ed12616c58970920fcb4917045070a0ad163b9e2 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted 
git repository. bbejeck pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new ed12616 KAFKA-8705: Remove parent node after leaving loop to prevent NPE (#7117) ed12616 is described below commit ed12616c58970920fcb4917045070a0ad163b9e2 Author: Bill Bejeck AuthorDate: Thu Dec 12 16:24:01 2019 -0500 KAFKA-8705: Remove parent node after leaving loop to prevent NPE (#7117) Fixes case where multiple children merged from a key-changing node causes an NPE. Reviewers: Matthias J. Sax, Boyang Chen --- .../kstream/internals/InternalStreamsBuilder.java | 9 +++- .../kstream/internals/graph/StreamsGraphTest.java | 49 ++++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index ca43c56..9509431 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -42,6 +42,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -392,6 +393,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { private void maybeUpdateKeyChangingRepartitionNodeMap() { final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>(); + final Set<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new HashSet<>(); for (final StreamsGraphNode mergeNode : mergeNodes) { mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>()); final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet(); @@ -409,11 +411,14 @@ public class InternalStreamsBuilder 
implements InternalNameProvider { final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>(); for (final StreamsGraphNode keyChangingParent : keyChangingParents) { repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent)); - keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent); + mergeNodeKeyChangingParentsToRemove.add(keyChangingParent); } - keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes); } + + for (final StreamsGraphNode mergeNodeKeyChangingParent : mergeNodeKeyChangingParentsToRemove) { + keyChangingOperationsToOptimizableRepartitionNodes.remove(mergeNodeKeyChangingParent); + } } @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index 0fecaa2..e2006e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; @@ -124,6 +125,28 @@ public class StreamsGraphTest { } + @Test + public void shouldOptimizeSeveralMergeNodesWithCommonKeyChangingParent() { + final StreamsBuilder streamsBuilder = new StreamsBuilder(); + final KStream<Integer, Integer> parentStream = streamsBuilder.stream("input_topic", Consumed.with(Serdes.Integer(), Serdes.Integer())) + .selectKey(Integer::sum); + + final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1); + final KStream<Integer, Integer> childStream2 
= parentStream.mapValues(v -> v + 2); + final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3); + + childStream1 + .merge(childStream2) + .merge(childStream3) + .to("output_topic"); + + final Properties properties = new Properties(); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + final Topology topology = streamsBuilder.build(properties); + + assertEquals(expectedMergeOptimizedTopology, topology.describe().toString()); + } + private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig) { final StreamsBuilder builder = new StreamsBuilder(); @@ -242,4 +265,30 @@ public class StreamsGraphTest { + " Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n" + " <-- KSTREAM-MAPVALUES-0000000008\n\n"; + + private String expectedMergeOptimizedTopology = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])\n" + + " --> KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000005\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000005\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000006\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-MERGE-0000000005 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000006\n" + + " <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003\n" + + " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" + + " --> KSTREAM-SINK-0000000007\n" + + " <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004\n" + + " Sink: KSTREAM-SINK-0000000007 (topic: 
output_topic)\n" + + " <-- KSTREAM-MERGE-0000000006\n\n"; + }