kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7406: Name join group repartition topics (#5709)
Date Tue, 02 Oct 2018 06:09:42 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dff1a37  KAFKA-7406: Name join group repartition topics (#5709)
dff1a37 is described below

commit dff1a3799c1c25b1b0009be6835f2c9c78cb5900
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Tue Oct 2 02:09:12 2018 -0400

    KAFKA-7406: Name join group repartition topics (#5709)
    
    Reviewer: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/kstream/Grouped.java  | 157 ++++++
 .../org/apache/kafka/streams/kstream/Joined.java   |  92 +++-
 .../org/apache/kafka/streams/kstream/KStream.java  | 145 +++--
 .../org/apache/kafka/streams/kstream/KTable.java   |  48 +-
 .../apache/kafka/streams/kstream/Serialized.java   |   3 +
 ...erializedInternal.java => GroupedInternal.java} |  14 +-
 .../internals/GroupedStreamAggregateBuilder.java   |  16 +-
 .../kstream/internals/InternalStreamsBuilder.java  |  45 +-
 .../kstream/internals/KGroupedStreamImpl.java      |  11 +-
 .../kstream/internals/KGroupedTableImpl.java       |  15 +-
 .../streams/kstream/internals/KStreamImpl.java     |  78 ++-
 .../streams/kstream/internals/KTableImpl.java      |  30 +-
 .../kstream/internals/SerializedInternal.java      |   1 +
 .../kstream/internals/TimeWindowedKStreamImpl.java |   4 +-
 .../graph/OptimizableRepartitionNode.java          |   4 +
 .../RepartitionOptimizingIntegrationTest.java      |   7 +-
 ...artitionWithMergeOptimizingIntegrationTest.java |   7 +-
 .../kstream/RepartitionTopicNamingTest.java        | 610 +++++++++++++++++++++
 18 files changed, 1140 insertions(+), 147 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
new file mode 100644
index 0000000..404cbd4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * The class that is used to capture the key and value {@link Serde}s and set the part of name used for
+ * repartition topics when performing {@link KStream#groupBy(KeyValueMapper, Grouped)}, {@link
+ * KStream#groupByKey(Grouped)}, or {@link KTable#groupBy(KeyValueMapper, Grouped)} operations.  Note
+ * that Kafka Streams does not always create a repartition topic for grouping operations.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class Grouped<K, V> {
+
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valueSerde;
+    protected final String name;
+
+    private Grouped(final String name,
+                    final Serde<K> keySerde,
+                    final Serde<V> valueSerde) {
+        this.name = name;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    /** Copies the name and serdes of {@code grouped}; intended for use by subclasses. */
+    protected Grouped(final Grouped<K, V> grouped) {
+        this(grouped.name, grouped.keySerde, grouped.valueSerde);
+    }
+
+    /**
+     * Create a {@code Grouped} instance with the provided name used for a repartition topic required for
+     * performing the grouping operation.
+     *
+     * @param name the name used for a repartition topic if required
+     * @return a new {@link Grouped} configured with the name
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> as(final String name) {
+        return new Grouped<>(name, null, null);
+    }
+
+
+    /**
+     * Create a {@code Grouped} instance with the provided keySerde.
+     *
+     * @param keySerde the Serde used for serializing the key
+     * @return a new {@link Grouped} configured with the keySerde
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> keySerde(final Serde<K> keySerde) {
+        return new Grouped<>(null, keySerde, null);
+    }
+
+
+    /**
+     * Create a {@code Grouped} instance with the provided valueSerde.
+     *
+     * @param valueSerde the Serde used for serializing the value
+     * @return a new {@link Grouped} configured with the valueSerde
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> valueSerde(final Serde<V> valueSerde) {
+        return new Grouped<>(null, null, valueSerde);
+    }
+
+    /**
+     * Create a {@code Grouped} instance with the provided {@code name}, {@code keySerde}, and {@code valueSerde}.
+     *
+     * @param name       the name used for part of the repartition topic name if required
+     * @param keySerde   the Serde used for serializing the key
+     * @param valueSerde the Serde used for serializing the value
+     * @return a new {@link Grouped} configured with the name, keySerde, and valueSerde
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> with(final String name,
+                                            final Serde<K> keySerde,
+                                            final Serde<V> valueSerde) {
+        return new Grouped<>(name, keySerde, valueSerde);
+    }
+
+
+    /**
+     * Create a {@code Grouped} instance with the provided {@code keySerde} and {@code valueSerde}.
+     *
+     * @param keySerde   the Serde used for serializing the key
+     * @param valueSerde the Serde used for serializing the value
+     * @return a new {@link Grouped} configured with the keySerde, and valueSerde
+     * @see KStream#groupByKey(Grouped)
+     * @see KStream#groupBy(KeyValueMapper, Grouped)
+     * @see KTable#groupBy(KeyValueMapper, Grouped)
+     */
+    public static <K, V> Grouped<K, V> with(final Serde<K> keySerde,
+                                            final Serde<V> valueSerde) {
+        return new Grouped<>(null, keySerde, valueSerde);
+    }
+
+    /**
+     * Perform the grouping operation with the name for a repartition topic if required.  Note
+     * that Kafka Streams does not always create a repartition topic for grouping operations.
+     *
+     * @param name the name used for part of the repartition topic if required
+     * @return a new {@code Grouped} instance configured with the {@code name}
+     */
+    public Grouped<K, V> withName(final String name) {
+        return new Grouped<>(name, keySerde, valueSerde);
+    }
+
+    /**
+     * Perform the grouping operation using the provided keySerde for serializing the key.
+     *
+     * @param keySerde Serde to use for serializing the key
+     * @return a new {@code Grouped} instance configured with the {@code keySerde}
+     */
+    public Grouped<K, V> withKeySerde(final Serde<K> keySerde) {
+        return new Grouped<>(name, keySerde, valueSerde);
+    }
+
+    /**
+     * Perform the grouping operation using the provided valueSerde for serializing the value.
+     *
+     * @param valueSerde Serde to use for serializing the value
+     * @return a new {@code Grouped} instance configured with the {@code valueSerde}
+     */
+    public Grouped<K, V> withValueSerde(final Serde<V> valueSerde) {
+        return new Grouped<>(name, keySerde, valueSerde);
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
index 8601e1c..aa29c68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
@@ -24,16 +24,20 @@ import org.apache.kafka.common.serialization.Serde;
  */
 public class Joined<K, V, VO> {
 
-    private Serde<K> keySerde;
-    private Serde<V> valueSerde;
-    private Serde<VO> otherValueSerde;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private final Serde<VO> otherValueSerde;
+    private final String name;
+
 
     private Joined(final Serde<K> keySerde,
                    final Serde<V> valueSerde,
-                   final Serde<VO> otherValueSerde) {
+                   final Serde<VO> otherValueSerde,
+                   final String name) {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.otherValueSerde = otherValueSerde;
+        this.name = name;
     }
 
     /**
@@ -51,7 +55,32 @@ public class Joined<K, V, VO> {
     public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
                                                    final Serde<V> valueSerde,
                                                    final Serde<VO> otherValueSerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, null);
+    }
+
+    /**
+     * Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances and a base name.
+     * {@code null} values are accepted and will be replaced by the default serdes as defined in
+     * config.
+     *
+     * @param keySerde the key serde to use. If {@code null} the default key serde from config will be
+     * used
+     * @param valueSerde the value serde to use. If {@code null} the default value serde from config
+     * will be used
+     * @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde
+     * from config will be used
+     * @param name the name used as the base for naming components of the join including any
+     * repartition topics
+     * @param <K> key type
+     * @param <V> value type
+     * @param <VO> other value type
+     * @return new {@code Joined} instance with the provided serdes and name
+     */
+    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
+                                                   final Serde<V> valueSerde,
+                                                   final Serde<VO> otherValueSerde,
+                                                   final String name) {
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
     }
 
     /**
@@ -65,7 +94,7 @@ public class Joined<K, V, VO> {
      * @return new {@code Joined} instance configured with the keySerde
      */
     public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde) {
-        return with(keySerde, null, null);
+        return new Joined<>(keySerde, null, null, null);
     }
 
     /**
@@ -79,7 +108,7 @@ public class Joined<K, V, VO> {
      * @return new {@code Joined} instance configured with the valueSerde
      */
     public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde) {
-        return with(null, valueSerde, null);
+        return new Joined<>(null, valueSerde, null, null);
     }
 
     /**
@@ -93,19 +122,34 @@ public class Joined<K, V, VO> {
      * @return new {@code Joined} instance configured with the otherValueSerde
      */
     public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherValueSerde) {
-        return with(null, null, otherValueSerde);
+        return new Joined<>(null, null, otherValueSerde, null);
+    }
+
+    /**
+     * Create an instance of {@code Joined} with a base name for all components of the join; this may
+     * include any repartition topics created to complete the join.
+     *
+     * @param name the name used as the base for naming components of the join including any
+     * repartition topics
+     * @param <K> key type
+     * @param <V> value type
+     * @param <VO> other value type
+     * @return new {@code Joined} instance configured with the name
+     */
+    public static <K, V, VO> Joined<K, V, VO> named(final String name) {
+        return new Joined<>(null, null, null, name);
     }
 
+
     /**
      * Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default
      * key serde as defined in config
      *
      * @param keySerde the key serde to use. If null the default key serde from config will be used
-     * @return this
+     * @return new {@code Joined} instance configured with the {@code keySerde}
      */
     public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) {
-        this.keySerde = keySerde;
-        return this;
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
     }
 
     /**
@@ -113,11 +157,10 @@ public class Joined<K, V, VO> {
      * value serde as defined in config
      *
      * @param valueSerde the value serde to use. If null the default value serde from config will be used
-     * @return this
+     * @return new {@code Joined} instance configured with the {@code valueSerde}
      */
     public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) {
-        this.valueSerde = valueSerde;
-        return this;
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
     }
 
     /**
@@ -125,11 +168,22 @@ public class Joined<K, V, VO> {
      * value serde as defined in config
      *
      * @param otherValueSerde the otherValue serde to use. If null the default value serde from config will be used
-     * @return this
+     * @return new {@code Joined} instance configured with the {@code otherValueSerde}
      */
     public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
-        this.otherValueSerde = otherValueSerde;
-        return this;
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+    }
+
+    /**
+     * Set the base name used for all components of the join; this may include any repartition topics
+     * created to complete the join.
+     *
+     * @param name the name used as the base for naming components of the join including any
+     * repartition topics
+     * @return new {@code Joined} instance configured with the {@code name}
+     */
+    public Joined<K, V, VO> withName(final String name) {
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
     }
 
     public Serde<K> keySerde() {
@@ -143,4 +197,8 @@ public class Joined<K, V, VO> {
     public Serde<VO> otherValueSerde() {
         return otherValueSerde;
     }
+
+    public String name() {
+        return name;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index cf2ce75..13f1dfc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -770,11 +770,12 @@ public interface KStream<K, V> {
      * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
      * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
      * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
-     * {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later
+     * operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * <p>
@@ -782,7 +783,7 @@ public interface KStream<K, V> {
      * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
      * correctly on its key.
      * If the last key changing operator changed the key type, it is recommended to use
-     * {@link #groupByKey(Serialized)} instead.
+     * {@link #groupByKey(org.apache.kafka.streams.kstream.Grouped)} instead.
      *
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
      * @see #groupBy(KeyValueMapper)
@@ -799,11 +800,12 @@ public interface KStream<K, V> {
      * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
      * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
      * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
-     * {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka
+     * if a later operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * <p>
@@ -813,22 +815,58 @@ public interface KStream<K, V> {
      *
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
      * @see #groupBy(KeyValueMapper)
+     *
+     * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.KStream#groupByKey(Grouped)} instead
      */
+    @Deprecated
     KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized);
 
     /**
+     * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
+     * and using the serializers as defined by {@link Grouped}.
+     * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
+     * (cf. {@link KGroupedStream}).
+     * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
+     * <p>
+     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
+     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
+     * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
+     * {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later operator
+     * depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
+     * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name,
+     * and "-repartition" is a fixed suffix.
+     *
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * <p>
+     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
+     * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
+     * correctly on its key.
+     *
+     * @param  grouped  the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
+     *                  and part of the name for a repartition topic if repartitioning is required.
+     * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
+     * @see #groupBy(KeyValueMapper)
+     */
+    KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);
+
+    /**
      * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
      * and default serializers and deserializers.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
-     * The {@link KeyValueMapper} selects a new key (which should be of the same type) while preserving the original values.
+     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}
      * <p>
-     * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
+     * later operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * <p>
@@ -849,14 +887,15 @@ public interface KStream<K, V> {
      * and {@link Serde}s as specified by {@link Serialized}.
      * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedStream}).
-     * The {@link KeyValueMapper} selects a new key (which should be of the same type) while preserving the original values.
+     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
      * <p>
-     * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
+     * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a
+     * later operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
      * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
      * an internally generated name, and "-repartition" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * <p>
@@ -868,11 +907,47 @@ public interface KStream<K, V> {
      * @param selector a {@link KeyValueMapper} that computes a new key for grouping
      * @param <KR>     the key type of the result {@link KGroupedStream}
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
+     *
+     * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.KStream#groupBy(KeyValueMapper, Grouped)} instead
      */
+    @Deprecated
     <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                        final Serialized<KR, V> serialized);
 
     /**
+     * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
+     * and {@link Serde}s as specified by {@link Grouped}.
+     * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
+     * (cf. {@link KGroupedStream}).
+     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
+     * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
+     * <p>
+     * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
+     * operator depends on the newly selected key.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
+     * either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally generated name, and "-repartition" is a fixed suffix.
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * <p>
+     * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
+     * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
+     * <p>
+     * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
+     *
+     * @param selector a {@link KeyValueMapper} that computes a new key for grouping
+     * @param grouped  the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes}
+     *                 and part of the name for a repartition topic if repartitioning is required.
+     * @param <KR>     the key type of the result {@link KGroupedStream}
+     * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
+     */
+    <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
+                                       final Grouped<KR, V> grouped);
+
+
+
+    /**
      * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default
      * serializers and deserializers.
      * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
@@ -932,7 +1007,7 @@ public interface KStream<K, V> {
      * in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
      * internally generated name, and "-changelog" is a fixed suffix.
-     *
+     * <p>
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
      *
      * @param otherStream the {@code KStream} to be joined with this stream
@@ -1076,9 +1151,9 @@ public interface KStream<K, V> {
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1157,9 +1232,9 @@ public interface KStream<K, V> {
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1241,9 +1316,9 @@ public interface KStream<K, V> {
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1323,9 +1398,9 @@ public interface KStream<K, V> {
      * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      * <p>
      * Repartitioning can happen for one or both of the joining {@code KStream}s.
@@ -1408,9 +1483,9 @@ public interface KStream<K, V> {
      * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -1484,9 +1559,9 @@ public interface KStream<K, V> {
      * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -1566,9 +1641,9 @@ public interface KStream<K, V> {
      * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -1645,9 +1720,9 @@ public interface KStream<K, V> {
      * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
      * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
      * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
-     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated name, and
      * "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 293bc6b..f9989e83 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -23,8 +23,8 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -562,8 +562,8 @@ public interface KTable<K, V> {
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
      * <p>
      * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -590,12 +590,12 @@ public interface KTable<K, V> {
      * provided {@link KeyValueMapper}.
      * Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data
      * (cf. {@link KGroupedTable}).
-     * The {@link KeyValueMapper} selects a new key and value (with should both have unmodified type).
+     * The {@link KeyValueMapper} selects a new key and value (each of which may keep its original type or have a new type).
      * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
      * <p>
      * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
-     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
-     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      *
      * You can retrieve all generated internal topic names via {@link Topology#describe()}.
@@ -610,11 +610,47 @@ public interface KTable<K, V> {
      * @param <KR>          the key type of the result {@link KGroupedTable}
      * @param <VR>          the value type of the result {@link KGroupedTable}
      * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
+     *
+     * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.KTable#groupBy(KeyValueMapper, Grouped)} instead
      */
+    @Deprecated
     <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                            final Serialized<KR, VR> serialized);
 
     /**
+     * Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper}
+     * and {@link Serde}s as specified by {@link Grouped}.
+     * Each {@link KeyValue} pair of this {@code KTable} is mapped to a new {@link KeyValue} pair by applying the
+     * provided {@link KeyValueMapper}.
+     * Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data
+     * (cf. {@link KGroupedTable}).
+     * The {@link KeyValueMapper} selects a new key and value (each of which may keep its original type or have a new type).
+     * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}
+     * <p>
+     * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
+     * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
+     * either provided via {@link Grouped#as(String)} or an internally generated name, and "-repartition" is a fixed suffix.
+     *
+     * <p>
+     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
+     *
+     * <p>
+     * All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
+     * records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned
+     * on the new key.
+     *
+     * @param selector      a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
+     * @param grouped       the {@link Grouped} instance used to specify the key/value {@link Serde Serdes}
+     *                      and the name for a repartition topic if repartitioning is required.
+     * @param <KR>          the key type of the result {@link KGroupedTable}
+     * @param <VR>          the value type of the result {@link KGroupedTable}
+     * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable}
+     */
+    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
+                                           final Grouped<KR, VR> grouped);
+
+    /**
      * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join,
      * with default serializers, deserializers, and state store.
      * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
index 0b04696..df9c9a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
@@ -24,7 +24,10 @@ import org.apache.kafka.common.serialization.Serde;
  *
  * @param <K> the key type
  * @param <V> the value type
+ *
+ * @deprecated since 2.1. Use {@link org.apache.kafka.streams.kstream.Grouped} instead
  */
+@Deprecated
 public class Serialized<K, V> {
 
     protected final Serde<K> keySerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
similarity index 81%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
copy to streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
index c6df11f..2360fc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedInternal.java
@@ -14,14 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.Grouped;
+
+public class GroupedInternal<K, V> extends Grouped<K, V> {
 
-public class SerializedInternal<K, V> extends Serialized<K, V> {
-    public SerializedInternal(final Serialized<K, V> serialized) {
-        super(serialized);
+    GroupedInternal(final Grouped<K, V> grouped) {
+        super(grouped);
     }
 
     public Serde<K> keySerde() {
@@ -31,4 +33,8 @@ public class SerializedInternal<K, V> extends Serialized<K, V> {
     public Serde<V> valueSerde() {
         return valueSerde;
     }
+
+    public String name() {
+        return name;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 9791db6..3439cf5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -36,6 +36,7 @@ class GroupedStreamAggregateBuilder<K, V> {
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final boolean repartitionRequired;
+    private final String userName;
     private final Set<String> sourceNodes;
     private final String name;
     private final StreamsGraphNode streamsGraphNode;
@@ -47,20 +48,20 @@ class GroupedStreamAggregateBuilder<K, V> {
     final Initializer<V> reduceInitializer = () -> null;
 
     GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
-                                  final Serde<K> keySerde,
-                                  final Serde<V> valueSerde,
+                                  final GroupedInternal<K, V> groupedInternal,
                                   final boolean repartitionRequired,
                                   final Set<String> sourceNodes,
                                   final String name,
                                   final StreamsGraphNode streamsGraphNode) {
 
         this.builder = builder;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
+        this.keySerde = groupedInternal.keySerde();
+        this.valueSerde = groupedInternal.valueSerde();
         this.repartitionRequired = repartitionRequired;
         this.sourceNodes = sourceNodes;
         this.name = name;
         this.streamsGraphNode = streamsGraphNode;
+        this.userName = groupedInternal.name();
     }
 
     <KR, T> KTable<KR, T> build(final String functionName,
@@ -74,7 +75,7 @@ class GroupedStreamAggregateBuilder<K, V> {
 
         final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
 
-        final String sourceName = repartitionIfRequired(storeBuilder.name(), repartitionNodeBuilder);
+        final String sourceName = repartitionIfRequired(userName != null ? userName : storeBuilder.name(), repartitionNodeBuilder);
 
         StreamsGraphNode parentNode = streamsGraphNode;
 
@@ -105,19 +106,20 @@ class GroupedStreamAggregateBuilder<K, V> {
                                 aggregateSupplier,
                                 statefulProcessorNode,
                                 builder);
+
     }
 
     /**
      * @return the new sourceName if repartitioned. Otherwise the name of this stream
      */
-    private String repartitionIfRequired(final String queryableStoreName,
+    private String repartitionIfRequired(final String repartitionTopicNamePrefix,
                                          final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
         if (!repartitionRequired) {
             return this.name;
         }
         // if repartition required the operation
         // captured needs to be set in the graph
-        return KStreamImpl.createRepartitionedSource(builder, keySerde, valueSerde, queryableStoreName, name, optimizableRepartitionNodeBuilder);
+        return KStreamImpl.createRepartitionedSource(builder, keySerde, valueSerde, repartitionTopicNamePrefix, optimizableRepartitionNodeBuilder);
 
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 8f76740..20df084 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -21,9 +21,9 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
@@ -43,7 +43,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
@@ -59,8 +60,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     private final AtomicInteger index = new AtomicInteger(0);
 
     private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
-    private final Map<StreamsGraphNode, Set<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new HashMap<>();
-    private final Set<StreamsGraphNode> mergeNodes = new HashSet<>();
+    private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
+    private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();
 
     private static final String TOPOLOGY_ROOT = "root";
     private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
@@ -251,7 +252,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         }
 
         if (node.isKeyChangingOperation()) {
-            keyChangingOperationsToOptimizableRepartitionNodes.put(node, new HashSet<>());
+            keyChangingOperationsToOptimizableRepartitionNodes.put(node, new LinkedHashSet<>());
         } else if (node instanceof OptimizableRepartitionNode) {
             final StreamsGraphNode parentNode = getKeyChangingParentNode(node);
             if (parentNode != null) {
@@ -305,7 +306,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     private void maybeOptimizeRepartitionOperations() {
         maybeUpdateKeyChangingRepartitionNodeMap();
 
-        for (final Map.Entry<StreamsGraphNode, Set<OptimizableRepartitionNode>> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
+        for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
 
             final StreamsGraphNode keyChangingNode = entry.getKey();
 
@@ -313,11 +314,13 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                 continue;
             }
 
-            final SerializedInternal serialized = new SerializedInternal(getRepartitionSerdes(entry.getValue()));
+            final GroupedInternal groupedInternal = new GroupedInternal(getRepartitionSerdes(entry.getValue()));
 
-            final StreamsGraphNode optimizedSingleRepartition = createRepartitionNode(keyChangingNode.nodeName(),
-                                                                                      serialized.keySerde(),
-                                                                                      serialized.valueSerde());
+            final String repartitionTopicName = getFirstRepartitionTopicName(entry.getValue());
+            // pass in the name of the first repartition topic; it is re-used as the name of the optimized repartition topic
+            final StreamsGraphNode optimizedSingleRepartition = createRepartitionNode(repartitionTopicName,
+                                                                                      groupedInternal.keySerde(),
+                                                                                      groupedInternal.valueSerde());
 
             // re-use parent buildPriority to make sure the single repartition graph node is evaluated before downstream nodes
             optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority());
@@ -366,7 +369,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     private void maybeUpdateKeyChangingRepartitionNodeMap() {
         final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
         for (final StreamsGraphNode mergeNode : mergeNodes) {
-            mergeNodesToKeyChangers.put(mergeNode, new HashSet<>());
+            mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
             final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
             for (final StreamsGraphNode key : keys) {
                 final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
@@ -379,7 +382,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
             final StreamsGraphNode mergeKey = entry.getKey();
             final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
-            final Set<OptimizableRepartitionNode> repartitionNodes = new HashSet<>();
+            final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
             for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
                 repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
                 keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
@@ -390,7 +393,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     }
 
     @SuppressWarnings("unchecked")
-    private OptimizableRepartitionNode createRepartitionNode(final String name,
+    private OptimizableRepartitionNode createRepartitionNode(final String repartitionTopicName,
                                                              final Serde keySerde,
                                                              final Serde valueSerde) {
 
@@ -398,10 +401,14 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         KStreamImpl.createRepartitionedSource(this,
                                               keySerde,
                                               valueSerde,
-                                              name + "-optimized",
-                                              name,
+                                              repartitionTopicName,
                                               repartitionNodeBuilder);
 
+        // set the repartition topic to the name of the first repartition
+        // topic being merged; this may be an auto-generated name or a
+        // user-specified name
+        repartitionNodeBuilder.withRepartitionTopic(repartitionTopicName);
+
         return repartitionNodeBuilder.build();
 
     }
@@ -416,8 +423,12 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         return null;
     }
 
+    private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode> repartitionNodes) {
+        return repartitionNodes.iterator().next().repartitionTopic();
+    }
+
     @SuppressWarnings("unchecked")
-    private SerializedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode> repartitionNodes) {
+    private GroupedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode> repartitionNodes) {
         Serde keySerde = null;
         Serde valueSerde = null;
 
@@ -435,7 +446,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
             }
         }
 
-        return new SerializedInternal(Serialized.with(keySerde, valueSerde));
+        return new GroupedInternal(Grouped.with(keySerde, valueSerde));
     }
 
     private StreamsGraphNode findParentNodeMatching(final StreamsGraphNode startSeekingNode,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index da2eeb6..e53e37f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -45,17 +44,15 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
 
     KGroupedStreamImpl(final String name,
-                       final Serde<K> keySerde,
-                       final Serde<V> valSerde,
                        final Set<String> sourceNodes,
+                       final GroupedInternal<K, V> groupedInternal,
                        final boolean repartitionRequired,
                        final StreamsGraphNode streamsGraphNode,
                        final InternalStreamsBuilder builder) {
-        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
+        super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), sourceNodes, streamsGraphNode, builder);
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
             builder,
-            keySerde,
-            valSerde,
+            groupedInternal,
             repartitionRequired,
             sourceNodes,
             name,
@@ -165,7 +162,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
             name,
             keySerde,
             valSerde,
-            repartitionRequired,
+            aggregateBuilder,
             streamsGraphNode
         );
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 6ec3c0d..c97576b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -48,6 +47,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
+    protected final String userSpecifiedName;
+
     private final Initializer<Long> countInitializer = () -> 0L;
 
     private final Aggregator<K, V, Long> countAdder = (aggKey, value, aggregate) -> aggregate + 1L;
@@ -57,10 +58,11 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
     KGroupedTableImpl(final InternalStreamsBuilder builder,
                       final String name,
                       final Set<String> sourceNodes,
-                      final Serde<K> keySerde,
-                      final Serde<V> valSerde,
+                      final GroupedInternal<K, V> groupedInternal,
                       final StreamsGraphNode streamsGraphNode) {
-        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
+        super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), sourceNodes, streamsGraphNode, builder);
+
+        this.userSpecifiedName = groupedInternal.name();
     }
 
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
@@ -69,9 +71,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
         final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
         final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newProcessorName(functionName);
-        final String topic = materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+        final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName())
+                + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
-        final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, topic);
+        final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic);
 
         // the passed in StreamsGraphNode must be the parent of the repartition node
         builder.addGraphNode(this.streamsGraphNode, repartitionNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2a3bc8f..168e210 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KGroupedStream;
@@ -565,11 +566,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
 
         if (joinThis.repartitionRequired) {
-            joinThis = joinThis.repartitionForJoin(joined);
+            final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name;
+
+            joinThis = joinThis.repartitionForJoin(Joined.with(joined.keySerde(), joined.valueSerde(), joined.otherValueSerde(), leftJoinRepartitionTopicName));
         }
 
         if (joinOther.repartitionRequired) {
-            joinOther = joinOther.repartitionForJoin(Joined.with(joined.keySerde(), joined.otherValueSerde(), joined.valueSerde()));
+            final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name;
+            final Joined<K, V1, ?> rightJoined = Joined.with(joined.keySerde(), joined.otherValueSerde(), joined.valueSerde(), rightJoinRepartitionTopicName);
+            joinOther = joinOther.repartitionForJoin(rightJoined);
         }
 
         joinThis.ensureJoinableWith(joinOther);
@@ -597,8 +602,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         final String repartitionedSourceName = createRepartitionedSource(builder,
                                                                          repartitionKeySerde,
                                                                          repartitionValueSerde,
-                                                                         null,
-                                                                         name,
+                                                                         joined.name(),
                                                                          optimizableRepartitionNodeBuilder);
 
         final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
@@ -610,12 +614,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     static <K1, V1> String createRepartitionedSource(final InternalStreamsBuilder builder,
                                                      final Serde<K1> keySerde,
                                                      final Serde<V1> valSerde,
-                                                     final String topicNamePrefix,
-                                                     final String name,
+                                                     final String repartitionTopicNamePrefix,
                                                      final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
 
-        final String baseName = topicNamePrefix != null ? topicNamePrefix : name;
-        final String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
+
+        final String repartitionTopic = repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
         final String sinkName = builder.newProcessorName(SINK_NAME);
         final String nullKeyFilterProcessorName = builder.newProcessorName(FILTER_NAME);
         final String sourceName = builder.newProcessorName(SOURCE_NAME);
@@ -677,7 +680,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined);
+            final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name);
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined);
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, false);
         } else {
             return doStreamTableJoin(other, joiner, joined, false);
@@ -697,7 +701,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined);
+            final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name);
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined);
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, true);
         } else {
             return doStreamTableJoin(other, joiner, joined, true);
@@ -776,47 +781,68 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
 
         // do not have serde for joined result
         return new KStreamImpl<>(name, joined.keySerde() != null ? joined.keySerde() : keySerde, null, allSourceNodes, false, streamTableJoinNode, builder);
+
     }
 
     @Override
     public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector) {
-        return groupBy(selector, Serialized.with(null, null));
+        return groupBy(selector, Grouped.with(null, valSerde));
     }
 
     @Override
+    @Deprecated
     public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                               final Serialized<KR, V> serialized) {
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(serialized, "serialized can't be null");
         final SerializedInternal<KR, V> serializedInternal = new SerializedInternal<>(serialized);
+        // the selector may change the key type, so only the value serde falls back to this stream's serde
+        return groupBy(selector, Grouped.with(serializedInternal.keySerde(), serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde));
+    }
+
+    @Override
+    public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
+                                              final Grouped<KR, V> grouped) {
+        Objects.requireNonNull(selector, "selector can't be null");
+        Objects.requireNonNull(grouped, "grouped can't be null");
+        final GroupedInternal<KR, V> groupedInternal = new GroupedInternal<>(grouped);
         final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(selector);
         selectKeyMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode);
