kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Add test demonstrating re-use of KGroupedStream with Optimizations enabled (#6050)
Date Sat, 22 Dec 2018 06:56:41 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4616c0a  MINOR: Add test demonstrating re-use of KGroupedStream with Optimizations
enabled (#6050)
4616c0a is described below

commit 4616c0aaff5766b6305baeed521efdfaae0094e8
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Sat Dec 22 01:56:32 2018 -0500

    MINOR: Add test demonstrating re-use of KGroupedStream with Optimizations enabled (#6050)
    
    Right now if a repartition is required and users choose to name the repartition topic
for an aggregation i.e. kGroupedStream = builder.<String, String>stream("topic").selectKey((k,
v) -> k).groupByKey(Grouped.as("grouping")); The resulting KGroupedStream can't be reused
    with optimizations are disabled, as Streams will attempt to create two repartiton topics
with the same name.
    
    However, if optimizations are enabled then the resulting KGroupedStream can be re-used
    For example the following will work if optimizations are enabled.
    
    This PR provides a unit test proving as much.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/kstream/RepartitionTopicNamingTest.java      | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 929bc3f..fe75191 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -53,6 +53,7 @@ public class RepartitionTopicNamingTest {
     private final String secondRepartitionTopicName = "aggregate-stream";
     private final String thirdRepartitionTopicName = "reduced-stream";
     private final String fourthRepartitionTopicName = "joined-stream";
+    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
 
 
     @Test
@@ -60,7 +61,6 @@ public class RepartitionTopicNamingTest {
 
         final String optimizedTopology = buildTopology(StreamsConfig.OPTIMIZE).describe().toString();
         final String unOptimizedTopology = buildTopology(StreamsConfig.NO_OPTIMIZATION).describe().toString();
-        final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
 
         assertThat(optimizedTopology, is(EXPECTED_OPTIMIZED_TOPOLOGY));
         // only one repartition topic
@@ -95,7 +95,7 @@ public class RepartitionTopicNamingTest {
     }
 
     // each KGroupedStream will result in repartition, can't reuse
-    // KGroupedStreams when specifying repartition topic names
+    // KGroupedStreams when specifying repartition topic names and Optimization is turned
off
     // need to have separate groupByKey calls when naming repartition topics
     // see test shouldHandleUniqueGroupedInstances below for an example
     @Test
@@ -112,6 +112,18 @@ public class RepartitionTopicNamingTest {
         }
     }
 
+    @Test
+    public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimizationsOn()
{
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k,
v) -> k).groupByKey(Grouped.as("grouping"));
+        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        final Topology topology = builder.build(properties);
+        assertThat(getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern),
is(1));
+    }
+
 
     // can't use same repartition topic name in joins
     @Test


Mime
View raw message