kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5817; Add Serialized class and overloads to KStream#groupBy and KStream#groupByKey
Date Wed, 06 Sep 2017 09:43:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2fb5664bf -> b687c0680


KAFKA-5817; Add Serialized class and overloads to KStream#groupBy and KStream#groupByKey

Part of KIP-182
- Add the `Serialized` class
- implement overloads of `KStream#groupByKey` and `KStream#groupBy`
- deprecate existing methods that have more than default arguments

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3772 from dguy/kafka-5817


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b687c068
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b687c068
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b687c068

Branch: refs/heads/trunk
Commit: b687c068008a81fad390c80da289249cc04b3efb
Parents: 2fb5664
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Sep 6 10:43:14 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Wed Sep 6 10:43:14 2017 +0100

----------------------------------------------------------------------
 docs/streams/developer-guide.html               | 35 ++++----
 .../examples/pageview/PageViewTypedDemo.java    |  3 +-
 .../examples/pageview/PageViewUntypedDemo.java  |  3 +-
 .../apache/kafka/streams/kstream/KStream.java   | 59 ++++++++++++-
 .../kafka/streams/kstream/Serialized.java       | 88 ++++++++++++++++++++
 .../streams/kstream/internals/KStreamImpl.java  | 38 ++++++---
 .../KStreamAggregationDedupIntegrationTest.java |  6 +-
 .../KStreamAggregationIntegrationTest.java      |  7 +-
 .../KStreamKTableJoinIntegrationTest.java       |  3 +-
 .../internals/KGroupedStreamImplTest.java       |  3 +-
 .../internals/KStreamWindowAggregateTest.java   |  8 +-
 .../kafka/streams/perf/YahooBenchmark.java      |  3 +-
 .../kafka/streams/tests/SmokeTestClient.java    |  3 +-
 13 files changed, 215 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index b530e5e..8433bf3 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -842,8 +842,9 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                        // When the key and/or value types do not match the configured
                        // default serdes, we must explicitly specify serdes.
                        KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey(
-                           Serdes.ByteArray(), /* key */
-                           Serdes.String()     /* value */
+                           Serialized.with(
+                                Serdes.ByteArray(), /* key */
+                                Serdes.String())     /* value */
                        );
                 </pre>
             </td>
@@ -883,15 +884,17 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                        // Group the stream by a new key and key type
                        KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
                            (key, value) -> value,
-                           Serdes.String(), /* key (note: type was modified) */
-                           Serdes.String()  /* value */
+                           Serialized.with(
+                                Serdes.String(), /* key (note: type was modified) */
+                                Serdes.String())  /* value */
                        );
 
                        // Group the table by a new key and key type, and also modify the
value and value type.
                        KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
                            (key, value) -> KeyValue.pair(value, value.length()),
-                           Serdes.String(), /* key (note: type was modified) */
-                           Serdes.Integer() /* value (note: type was modified) */
+                           Serialized.with(
+                               Serdes.String(), /* key (note: type was modified) */
+                               Serdes.Integer()) /* value (note: type was modified) */
                        );
 
 
@@ -905,8 +908,9 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                                   return value;
                                }
                            },
-                           Serdes.String(), /* key (note: type was modified) */
-                           Serdes.String()  /* value */
+                           Serialized.with(
+                                Serdes.String(), /* key (note: type was modified) */
+                                Serdes.String())  /* value */
                        );
 
                        // Group the table by a new key and key type, and also modify the
value and value type.
@@ -917,8 +921,9 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                                    return KeyValue.pair(value, value.length());
                                 }
                             },
-                            Serdes.String(), /* key (note: type was modified) */
-                            Serdes.Integer() /* value (note: type was modified) */
+                            Serialized.with(
+                                Serdes.String(), /* key (note: type was modified) */
+                                Serdes.Integer()) /* value (note: type was modified) */
                        );
                 </pre>
             </td>
@@ -1659,7 +1664,7 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
         KStream&lt;String, Integer&gt; wordCounts = ...;
 
         KGroupedStream&lt;String, Integer&gt; groupedStream = wordCounts
