kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8298: Fix possible concurrent modification exception (#6643)
Date Wed, 01 May 2019 16:36:46 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 977257f  KAFKA-8298: Fix possible concurrent modification exception (#6643)
977257f is described below

commit 977257f544aebcb59fadf4e424d32cecd9191875
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Wed May 1 12:07:45 2019 -0400

    KAFKA-8298: Fix possible concurrent modification exception (#6643)
    
    When processing multiple key-changing operations during the optimization phase a ConcurrentModificationException
is possible.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  8 ++++--
 .../kstream/internals/graph/StreamsGraphTest.java  | 33 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index e0983eb..bf36389 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -42,9 +42,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Properties;
@@ -299,8 +301,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     @SuppressWarnings("unchecked")
     private void maybeOptimizeRepartitionOperations() {
         maybeUpdateKeyChangingRepartitionNodeMap();
+        final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>>
entryIterator =  keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
 
-        for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>
entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
+        while (entryIterator.hasNext()) {
+            final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>
entry = entryIterator.next();
 
             final StreamsGraphNode keyChangingNode = entry.getKey();
 
@@ -356,7 +360,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
             }
 
             keyChangingNode.addChild(optimizedSingleRepartition);
-            keyChangingOperationsToOptimizableRepartitionNodes.remove(entry.getKey());
+            entryIterator.remove();
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index bd43685..0fecaa2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -17,15 +17,21 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -68,6 +74,33 @@ public class StreamsGraphTest {
     }
 
     @Test
+    public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() {
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> inputStream = builder.stream("inputTopic");
+
+        final KStream<String, String> changedKeyStream = inputStream.selectKey((k,
v) -> v.substring(0, 5));
+
+        // first repartition
+        changedKeyStream.groupByKey(Grouped.as("count-repartition"))
+            .count(Materialized.as("count-store"))
+            .toStream().to("count-topic", Produced.with(Serdes.String(), Serdes.Long()));
+
+        // second repartition
+        changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
+            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
+            .count(Materialized.as("windowed-count-store"))
+            .toStream()
+            .map((k, v) -> KeyValue.pair(k.key(), v)).to("windowed-count", Produced.with(Serdes.String(),
Serdes.Long()));
+
+        builder.build(properties);
+    }
+
+    @Test
     public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
 
         final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);


Mime
View raw message