-        return new KGroupedStreamImpl<>(selectKeyMapNode.nodeName(),
-                                        serializedInternal.keySerde(),
-                                        serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde,
-                                        sourceNodes,
-                                        true,
-                                        selectKeyMapNode,
-                                        builder);
+
+        return new KGroupedStreamImpl<>(
+            selectKeyMapNode.nodeName(),
+            sourceNodes,
+            groupedInternal,
+            true,
+            selectKeyMapNode,
+            builder);
     }
 
     @Override
     public KGroupedStream<K, V> groupByKey() {
-        return groupByKey(Serialized.with(null, null));
+        return groupByKey(Grouped.with(keySerde, valSerde));
     }
 
     @Override
+    @Deprecated
     public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) {
         final SerializedInternal<K, V> serializedInternal = new SerializedInternal<>(serialized);
-        return new KGroupedStreamImpl<>(this.name,
-                                        serializedInternal.keySerde() != null ? serializedInternal.keySerde() : keySerde,
-                                        serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde,
-                                        sourceNodes,
-                                        this.repartitionRequired,
-                                        streamsGraphNode,
-                                        builder);
+        return groupByKey(Grouped.with(serializedInternal.keySerde() != null ? serializedInternal.keySerde() : keySerde, serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde));
+    }
+
+    @Override
+    public KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped) {
+        final GroupedInternal<K, V> groupedInternal = new GroupedInternal<>(grouped);
+
+        return new KGroupedStreamImpl<>(
+            name,
+            sourceNodes,
+            groupedInternal,
+            repartitionRequired,
+            streamsGraphNode,
+            builder);
     }
 
     @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c5b2970..53e7a4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -552,14 +553,24 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
     @Override
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
-        return groupBy(selector, Serialized.with(null, null));
+        return this.groupBy(selector, Grouped.with(null, null));
     }
 
     @Override