-            .groupByKey(Serdes.String(), Serdes.Integer());
+            .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()));
 
         KTable&lt;String, Integer&gt; aggregated = groupedStream.aggregate(
             () -> 0, /* initializer */
@@ -1763,7 +1768,7 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
         // its prime purpose in this example is to show the *effects* of the grouping
         // in the subsequent aggregation.
         KGroupedTable&lt;String, Integer&gt; groupedTable = userProfiles
-            .groupBy((user, region) -> KeyValue.pair(region, user.length()), Serdes.String(),
Serdes.Integer());
+            .groupBy((user, region) -> KeyValue.pair(region, user.length()), Serialized.with(Serdes.String(),
Serdes.Integer()));
 
         KTable&lt;String, Integer&gt; aggregated = groupedTable.aggregate(
             () -> 0, /* initializer */
@@ -2117,7 +2122,7 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
           // Define the processing topology (here: WordCount)
           KGroupedStream&lt;String, String&gt; groupedByWord = textLines
             .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-            .groupBy((key, word) -> word, stringSerde, stringSerde);
+            .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
 
           // Create a key-value store named "CountsKeyValueStore" for the all-time word counts
           groupedByWord.count("CountsKeyValueStore");
@@ -2173,7 +2178,7 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
           // Define the processing topology (here: WordCount)
           KGroupedStream&lt;String, String&gt; groupedByWord = textLines
             .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-            .groupBy((key, word) -> word, stringSerde, stringSerde);
+            .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
 
           // Create a window state store named "CountsWindowStore" that contains the word
counts for every minute
           groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore");
@@ -2396,7 +2401,7 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
 
           KGroupedStream&lt;String, String&gt; groupedByWord = textLines
               .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-              .groupBy((key, word) -> word, stringSerde, stringSerde);
+              .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
 
           // This call to `count()` creates a state store named "word-count".
           // The state store is discoverable and can be queried interactively.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 0783c65..676f8cc 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -168,7 +169,7 @@ public class PageViewTypedDemo {
                         return new KeyValue<>(viewRegion.region, viewRegion);
                     }
                 })
-                .groupByKey(Serdes.String(), pageViewByRegionSerde)
+                .groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
                 .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index e930985..5b87937 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -100,7 +101,7 @@ public class PageViewUntypedDemo {
                         return new KeyValue<>(viewRegion.get("region").textValue(),
viewRegion);
                     }
                 })
-                .groupByKey(Serdes.String(), jsonSerde)
+                .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                 .count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index d386696..f390167 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -1012,7 +1012,7 @@ public interface KStream<K, V> {
      * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream}
is partitioned
      * correctly on its key.
      * If the last key changing operator changed the key type, it is recommended to use
-     * {@link #groupByKey(Serde, Serde)} instead.
+     * {@link #groupByKey(Serialized)} instead.
      *
      * @return a {@link KGroupedStream} that contains the grouped records of the original
{@code KStream}
      * @see #groupBy(KeyValueMapper)
@@ -1020,6 +1020,31 @@ public interface KStream<K, V> {
     KGroupedStream<K, V> groupByKey();
 
     /**
+     * Group the records by their current key into a {@link KGroupedStream} while preserving
the original values
+     * and using the serializers as defined by {@link Serialized}.
+     * Grouping a stream on the record key is required before an aggregation operator can
be applied to the data
+     * (cf. {@link KGroupedStream}).
+     * If a record key is {@code null} the record will not be included in the resulting {@link
KGroupedStream}.
+     * <p>
+     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
+     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
+     * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened
afterwards (e.g., via
+     * {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
+     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId"
is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"XXX" is
+     * an internally generated name, and "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * <p>
+     * For this case, all data of this stream will be redistributed through the repartitioning
topic by writing all
+     * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream}
is partitioned
+     * correctly on its key.
+     *
+     * @return a {@link KGroupedStream} that contains the grouped records of the original
{@code KStream}
+     * @see #groupBy(KeyValueMapper)
+     */
+    KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized);
+
+    /**
      * Group the records by their current key into a {@link KGroupedStream} while preserving
the original values.
      * Grouping a stream on the record key is required before an aggregation operator can
be applied to the data
      * (cf. {@link KGroupedStream}).
@@ -1043,7 +1068,9 @@ public interface KStream<K, V> {
      * @param valSerde value serdes for materializing this stream,
      *                 if not specified the default serdes defined in the configs will be
used
      * @return a {@link KGroupedStream} that contains the grouped records of the original
{@code KStream}
+     * @deprecated use {@code groupByKey(Serialized)}
      */
+    @Deprecated
     KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
                                     final Serde<V> valSerde);
 
@@ -1065,7 +1092,7 @@ public interface KStream<K, V> {
      * and rereading all records from it, such that the resulting {@link KGroupedStream}
is partitioned on the new key.
      * <p>
      * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed
by {@link #groupByKey()}.
-     * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper,
Serde, Serde)} instead.
+     * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper,
Serialized)} instead.
      *
      * @param selector a {@link KeyValueMapper} that computes a new key for grouping
      * @param <KR>     the key type of the result {@link KGroupedStream}
