kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959)
Date Thu, 29 Nov 2018 02:16:05 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new a80d0bf  KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959)
a80d0bf is described below

commit a80d0bf6a65ec88d935e5764b4881c6a25a36cec
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Wed Nov 28 21:15:26 2018 -0500

    KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959)
    
    This PR fixes an issue reported by a user. When we join a KStream with a GlobalKTable
    we should not reset the repartition flag, as the stream may have previously changed its key,
    and the resulting stream could be used in an aggregation operation or join with another stream
    which may require a repartition for correct results.
    
    I've added a test which fails without the fix.
    
    Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/kstream/internals/KStreamImpl.java     |  2 +-
 .../streams/kstream/internals/KStreamImplTest.java | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index f1a8754..26ea63c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -754,7 +754,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
         // do not have serde for joined result
-        return new KStreamImpl<>(name, keySerde, null, sourceNodes, false, streamTableJoinNode, builder);
+        return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, streamTableJoinNode, builder);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index d033e49..34abbb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -61,14 +61,17 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static java.time.Duration.ofMillis;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 
@@ -436,6 +439,27 @@ public class KStreamImplTest {
             }
         }
     }
+
+    @Test
+    public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final GlobalKTable<String, String> globalKTable = builder.globalTable("globalTopic");
+        final KeyValueMapper<String, String, String> kvMappper = (k, v) -> k + v;
+        final ValueJoiner<String, String, String> valueJoiner = (v1, v2) -> v1 + v2;
+        builder.<String, String>stream("topic").selectKey((k, v) -> v)
+            .join(globalKTable, kvMappper, valueJoiner)
+            .groupByKey()
+            .count();
+
+        final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+        final String topology = builder.build().describe().toString();
+        final Matcher matcher = repartitionTopicPattern.matcher(topology);
+        assertTrue(matcher.find());
+        final String match = matcher.group();
+        assertThat(match, notNullValue());
+        assertTrue(match.endsWith("repartition"));
+
+    }
     
     @Test
     public void testToWithNullValueSerdeDoesntNPE() {


Mime
View raw message