kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: make methods introduced in KAFKA-4490 consistent with KIP-100
Date Fri, 13 Jan 2017 00:20:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2f9048832 -> b6011918f


MINOR: make methods introduced in KAFKA-4490 consistent with KIP-100

and remove some unnecessary SuppressWarnings annotations

Author: Xavier Léauté <xavier@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #2363 from xvrl/kip-100-followup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6011918
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6011918
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6011918

Branch: refs/heads/trunk
Commit: b6011918fbc36bfaa465bdcc750e2435985d9101
Parents: 2f90488
Author: Xavier Léauté <xavier@confluent.io>
Authored: Thu Jan 12 16:20:05 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jan 12 16:20:05 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   |  8 ++++----
 .../kstream/internals/GlobalKTableImpl.java     |  1 -
 .../internals/KStreamGlobalKTableJoin.java      |  8 ++++----
 .../streams/kstream/internals/KStreamImpl.java  | 21 ++++++++------------
 .../internals/KStreamKTableJoinProcessor.java   |  8 ++++----
 5 files changed, 20 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6011918/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 1645ee0..8bc54e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -1677,8 +1677,8 @@ public interface KStream<K, V> {
      * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
      */
     <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
-                                         final KeyValueMapper<K, V, GK> keyValueMapper,
-                                         final ValueJoiner<? super V, ? super GV, ? super
RV> valueJoiner);
+                                         final KeyValueMapper<? super K, ? super V, ?
extends GK> keyValueMapper,
+                                         final ValueJoiner<? super V, ? super GV, ? extends
RV> valueJoiner);
 
     /**
      * Join records of this stream with {@link GlobalKTable}'s records using non-windowed
inner equi join
@@ -1708,6 +1708,6 @@ public interface KStream<K, V> {
      * @see #leftJoin(KStream, ValueJoiner, JoinWindows)
      */
     <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
-                                     final KeyValueMapper<K, V, GK> keyValueMapper,
-                                     final ValueJoiner<? super V, ? super GV, ? super
RV> joiner);
+                                     final KeyValueMapper<? super K, ? super V, ? extends
GK> keyValueMapper,
+                                     final ValueJoiner<? super V, ? super GV, ? extends
RV> joiner);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6011918/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
index 0506b1c..cef6b67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -26,7 +26,6 @@ public class GlobalKTableImpl<K, V> implements GlobalKTable<K,
V> {
         this.valueGetterSupplier = valueGetterSupplier;
     }
 
-    @SuppressWarnings("unchecked")
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         return valueGetterSupplier;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6011918/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
index 27c13fe..099bf07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
@@ -25,13 +25,13 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements ProcessorSupplier<K1,
V1> {
 
     private final KTableValueGetterSupplier<K2, V2> valueGetterSupplier;
-    private final ValueJoiner<V1, V2, R> joiner;
-    private final KeyValueMapper<K1, V1, K2> mapper;
+    private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
+    private final KeyValueMapper<? super K1, ? super V1, ? extends K2> mapper;
     private final boolean leftJoin;
 
     KStreamGlobalKTableJoin(final KTableValueGetterSupplier<K2, V2> valueGetterSupplier,
-                            final ValueJoiner<V1, V2, R> joiner,
-                            final KeyValueMapper<K1, V1, K2> mapper,
+                            final ValueJoiner<? super V1, ? super V2, ? extends R>
joiner,
+                            final KeyValueMapper<? super K1, ? super V1, ? extends K2>
mapper,
                             final boolean leftJoin) {
         this.valueGetterSupplier = valueGetterSupplier;
         this.joiner = joiner;

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6011918/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bad4e66..b99e55d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -129,7 +129,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements
KStream<K, V
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ?
super V, ? extends K1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         return new KStreamImpl<>(topology, internalSelectKey(mapper), sourceNodes,
true);
@@ -520,7 +519,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements
KStream<K, V
         return sourceName;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KStream<K, R> leftJoin(
         final KStream<K, V1> other,
@@ -548,7 +546,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements
KStream<K, V
         return leftJoin(other, joiner, windows, null, null, null);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final
ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
         return join(other, joiner, null, null);
@@ -571,33 +568,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
 
 
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1>
globalTable,
-                                              final KeyValueMapper<K, V, K1> keyMapper,
-                                              final ValueJoiner<? super V, ? super V1,
? super R> joiner) {
+                                              final KeyValueMapper<? super K, ? super
V, ? extends K1> keyMapper,
+                                              final ValueJoiner<? super V, ? super V1,
? extends R> joiner) {
         return globalTableJoin(globalTable, keyMapper, joiner, true);
     }
 
     @Override
     public <K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1>
globalTable,
-                                            final KeyValueMapper<K, V, K1> keyMapper,
-                                            final ValueJoiner<? super V, ? super V1, ?
super V2> joiner) {
+                                            final KeyValueMapper<? super K, ? super V,
? extends K1> keyMapper,
+                                            final ValueJoiner<? super V, ? super V1, ?
extends V2> joiner) {
         return globalTableJoin(globalTable, keyMapper, joiner, false);
     }
 
-    @SuppressWarnings("unchecked")
     private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1,
V1> globalTable,
-                                                        final KeyValueMapper<K, V, K1>
keyMapper,
-                                                        final ValueJoiner<? super V, ?
super V1, ? super V2> joiner,
+                                                        final KeyValueMapper<? super K,
? super V, ? extends K1> keyMapper,
+                                                        final ValueJoiner<? super V, ?
super V1, ? extends V2> joiner,
                                                         final boolean leftJoin) {
         Objects.requireNonNull(globalTable, "globalTable can't be null");
         Objects.requireNonNull(keyMapper, "keyMapper can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final KTableValueGetterSupplier valueGetterSupplier = ((GlobalKTableImpl) globalTable).valueGetterSupplier();
+        final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1,
V1>) globalTable).valueGetterSupplier();
         final String name = topology.newName(LEFTJOIN_NAME);
-        topology.addProcessor(name, new KStreamGlobalKTableJoin(valueGetterSupplier, joiner,
keyMapper, leftJoin), this.name);
+        topology.addProcessor(name, new KStreamGlobalKTableJoin<>(valueGetterSupplier,
joiner, keyMapper, leftJoin), this.name);
         return new KStreamImpl<>(topology, name, sourceNodes, false);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6011918/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index a9965b9..906a384 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -24,13 +24,13 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1,
V1> {
 
     private final KTableValueGetter<K2, V2> valueGetter;
-    private final KeyValueMapper<K1, V1, K2> keyMapper;
-    private final ValueJoiner<? super V1, ? super V2, R> joiner;
+    private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
+    private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
     private final boolean leftJoin;
 
     KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
-                               final KeyValueMapper<K1, V1, K2> keyMapper,
-                               final ValueJoiner<? super V1, ? super V2, R> joiner,
+                               final KeyValueMapper<? super K1, ? super V1, ? extends
K2> keyMapper,
+                               final ValueJoiner<? super V1, ? super V2, ? extends R>
joiner,
                                final boolean leftJoin) {
         this.valueGetter = valueGetter;
         this.keyMapper = keyMapper;


Mime
View raw message