@@ -1074,6 +1101,32 @@ public interface KStream<K, V> {
     <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super
V, KR> selector);
 
     /**
+     * Group the records of this {@code KStream} on a new key that is selected using the
provided {@link KeyValueMapper}
+     * and {@link Serde}s as specified by {@link Serialized}.
+     * Grouping a stream on the record key is required before an aggregation operator can
be applied to the data
+     * (cf. {@link KGroupedStream}).
+     * The {@link KeyValueMapper} selects a new key (which should be of the same type) while
preserving the original values.
+     * If the new record key is {@code null} the record will not be included in the resulting
{@link KGroupedStream}.
+     * <p>
+     * Because a new key is selected, an internal repartitioning topic will be created in
Kafka.
+     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId"
is user-specified in
+     * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"XXX" is
+     * an internally generated name, and "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * <p>
+     * All data of this stream will be redistributed through the repartitioning topic by
writing all records to it,
+     * and rereading all records from it, such that the resulting {@link KGroupedStream}
is partitioned on the new key.
+     * <p>
+     * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed
by {@link #groupByKey()}.
+     *
+     * @param selector a {@link KeyValueMapper} that computes a new key for grouping
+     * @param <KR>     the key type of the result {@link KGroupedStream}
+     * @return a {@link KGroupedStream} that contains the grouped records of the original
{@code KStream}
+     */
+    <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super
V, KR> selector,
+                                       final Serialized<KR, V> serialized);
+
+    /**
      * Group the records of this {@code KStream} on a new key that is selected using the
provided {@link KeyValueMapper}.
      * Grouping a stream on the record key is required before an aggregation operator can
be applied to the data
      * (cf. {@link KGroupedStream}).
@@ -1100,7 +1153,9 @@ public interface KStream<K, V> {
      * @param <KR>     the key type of the result {@link KGroupedStream}
      * @return a {@link KGroupedStream} that contains the grouped records of the original
{@code KStream}
      * @see #groupByKey()
+     * @deprecated use {@code groupBy(KeyValueMapper, Serialized)}
      */
+    @Deprecated
     <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super
V, KR> selector,
                                        final Serde<KR> keySerde,
                                        final Serde<V> valSerde);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
