From commits-return-10792-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Thu Nov 29 02:15:42 2018 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5490418B82 for ; Thu, 29 Nov 2018 02:15:42 +0000 (UTC) Received: (qmail 36825 invoked by uid 500); 29 Nov 2018 02:15:42 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 36786 invoked by uid 500); 29 Nov 2018 02:15:42 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 36777 invoked by uid 99); 29 Nov 2018 02:15:42 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Nov 2018 02:15:41 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 2DB2082E48; Thu, 29 Nov 2018 02:15:41 +0000 (UTC) Date: Thu, 29 Nov 2018 02:15:40 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154345773953.19586.473499023560200868@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: ec668180d797c4d08ea899de61c344438621ced7 X-Git-Newrev: 2c305dc64c033ffcfd6e45548752f0702f373032 X-Git-Rev: 2c305dc64c033ffcfd6e45548752f0702f373032 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF 
dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2c305dc KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959) 2c305dc is described below commit 2c305dc64c033ffcfd6e45548752f0702f373032 Author: Bill Bejeck AuthorDate: Wed Nov 28 21:15:26 2018 -0500 KAFKA-7671: Stream-Global Table join should not reset repartition flag (#5959) This PR fixes an issue reported by a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results. I've added a test which fails without the fix. Reviewers: John Roesler, Matthias J. Sax, Guozhang Wang --- .../streams/kstream/internals/KStreamImpl.java | 2 +- .../streams/kstream/internals/KStreamImplTest.java | 24 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 3b69151..ed5625e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -753,7 +753,7 @@ public class KStreamImpl extends AbstractStream implements KStream(name, keySerde, null, sourceNodes, false, streamTableJoinNode, builder); + return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, streamTableJoinNode, builder); } @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 772836f..e0c38c6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -61,15 +61,18 @@ import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; import java.util.regex.Pattern; import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @SuppressWarnings("unchecked") @@ -437,6 +440,27 @@ public class KStreamImplTest { } } } + + @Test + public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() { + final StreamsBuilder builder = new StreamsBuilder(); + final GlobalKTable globalKTable = builder.globalTable("globalTopic"); + final KeyValueMapper kvMappper = (k, v) -> k + v; + final ValueJoiner valueJoiner = (v1, v2) -> v1 + v2; + builder.stream("topic").selectKey((k, v) -> v) + .join(globalKTable, kvMappper, valueJoiner) + .groupByKey() + .count(); + + final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); + final String topology = builder.build().describe().toString(); + final Matcher matcher = repartitionTopicPattern.matcher(topology); + assertTrue(matcher.find()); + final String match = matcher.group(); + assertThat(match, notNullValue()); + assertTrue(match.endsWith("repartition")); + + } @Test public void testToWithNullValueSerdeDoesntNPE() {