+    @Deprecated
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
                                                   final Serialized<K1, V1> serialized) {
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(serialized, "serialized can't be null");
+        final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized);
+        return groupBy(selector, Grouped.with(serializedInternal.keySerde(), serializedInternal.valueSerde()));
+    }
+
+    @Override
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
+                                                  final Grouped<K1, V1> grouped) {
+        Objects.requireNonNull(selector, "selector can't be null");
+        Objects.requireNonNull(grouped, "grouped can't be null");
         final String selectName = builder.newProcessorName(SELECT_NAME);
 
         final KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
@@ -571,18 +582,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
 
         this.enableSendingOldValues();
-
-        final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized);
-
-        // we cannot inherit parent key and value serdes since both of them may have changed;
-        // we can only inherit from what serialized specified here
+        final GroupedInternal<K1, V1> groupedInternal = new GroupedInternal<>(grouped);
         return new KGroupedTableImpl<>(
-            builder,
-            selectName,
-            sourceNodes,
-            serializedInternal.keySerde(),
-            serializedInternal.valueSerde(),
-            groupByMapNode
+                builder,
+                selectName,
+                sourceNodes,
+                groupedInternal,
+                groupByMapNode
         );
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
index c6df11f..0cb7050 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Serialized;
 
+@Deprecated
 public class SerializedInternal<K, V> extends Serialized<K, V> {
     public SerializedInternal(final Serialized<K, V> serialized) {
         super(serialized);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 2ee8f7c..8519671 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -52,11 +52,11 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
                             final String name,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde,
-                            final boolean repartitionRequired,
+                            final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
                             final StreamsGraphNode streamsGraphNode) {
         super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
         this.windows = Objects.requireNonNull(windows, "windows can't be null");
-        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name, streamsGraphNode);
+        this.aggregateBuilder = aggregateBuilder;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
index 4d81b1f..05ec6fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -54,6 +54,10 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode {
         return valueSerde;
     }
 
+    public String repartitionTopic() {
+        return this.repartitionTopic;
+    }
+
     @Override
     Serializer<V> getValueSerializer() {
         return valueSerde != null ? valueSerde.serializer() : null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
index 5eebf04..cde2349 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.integration;
 
 
+import kafka.utils.MockTime;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -58,8 +59,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import kafka.utils.MockTime;
-
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -273,11 +272,11 @@ public class RepartitionOptimizingIntegrationTest {
                                                               + "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n"
                                                               + "      --> none\n"
                                                               + "      <-- KSTREAM-MAPVALUES-0000000003\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-MAP-0000000001-optimized-repartition)\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n"
                                                               + "      <-- KSTREAM-FILTER-0000000040\n"
                                                               + "\n"
                                                               + "  Sub-topology: 1\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-MAP-0000000001-optimized-repartition])\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n"
                                                               + "      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n"
                                                               + "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
                                                               + "      --> KTABLE-TOSTREAM-0000000011\n"
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
index af1f5f1..29901a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.integration;
 
 
+import kafka.utils.MockTime;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -49,8 +50,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import kafka.utils.MockTime;
-
 import static org.junit.Assert.assertEquals;
 
 @Category({IntegrationTest.class})
@@ -214,11 +213,11 @@ public class RepartitionWithMergeOptimizingIntegrationTest {
                                                               + "    Processor: KSTREAM-FILTER-0000000021 (stores: [])\n"
                                                               + "      --> KSTREAM-SINK-0000000020\n"
                                                               + "      <-- KSTREAM-MERGE-0000000004\n"
-                                                              + "    Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-MERGE-0000000004-optimized-repartition)\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n"
                                                               + "      <-- KSTREAM-FILTER-0000000021\n"
                                                               + "\n"
                                                               + "  Sub-topology: 1\n"
-                                                              + "    Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-MERGE-0000000004-optimized-repartition])\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n"
                                                               + "      --> KSTREAM-AGGREGATE-0000000006, KSTREAM-AGGREGATE-0000000013\n"
                                                               + "    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n"
                                                               + "      --> KTABLE-TOSTREAM-0000000017\n"
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
new file mode 100644
index 0000000..872ae5c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RepartitionTopicNamingTest {
+
+    private final KeyValueMapper<String, String, String> kvMapper = (k, v) -> k + v;
+    private static final String INPUT_TOPIC = "input";
+    private static final String COUNT_TOPIC = "outputTopic_0";
+    private static final String AGGREGATION_TOPIC = "outputTopic_1";
+    private static final String REDUCE_TOPIC = "outputTopic_2";
+    private static final String JOINED_TOPIC = "outputTopicForJoin";
+
+    private final String firstRepartitionTopicName = "count-stream";
+    private final String secondRepartitionTopicName = "aggregate-stream";
+    private final String thirdRepartitionTopicName = "reduced-stream";
+    private final String fourthRepartitionTopicName = "joined-stream";
+
+
+    @Test
+    public void shouldReuseFirstRepartitionTopicNameWhenOptimizing() {
+
+        final String optimizedTopology = buildTopology(StreamsConfig.OPTIMIZE).describe().toString();
+        final String unOptimizedTopology = buildTopology(StreamsConfig.NO_OPTIMIZATION).describe().toString();
+        final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+
+        assertThat(optimizedTopology, is(EXPECTED_OPTIMIZED_TOPOLOGY));
+        // only one repartition topic
+        assertThat(getCountOfRepartitionTopicsFound(optimizedTopology, repartitionTopicPattern), is(1));
+        // the surviving repartition topic keeps the first user-supplied name
+        assertTrue(optimizedTopology.contains(firstRepartitionTopicName + "-repartition"));
+
+        // without optimization, every named operation keeps its own repartition topic
+        assertThat(unOptimizedTopology, is(EXPECTED_UNOPTIMIZED_TOPOLOGY));
+        // now 4 repartition topics
+        assertThat(getCountOfRepartitionTopicsFound(unOptimizedTopology, repartitionTopicPattern), is(4));
+        // all 4 named repartition topics present
+        assertTrue(unOptimizedTopology.contains(firstRepartitionTopicName + "-repartition"));
+        assertTrue(unOptimizedTopology.contains(secondRepartitionTopicName + "-repartition"));
+        assertTrue(unOptimizedTopology.contains(thirdRepartitionTopicName + "-repartition"));
+        assertTrue(unOptimizedTopology.contains(fourthRepartitionTopicName + "-left-repartition"));
+
+    }
+
+    // two different streams can't both use the same repartition topic name
+    @Test
+    public void shouldFailWithSameRepartitionTopicName() {
+        try {
+            final StreamsBuilder builder = new StreamsBuilder();
+            builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")).count().toStream();
+            builder.<String, String>stream("topicII").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")).count().toStream();
+            builder.build();
+            fail("Should not build re-using repartition topic name");
+        } catch (final TopologyException te) {
+            // expected: both groupByKey calls request a repartition topic named after "grouping"
+        }
+    }
+
+    // each aggregation on a KGroupedStream results in its own repartition, so a named
+    // KGroupedStream can't be reused for several aggregations without optimization;
+    // use a separate groupByKey call per aggregation when naming repartition topics
+    // (see test shouldHandleUniqueGroupedInstances below for an example)
+    @Test
+    public void shouldFailWithSameRepartitionTopicNameUsingSameKGroupedStream() {
+        try {
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
+            kGroupedStream.windowedBy(TimeWindows.of(10)).count();
+            kGroupedStream.windowedBy(TimeWindows.of(30)).count();
+            builder.build();
+            fail("Should not build re-using repartition topic name");
+        } catch (final TopologyException te) {
+            // expected: both windowed counts request a repartition topic named after "grouping"
+        }
+    }
+
+
+    // two joins can't share the same repartition topic name
+    @Test
+    public void shouldFailWithSameRepartitionTopicNameInJoin() {
+        try {
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KStream<String, String> stream1 = builder.<String, String>stream("topic").selectKey((k, v) -> k);
+            final KStream<String, String> stream2 = builder.<String, String>stream("topic2").selectKey((k, v) -> k);
+            final KStream<String, String> stream3 = builder.<String, String>stream("topic3").selectKey((k, v) -> k);
+
+            final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition"));
+            joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition"));
+            builder.build();
+            fail("Should not build re-using repartition topic name");
+        } catch (final TopologyException te) {
+            // expected: both joins derive their repartition topic names from "join-repartition"
+        }
+    }
+
+
+    @Test
+    public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+        final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
+        kGroupedStream.windowedBy(TimeWindows.of(10)).count();
+        kGroupedStream.windowedBy(TimeWindows.of(30)).count();
+        builder.build(properties);
+    }
+
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForJoins() {
+
+        final String expectedLeftRepartitionTopic = "(topic: my-join-left-repartition)";
+        final String expectedRightRepartitionTopic = "(topic: my-join-right-repartition)";
+
+
+        final String joinTopologyFirst = buildStreamJoin(false);
+
+        assertTrue(joinTopologyFirst.contains(expectedLeftRepartitionTopic));
+        assertTrue(joinTopologyFirst.contains(expectedRightRepartitionTopic));
+
+        final String joinTopologyUpdated = buildStreamJoin(true);
+
+        assertTrue(joinTopologyUpdated.contains(expectedLeftRepartitionTopic));
+        assertTrue(joinTopologyUpdated.contains(expectedRightRepartitionTopic));
+    }
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByKeyTimeWindows() {
+
+        final String expectedTimeWindowRepartitionTopic = "(topic: time-window-grouping-repartition)";
+
+        final String timeWindowGroupingRepartitionTopology = buildStreamGroupByKeyTimeWindows(false, true);
+        assertTrue(timeWindowGroupingRepartitionTopology.contains(expectedTimeWindowRepartitionTopic));
+
+        final String timeWindowGroupingUpdatedTopology = buildStreamGroupByKeyTimeWindows(true, true);
+        assertTrue(timeWindowGroupingUpdatedTopology.contains(expectedTimeWindowRepartitionTopic));
+    }
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByTimeWindows() {
+
+        final String expectedTimeWindowRepartitionTopic = "(topic: time-window-grouping-repartition)";
+
+        final String timeWindowGroupingRepartitionTopology = buildStreamGroupByKeyTimeWindows(false, false);
+        assertTrue(timeWindowGroupingRepartitionTopology.contains(expectedTimeWindowRepartitionTopic));
+
+        final String timeWindowGroupingUpdatedTopology = buildStreamGroupByKeyTimeWindows(true, false);
+        assertTrue(timeWindowGroupingUpdatedTopology.contains(expectedTimeWindowRepartitionTopic));
+    }
+
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByKeyNoWindows() {
+
+        final String expectedNoWindowRepartitionTopic = "(topic: kstream-grouping-repartition)";
+
+        final String noWindowGroupingRepartitionTopology = buildStreamGroupByKeyNoWindows(false, true);
+        assertTrue(noWindowGroupingRepartitionTopology.contains(expectedNoWindowRepartitionTopic));
+
+        final String noWindowGroupingUpdatedTopology = buildStreamGroupByKeyNoWindows(true, true);
+        assertTrue(noWindowGroupingUpdatedTopology.contains(expectedNoWindowRepartitionTopic));
+    }
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByNoWindows() {
+
+        final String expectedNoWindowRepartitionTopic = "(topic: kstream-grouping-repartition)";
+
+        final String noWindowGroupingRepartitionTopology = buildStreamGroupByKeyNoWindows(false, false);
+        assertTrue(noWindowGroupingRepartitionTopology.contains(expectedNoWindowRepartitionTopic));
+
+        final String noWindowGroupingUpdatedTopology = buildStreamGroupByKeyNoWindows(true, false);
+        assertTrue(noWindowGroupingUpdatedTopology.contains(expectedNoWindowRepartitionTopic));
+    }
+
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupByKeySessionWindows() {
+
+        final String expectedSessionWindowRepartitionTopic = "(topic: session-window-grouping-repartition)";
+
+        final String sessionWindowGroupingRepartitionTopology = buildStreamGroupByKeySessionWindows(false, true);
+        assertTrue(sessionWindowGroupingRepartitionTopology.contains(expectedSessionWindowRepartitionTopic));
+
+        final String sessionWindowGroupingUpdatedTopology = buildStreamGroupByKeySessionWindows(true, true);
+        assertTrue(sessionWindowGroupingUpdatedTopology.contains(expectedSessionWindowRepartitionTopic));
+    }
+
+    @Test
+    public void shouldKeepRepartitionTopicNameForGroupBySessionWindows() {
+        // groupBy(kvMapper, ...) + session windows: same stability guarantee for the named
+        // repartition topic, with and without upstream operators.
+        final String expectedTopic = "(topic: session-window-grouping-repartition)";
+
+        for (final boolean withExtraOperators : new boolean[] {false, true}) {
+            final String topology = buildStreamGroupByKeySessionWindows(withExtraOperators, false);
+            assertTrue(topology.contains(expectedTopic));
+        }
+    }
+
+    @Test
+    public void shouldKeepRepartitionNameForGroupByKTable() {
+        // The KTable groupBy is named via Grouped, so an upstream filter must not alter the
+        // generated repartition topic name.
+        final String expectedTopic = "(topic: ktable-group-by-repartition)";
+
+        for (final boolean withFilter : new boolean[] {false, true}) {
+            assertTrue(buildKTableGroupBy(withFilter).contains(expectedTopic));
+        }
+    }
+
+
+    /**
+     * Builds a topology that groups a table read from "topic" under the explicit name
+     * "ktable-group-by" and counts it.
+     *
+     * @param otherOperations if {@code true}, a pass-through filter is applied before grouping
+     * @return {@code describe().toString()} of the built topology
+     */
+    private String buildKTableGroupBy(final boolean otherOperations) {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, String> sourceTable = builder.table("topic");
+
+        final KTable<String, String> tableToGroup = otherOperations ? sourceTable.filter((k, v) -> true) : sourceTable;
+        tableToGroup.groupBy(KeyValue::pair, Grouped.as("ktable-group-by")).count();
+
+        return builder.build().describe().toString();
+    }
+
+    /**
+     * Builds a topology that re-keys a stream, groups it under the explicit {@link Grouped} name
+     * "time-window-grouping", and counts over time windows of size 10.
+     *
+     * @param otherOperations if {@code true}, a filter and a mapValues are inserted before grouping
+     * @param isGroupByKey    if {@code true}, groups with groupByKey(); otherwise with groupBy(kvMapper, ...)
+     * @return {@code describe().toString()} of the built topology
+     */
+    private String buildStreamGroupByKeyTimeWindows(final boolean otherOperations, final boolean isGroupByKey) {
+
+        final String groupedTimeWindowRepartitionTopicName = "time-window-grouping";
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> selectKeyStream = builder.<String, String>stream("topic").selectKey((k, v) -> k + v);
+
+        if (isGroupByKey) {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+            } else {
+                selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+            }
+        } else {
+            // The groupBy() path must be time-windowed too, mirroring the groupByKey() branch;
+            // without windowedBy() it would build a plain, unwindowed count.
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+            } else {
+                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+            }
+        }
+
+        return builder.build().describe().toString();
+    }
+
+
+    private String buildStreamGroupByKeySessionWindows(final boolean otherOperations, final boolean isGroupByKey) {
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> selectKeyStream = builder.<String, String>stream("topic").selectKey((k, v) -> k + v);
+
+        final String groupedSessionWindowRepartitionTopicName = "session-window-grouping";
+        if (isGroupByKey) {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+            } else {
+                selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+            }
+        } else {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+            } else {
+                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+            }
+        }
+
+        return builder.build().describe().toString();
+    }
+
+
+    private String buildStreamGroupByKeyNoWindows(final boolean otherOperations, final boolean isGroupByKey) {
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> selectKeyStream = builder.<String, String>stream("topic").selectKey((k, v) -> k + v);
+
+        final String groupByAndCountRepartitionTopicName = "kstream-grouping";
+        if (isGroupByKey) {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupByAndCountRepartitionTopicName)).count();
+            } else {
+                selectKeyStream.groupByKey(Grouped.as(groupByAndCountRepartitionTopicName)).count();
+            }
+        } else {
+            if (otherOperations) {
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupByAndCountRepartitionTopicName)).count();
+            } else {
+                selectKeyStream.groupBy(kvMapper, Grouped.as(groupByAndCountRepartitionTopicName)).count();
+            }
+        }
+
+        return builder.build().describe().toString();
+    }
+
+    private String buildStreamJoin(final boolean includeOtherOperations) {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> initialStreamOne = builder.stream("topic-one");
+        final KStream<String, String> initialStreamTwo = builder.stream("topic-two");
+
+        final KStream<String, String> updatedStreamOne;
+        final KStream<String, String> updatedStreamTwo;
+
+        if (includeOtherOperations) {
+            // without naming the join, the repartition topic name would change due to operator changing before join performed
+            updatedStreamOne = initialStreamOne.selectKey((k, v) -> k + v).filter((k, v) -> true).peek((k, v) -> System.out.println(k + v));
+            updatedStreamTwo = initialStreamTwo.selectKey((k, v) -> k + v).filter((k, v) -> true).peek((k, v) -> System.out.println(k + v));
+        } else {
+            updatedStreamOne = initialStreamOne.selectKey((k, v) -> k + v);
+            updatedStreamTwo = initialStreamTwo.selectKey((k, v) -> k + v);
+        }
+
+        final String joinRepartitionTopicName = "my-join";
+        updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2,
+                JoinWindows.of(1000), Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), joinRepartitionTopicName));
+
+        return builder.build().describe().toString();
+    }
+
+
+    /**
+     * Counts how many times {@code repartitionTopicPattern} matches in {@code topologyString}.
+     * Successive {@link Matcher#find()} calls resume after the previous match, so matches do not overlap.
+     *
+     * @return the number of matches found
+     */
+    private int getCountOfRepartitionTopicsFound(final String topologyString, final Pattern repartitionTopicPattern) {
+        int matches = 0;
+        for (final Matcher m = repartitionTopicPattern.matcher(topologyString); m.find(); ) {
+            matches++;
+        }
+        return matches;
+    }
+
+
+    /**
+     * Builds a topology in which one re-keyed stream feeds a count, an aggregate, a reduce and a
+     * windowed join, each grouping/join given an explicit repartition name.
+     *
+     * @param optimizationConfig value set for {@link StreamsConfig#TOPOLOGY_OPTIMIZATION}
+     * @return the built topology
+     */
+    private Topology buildTopology(final String optimizationConfig) {
+        final Initializer<Integer> initializer = () -> 0;
+        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
+        final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2;
+        final List<String> processorValueCollector = new ArrayList<>();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> sourceStream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        // Locale.ROOT makes the upper-casing independent of the JVM default locale
+        // (under a Turkish default locale, for instance, "i" would not upper-case to "I").
+        final KStream<String, String> mappedStream = sourceStream.map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.ROOT), v));
+
+        mappedStream.filter((k, v) -> k.equals("B")).mapValues(v -> v.toUpperCase(Locale.ROOT))
+                .process(() -> new SimpleProcessor(processorValueCollector));
+
+        final KStream<String, Long> countStream = mappedStream.groupByKey(Grouped.as(firstRepartitionTopicName)).count(Materialized.with(Serdes.String(), Serdes.Long())).toStream();
+
+        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
+
+        mappedStream.groupByKey(Grouped.as(secondRepartitionTopicName)).aggregate(initializer,
+                aggregator,
+                Materialized.with(Serdes.String(), Serdes.Integer()))
+                .toStream().to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
+
+        // adding operators for case where the repartition node is further downstream
+        mappedStream.filter((k, v) -> true).peek((k, v) -> System.out.println(k + ":" + v)).groupByKey(Grouped.as(thirdRepartitionTopicName))
+                .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
+                .toStream().to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
+
+        mappedStream.filter((k, v) -> k.equals("A"))
+                .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
+                        JoinWindows.of(5000),
+                        Joined.with(Serdes.String(), Serdes.String(), Serdes.Long(), fourthRepartitionTopicName))
+                .to(JOINED_TOPIC);
+
+        final Properties properties = new Properties();
+
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
+        return builder.build(properties);
+    }
+
+
+    /**
+     * Processor that appends every value it receives to the list supplied at construction.
+     */
+    private static class SimpleProcessor extends AbstractProcessor<String, String> {
+
+        // The caller's list, mutated in place (not copied).
+        private final List<String> valueList;
+
+        SimpleProcessor(final List<String> valueList) {
+            this.valueList = valueList;
+        }
+
+        @Override
+        public void process(final String key, final String value) {
+            valueList.add(value);
+        }
+    }
+
+
+    /**
+     * Expected {@code describe()} output for the optimized topology. It has two sub-topologies.
+     * Sub-topology 0 writes to the single repartition topic "count-stream-repartition".
+     * Sub-topology 1 reads that topic once and fans out to both aggregate processors, the reduce
+     * branch and the join branch.
+     * Must match the describe() output byte-for-byte.
+     * NOTE(review): presumably compared against buildTopology(...) with optimization enabled;
+     * the comparing test is outside this view.
+     */
+    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n" +
+            "   Sub-topology: 0\n" +
+            "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+            "      --> KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040\n" +
+            "      <-- KSTREAM-SOURCE-0000000000\n" +
+            "    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n" +
+            "      --> KSTREAM-MAPVALUES-0000000003\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000040 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000039\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" +
+            "      --> KSTREAM-PROCESSOR-0000000004\n" +
+            "      <-- KSTREAM-FILTER-0000000002\n" +
+            "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n" +
+            "      --> none\n" +
+            "      <-- KSTREAM-MAPVALUES-0000000003\n" +
+            "    Sink: KSTREAM-SINK-0000000039 (topic: count-stream-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000040\n" +
+            "\n" +
+            "  Sub-topology: 1\n" +
+            "    Source: KSTREAM-SOURCE-0000000041 (topics: [count-stream-repartition])\n" +
+            "      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n" +
+            "      --> KTABLE-TOSTREAM-0000000011\n" +
+            "      <-- KSTREAM-SOURCE-0000000041\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000007\n" +
+            "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" +
+            "      --> KSTREAM-PEEK-0000000021\n" +
+            "      <-- KSTREAM-SOURCE-0000000041\n" +
+            "    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" +
+            "      --> KSTREAM-WINDOWED-0000000033\n" +
+            "      <-- KSTREAM-SOURCE-0000000041\n" +
+            "    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" +
+            "      --> KSTREAM-REDUCE-0000000023\n" +
+            "      <-- KSTREAM-FILTER-0000000020\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
+            "      --> KSTREAM-JOINTHIS-0000000035\n" +
+            "      <-- KSTREAM-FILTER-0000000029\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
+            "      --> KSTREAM-JOINOTHER-0000000036\n" +
+            "      <-- KTABLE-TOSTREAM-0000000011\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" +
+            "      --> KTABLE-TOSTREAM-0000000018\n" +
+            "      <-- KSTREAM-SOURCE-0000000041\n" +
+            "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
+            "      --> KSTREAM-MERGE-0000000037\n" +
+            "      <-- KSTREAM-WINDOWED-0000000034\n" +
+            "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
+            "      --> KSTREAM-MERGE-0000000037\n" +
+            "      <-- KSTREAM-WINDOWED-0000000033\n" +
+            "    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" +
+            "      --> KTABLE-TOSTREAM-0000000027\n" +
+            "      <-- KSTREAM-PEEK-0000000021\n" +
+            "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000038\n" +
+            "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000019\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000014\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000028\n" +
+            "      <-- KSTREAM-REDUCE-0000000023\n" +
+            "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000011\n" +
+            "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000018\n" +
+            "    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000027\n" +
+            "    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n" +
+            "      <-- KSTREAM-MERGE-0000000037\n\n";
+
+
+    /**
+     * Expected {@code describe()} output for the unoptimized topology. It has four sub-topologies.
+     * Sub-topology 0 writes to a separate repartition topic for each grouping/join:
+     * count-stream-, aggregate-stream-, reduced-stream- and joined-stream-left-repartition.
+     * Sub-topologies 1-3 read those topics.
+     * Must match the describe() output byte-for-byte.
+     * NOTE(review): presumably compared against buildTopology(...) without optimization; the
+     * comparing test is outside this view.
+     */
+    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n" +
+            "   Sub-topology: 0\n" +
+            "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+            "      --> KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n" +
+            "      <-- KSTREAM-SOURCE-0000000000\n" +
+            "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n" +
+            "      --> KSTREAM-PEEK-0000000021\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n" +
+            "      --> KSTREAM-MAPVALUES-0000000003\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000031\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000025\n" +
+            "      <-- KSTREAM-FILTER-0000000020\n" +
+            "    Processor: KSTREAM-FILTER-0000000009 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000008\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000016 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000015\n" +
+            "      <-- KSTREAM-MAP-0000000001\n" +
+            "    Processor: KSTREAM-FILTER-0000000025 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000024\n" +
+            "      <-- KSTREAM-PEEK-0000000021\n" +
+            "    Processor: KSTREAM-FILTER-0000000031 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000030\n" +
+            "      <-- KSTREAM-FILTER-0000000029\n" +
+            "    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" +
+            "      --> KSTREAM-PROCESSOR-0000000004\n" +
+            "      <-- KSTREAM-FILTER-0000000002\n" +
+            "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n" +
+            "      --> none\n" +
+            "      <-- KSTREAM-MAPVALUES-0000000003\n" +
+            "    Sink: KSTREAM-SINK-0000000008 (topic: count-stream-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000009\n" +
+            "    Sink: KSTREAM-SINK-0000000015 (topic: aggregate-stream-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000016\n" +
+            "    Sink: KSTREAM-SINK-0000000024 (topic: reduced-stream-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000025\n" +
+            "    Sink: KSTREAM-SINK-0000000030 (topic: joined-stream-left-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000031\n" +
+            "\n" +
+            "  Sub-topology: 1\n" +
+            "    Source: KSTREAM-SOURCE-0000000010 (topics: [count-stream-repartition])\n" +
+            "      --> KSTREAM-AGGREGATE-0000000007\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n" +
+            "      --> KTABLE-TOSTREAM-0000000011\n" +
+            "      <-- KSTREAM-SOURCE-0000000010\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000007\n" +
+            "    Source: KSTREAM-SOURCE-0000000032 (topics: [joined-stream-left-repartition])\n" +
+            "      --> KSTREAM-WINDOWED-0000000033\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
+            "      --> KSTREAM-JOINTHIS-0000000035\n" +
+            "      <-- KSTREAM-SOURCE-0000000032\n" +
+            "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
+            "      --> KSTREAM-JOINOTHER-0000000036\n" +
+            "      <-- KTABLE-TOSTREAM-0000000011\n" +
+            "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n" +
+            "      --> KSTREAM-MERGE-0000000037\n" +
+            "      <-- KSTREAM-WINDOWED-0000000034\n" +
+            "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n" +
+            "      --> KSTREAM-MERGE-0000000037\n" +
+            "      <-- KSTREAM-WINDOWED-0000000033\n" +
+            "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000038\n" +
+            "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n" +
+            "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000011\n" +
+            "    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n" +
+            "      <-- KSTREAM-MERGE-0000000037\n" +
+            "\n" +
+            "  Sub-topology: 2\n" +
+            "    Source: KSTREAM-SOURCE-0000000017 (topics: [aggregate-stream-repartition])\n" +
+            "      --> KSTREAM-AGGREGATE-0000000014\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n" +
+            "      --> KTABLE-TOSTREAM-0000000018\n" +
+            "      <-- KSTREAM-SOURCE-0000000017\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000019\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000014\n" +
+            "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000018\n" +
+            "\n" +
+            "  Sub-topology: 3\n" +
+            "    Source: KSTREAM-SOURCE-0000000026 (topics: [reduced-stream-repartition])\n" +
+            "      --> KSTREAM-REDUCE-0000000023\n" +
+            "    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n" +
+            "      --> KTABLE-TOSTREAM-0000000027\n" +
+            "      <-- KSTREAM-SOURCE-0000000026\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000028\n" +
+            "      <-- KSTREAM-REDUCE-0000000023\n" +
+            "    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n" +
+            "      <-- KTABLE-TOSTREAM-0000000027\n\n";
+
+
+}


Mime
View raw message