new file mode 100644
index 0000000..18731a5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Serialized.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * The class that is used to capture the key and value {@link Serde}s used when performing
+ * {@link KStream#groupBy(KeyValueMapper, Serialized)} and {@link KStream#groupByKey(Serialized)}
operations.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class Serialized<K, V> {
+
+    private Serde<K> keySerde;
+    private Serde<V> valueSerde;
+
+    private Serialized(final Serde<K> keySerde,
+                       final Serde<V> valueSerde) {
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    /**
+     * Construct a {@code Serialized} instance with the provided key and value {@link Serde}s.
+     * If the {@link Serde} params are {@code null} the default serdes defined in the configs
will be used.
+     *
+     * @param keySerde   keySerde that will be used to materialize a stream
+     *                   if not specified the default serdes defined in the configs will
be used
+     * @param valueSerde valueSerde that will be used to materialize a stream
+     *                   if not specified the default serdes defined in the configs will
be used
+     * @param <K>        the key type
+     * @param <V>        the value type
+     * @return a new instance of {@link Serialized} configured with the provided serdes
+     */
+    public static <K, V> Serialized<K, V> with(final Serde<K> keySerde,
+                                               final Serde<V> valueSerde) {
+        return new Serialized<>(keySerde, valueSerde);
+    }
+
+    /**
+     * Construct a {@code Serialized} instance with the provided key {@link Serde}.
+     * If the {@link Serde} params are null the default serdes defined in the configs will
be used.
+     *
+     * @param keySerde keySerde that will be used to materialize a stream
+     *                 if not specified the default serdes defined in the configs will be
used
+     * @return a new instance of {@link Serialized} configured with the provided key serde
+     */
+    public Serialized<K, V> withKeySerde(final Serde<K> keySerde) {
+        return new Serialized<>(keySerde, null);
+    }
+
+    /**
+     * Construct a {@code Serialized} instance with the provided value {@link Serde}.
+     * If the {@link Serde} params are null the default serdes defined in the configs will
be used.
+     *
+     * @param valueSerde valueSerde that will be used to materialize a stream
+     *                   if not specified the default serdes defined in the configs will
be used
+     * @return a new instance of {@link Serialized} configured with the provided value serde
+     */
+    public Serialized<K, V> withValueSerde(final Serde<V> valueSerde) {
+        return new Serialized<>(null, valueSerde);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 040b66f..f46f222 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.PrintForeachAction;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -714,36 +715,51 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
 
     @Override
     public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super
K, ? super V, K1> selector) {
-        return groupBy(selector, null, null);
+        return groupBy(selector, Serialized.<K1, V>with(null, null));
     }
 
     @Override
-    public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super
K, ? super V, K1> selector,
-                                              final Serde<K1> keySerde,
-                                              final Serde<V> valSerde) {
+    public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super
K, ? super V, KR> selector,
+                                              final Serialized<KR, V> serialized) {
         Objects.requireNonNull(selector, "selector can't be null");
+        Objects.requireNonNull(serialized, "serialized can't be null");
         String selectName = internalSelectKey(selector);
         return new KGroupedStreamImpl<>(builder,
                                         selectName,
                                         sourceNodes,
-                                        keySerde,
-                                        valSerde, true);
+                                        serialized.keySerde(),
+                                        serialized.valueSerde(),
+                                        true);
+    }
+
+    @Override
+    public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super
K, ? super V, K1> selector,
+                                              final Serde<K1> keySerde,
+                                              final Serde<V> valSerde) {
+        Objects.requireNonNull(selector, "selector can't be null");
+        return groupBy(selector, Serialized.with(keySerde, valSerde));
     }
 
     @Override
     public KGroupedStream<K, V> groupByKey() {
-        return groupByKey(null, null);
+        return groupByKey(Serialized.<K, V>with(null, null));
     }
 
     @Override
-    public KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
-                                           final Serde<V> valSerde) {
+    public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized)
{
         return new KGroupedStreamImpl<>(builder,
                                         this.name,
                                         sourceNodes,
-                                        keySerde,
-                                        valSerde,
+                                        serialized.keySerde(),
+                                        serialized.valueSerde(),
                                         this.repartitionRequired);
+
+    }
+
+    @Override
+    public KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
+                                           final Serde<V> valSerde) {
+        return groupByKey(Serialized.with(keySerde, valSerde));
     }
 
     private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows
