kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Fix generics in KStream.groupBy(...)
Date Tue, 05 Jul 2016 16:30:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 12fa188b9 -> 416c3a7d9


MINOR: Fix generics in KStream.groupBy(...)

The `KStream.groupBy(...)` calls don't change the value, only the key, so they don't need the
type param `V1`, as the grouped stream will always be of type `KGroupedStream<K1, V>`.
The value `Serde` in the overloaded `groupBy` should have a type param of `V` to match the returned
`KGroupedStream`.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #1584 from dguy/kstream-generics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/416c3a7d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/416c3a7d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/416c3a7d

Branch: refs/heads/trunk
Commit: 416c3a7d97d473d0d9928c814d68d502c21077b7
Parents: 12fa188
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Jul 5 09:30:40 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 5 09:30:40 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/streams/kstream/KStream.java  | 8 +++-----
 .../apache/kafka/streams/kstream/internals/KStreamImpl.java  | 6 +++---
 2 files changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/416c3a7d/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index ae743b1..3ac0284 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -491,11 +491,10 @@ public interface KStream<K, V> {
      *
      * @param selector      select the grouping key and value to be aggregated
      * @param <K1>          the key type of the {@link KGroupedStream}
-     * @param <V1>          the value type of the {@link KGroupedStream}
      *
      * @return a {@link KGroupedStream} that contains the the grouped records of the original {@link KStream}
      */
-    <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector);
+    <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector);
 
     /**
      * Group the records of this {@link KStream} using the provided {@link KeyValueMapper}.
@@ -507,13 +506,12 @@ public interface KStream<K, V> {
      * @param valSerde    value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
      * @param <K1>          the key type of the {@link KGroupedStream}
-     * @param <V1>          the value type of the {@link KGroupedStream}
      *
      * @return a {@link KGroupedStream} that contains the the grouped records of the original {@link KStream}
      */
-    <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector,
+    <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector,
                                             Serde<K1> keySerde,
-                                            Serde<V1> valSerde);
+                                            Serde<V> valSerde);
 
     /**
      * Group the records with the same key into a {@link KGroupedStream} while preserving the

http://git-wip-us.apache.org/repos/asf/kafka/blob/416c3a7d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ca2e944..52b1c7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -537,14 +537,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector) {
+    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector) {
         return groupBy(selector, null, null);
     }
 
     @Override
-    public <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector,
+    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector,
                                                    Serde<K1> keySerde,
-                                                   Serde<V1> valSerde) {
+                                                   Serde<V> valSerde) {
 
         String selectName = internalSelectKey(selector);
         return new KGroupedStreamImpl<>(topology,


Mime
View raw message