windows,

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 14a3ea9..e070624 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.IntegrationTest;
@@ -103,8 +104,7 @@ public class KStreamAggregationDedupIntegrationTest {
         groupedStream = stream
             .groupBy(
                 mapper,
-                Serdes.String(),
-                Serdes.String());
+                Serialized.with(Serdes.String(), Serdes.String()));
 
         reducer = new Reducer<String>() {
             @Override
@@ -225,7 +225,7 @@ public class KStreamAggregationDedupIntegrationTest {
         produceMessages(timestamp);
         produceMessages(timestamp);
 
-        stream.groupByKey(Serdes.Integer(), Serdes.String())
+        stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
             .count(TimeWindows.of(500L), "count-windows")
             .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
                 @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 8adae06..b1e4237 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -411,7 +412,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(timestamp);
         produceMessages(timestamp);
 
-        stream.groupByKey(Serdes.Integer(), Serdes.String())
+        stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
             .count(TimeWindows.of(500L), "count-windows")
             .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
                 @Override
@@ -513,7 +514,7 @@ public class KStreamAggregationIntegrationTest {
         final Map<Windowed<String>, Long> results = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(11);
         builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
-                .groupByKey(Serdes.String(), Serdes.String())
+                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                 .count(SessionWindows.with(sessionGap).until(maintainMillis), "UserSessionsStore")
                 .toStream()
                 .foreach(new ForeachAction<Windowed<String>, Long>() {
@@ -600,7 +601,7 @@ public class KStreamAggregationIntegrationTest {
         final CountDownLatch latch = new CountDownLatch(11);
         final String userSessionsStore = "UserSessionsStore";
         builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
-                .groupByKey(Serdes.String(), Serdes.String())
+                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                 .reduce(new Reducer<String>() {
                     @Override
                     public String apply(final String value1, final String value2) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 176a37c..bc99d64 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
@@ -242,7 +243,7 @@ public class KStreamKTableJoinIntegrationTest {
                 }
             })
             // Compute the total per region by summing the individual click counts per region.
-            .groupByKey(stringSerde, longSerde)
+            .groupByKey(Serialized.with(stringSerde, longSerde))
             .reduce(new Reducer<Long>() {
                 @Override
                 public Long apply(final Long value1, final Long value2) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index e8f2a01..ba7296d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -68,7 +69,7 @@ public class KGroupedStreamImplTest {
     @Before
     public void before() {
         final KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), TOPIC);
-        groupedStream = stream.groupByKey(Serdes.String(), Serdes.String());
+        groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
     }
 
     @Test(expected = NullPointerException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 3c5a37d..f8dfdb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -60,8 +61,7 @@ public class KStreamWindowAggregateTest {
 
         KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
         KTable<Windowed<String>, String> table2 =
-            stream1.groupByKey(strSerde,
-                               strSerde)
+            stream1.groupByKey(Serialized.with(strSerde, strSerde))
                 .aggregate(MockInitializer.STRING_INIT,
                            MockAggregator.TOSTRING_ADDER,
                            TimeWindows.of(10).advanceBy(5),
@@ -153,7 +153,7 @@ public class KStreamWindowAggregateTest {
 
         KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
         KTable<Windowed<String>, String> table1 =
-            stream1.groupByKey(strSerde, strSerde)
+            stream1.groupByKey(Serialized.with(strSerde, strSerde))
                 .aggregate(MockInitializer.STRING_INIT,
                            MockAggregator.TOSTRING_ADDER,
                            TimeWindows.of(10).advanceBy(5),
@@ -164,7 +164,7 @@ public class KStreamWindowAggregateTest {
 
         KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
         KTable<Windowed<String>, String> table2 =
-            stream2.groupByKey(strSerde, strSerde)
+            stream2.groupByKey(Serialized.with(strSerde, strSerde))
                 .aggregate(MockInitializer.STRING_INIT,
                            MockAggregator.TOSTRING_ADDER,
                            TimeWindows.of(10).advanceBy(5),

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index 02e532d..6d9cf98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -353,7 +354,7 @@ public class YahooBenchmark {
 
         // calculate windowed counts
         keyedByCampaign
-            .groupByKey(Serdes.String(), Serdes.String())
+            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
             .count(TimeWindows.of(10 * 1000), "time-windows");
 
         return new KafkaStreams(builder.build(), streamConfig);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b687c068/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index c3820b8..fc7a915 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 
@@ -119,7 +120,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         // min
         KGroupedStream<String, Integer>
             groupedData =
-            data.groupByKey(stringSerde, intSerde);
+            data.groupByKey(Serialized.with(stringSerde, intSerde));
 
         groupedData.aggregate(
                 new Initializer<Integer>() {


Mime
View raw message