kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-4481: relax streams api type contraints
Date Wed, 11 Jan 2017 17:11:23 GMT
KAFKA-4481: relax streams api type contraints

Make appropriate methods contravariant in key and value types.

Author: Xavier Léauté <xavier@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #2205 from xvrl/streams-contravariance


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a95170f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a95170f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a95170f8

Branch: refs/heads/trunk
Commit: a95170f822227c50414c57860e8547dc2e9d84cb
Parents: 75469a3
Author: Xavier Léauté <xavier@confluent.io>
Authored: Wed Jan 11 09:11:18 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jan 11 09:11:18 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  2 +-
 .../kafka/streams/kstream/KGroupedStream.java   | 14 +--
 .../kafka/streams/kstream/KGroupedTable.java    | 12 +--
 .../apache/kafka/streams/kstream/KStream.java   | 56 ++++++------
 .../apache/kafka/streams/kstream/KTable.java    | 28 +++---
 .../kstream/internals/KGroupedStreamImpl.java   |  8 +-
 .../kstream/internals/KGroupedTableImpl.java    | 34 +++----
 .../kstream/internals/KStreamAggregate.java     |  4 +-
 .../kstream/internals/KStreamFlatMap.java       |  6 +-
 .../kstream/internals/KStreamFlatMapValues.java |  6 +-
 .../streams/kstream/internals/KStreamImpl.java  | 96 +++++++++++---------
 .../kstream/internals/KStreamKStreamJoin.java   |  4 +-
 .../kstream/internals/KStreamKTableJoin.java    |  4 +-
 .../streams/kstream/internals/KStreamMap.java   |  6 +-
 .../kstream/internals/KStreamTransform.java     | 12 +--
 .../internals/KStreamWindowAggregate.java       |  4 +-
 .../kstream/internals/KTableAggregate.java      |  6 +-
 .../streams/kstream/internals/KTableFilter.java |  4 +-
 .../streams/kstream/internals/KTableImpl.java   | 32 +++----
 .../internals/KTableKTableAbstractJoin.java     |  4 +-
 .../kstream/internals/KTableKTableJoin.java     |  2 +-
 .../kstream/internals/KTableKTableLeftJoin.java |  2 +-
 .../internals/KTableKTableOuterJoin.java        |  2 +-
 .../internals/KTableKTableRightJoin.java        |  2 +-
 .../kstream/internals/KTableMapValues.java      |  4 +-
 .../kstream/internals/KTableRepartitionMap.java |  8 +-
 .../streams/processor/TopologyBuilder.java      |  2 +-
 .../processor/internals/RecordCollector.java    |  2 +-
 .../internals/RecordCollectorImpl.java          |  2 +-
 .../processor/internals/StandbyContextImpl.java |  2 +-
 .../internals/StreamsMetadataState.java         |  4 +-
 .../integration/KStreamRepartitionJoinTest.java | 35 ++-----
 .../streams/kstream/KStreamBuilderTest.java     |  2 +-
 .../internals/KGroupedStreamImplTest.java       | 24 ++---
 .../internals/KGroupedTableImplTest.java        | 38 +++++---
 .../kstream/internals/KStreamBranchTest.java    | 22 +++++
 .../kstream/internals/KStreamFilterTest.java    | 19 +++-
 .../kstream/internals/KStreamFlatMapTest.java   | 12 +--
 .../internals/KStreamFlatMapValuesTest.java     | 19 ++--
 .../kstream/internals/KStreamForeachTest.java   | 12 +++
 .../kstream/internals/KStreamImplTest.java      |  6 +-
 .../internals/KStreamKStreamJoinTest.java       | 10 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |  4 +-
 .../internals/KStreamKTableJoinTest.java        |  2 +-
 .../internals/KStreamKTableLeftJoinTest.java    |  2 +-
 .../kstream/internals/KStreamMapTest.java       | 17 +++-
 .../kstream/internals/KStreamMapValuesTest.java |  6 +-
 .../kstream/internals/KStreamSelectKeyTest.java | 20 +++-
 .../kstream/internals/KStreamTransformTest.java | 14 +--
 .../internals/KStreamTransformValuesTest.java   | 12 +--
 .../internals/KStreamWindowAggregateTest.java   |  6 +-
 .../kstream/internals/KTableAggregateTest.java  | 12 +--
 .../kstream/internals/KTableFilterTest.java     | 16 ++++
 .../kstream/internals/KTableForeachTest.java    | 12 +++
 .../kstream/internals/KTableImplTest.java       |  8 +-
 .../kstream/internals/KTableKTableJoinTest.java |  6 +-
 .../internals/KTableKTableLeftJoinTest.java     | 20 ++--
 .../internals/KTableKTableOuterJoinTest.java    |  6 +-
 .../kstream/internals/KTableMapValuesTest.java  | 14 +--
 .../internals/ProcessorTopologyTest.java        |  6 +-
 .../internals/RecordCollectorTest.java          |  4 +-
 .../streams/smoketest/SmokeTestClient.java      | 10 +-
 .../kafka/streams/smoketest/SmokeTestUtil.java  | 12 +--
 .../streams/state/KeyValueStoreTestDriver.java  |  2 +-
 .../state/internals/StoreChangeLoggerTest.java  |  2 +-
 .../apache/kafka/test/KStreamTestDriver.java    |  2 +-
 .../org/apache/kafka/test/MockAggregator.java   | 28 ++----
 .../org/apache/kafka/test/MockValueJoiner.java  | 19 ++--
 .../apache/kafka/test/NoOpRecordCollector.java  |  2 +-
 69 files changed, 465 insertions(+), 372 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 1ec10cb..cf49896 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -511,7 +511,7 @@ public class KafkaStreams {
      */
     public <K> StreamsMetadata metadataForKey(final String storeName,
                                               final K key,
-                                              final StreamPartitioner<K, ?> partitioner) {
+                                              final StreamPartitioner<? super K, ?> partitioner) {
         validateIsRunning();
         return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index c3e2f1e..8e69fdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -582,7 +582,7 @@ public interface KGroupedStream<K, V> {
      * (rolling) aggregate for each key
      */
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
-                                 final Aggregator<K, V, VR> aggregator,
+                                 final Aggregator<? super K, ? super V, VR> aggregator,
                                  final Serde<VR> aggValueSerde,
                                  final String storeName);
 
@@ -622,7 +622,7 @@ public interface KGroupedStream<K, V> {
      * (rolling) aggregate for each key
      */
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
-                                 final Aggregator<K, V, VR> aggregator,
+                                 final Aggregator<? super K, ? super V, VR> aggregator,
                                  final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
@@ -685,10 +685,10 @@ public interface KGroupedStream<K, V> {
      * the latest (rolling) aggregate for each key within a window
      */
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                            final Aggregator<K, V, VR> aggregator,
-                                                            final Windows<W> windows,
-                                                            final Serde<VR> aggValueSerde,
-                                                            final String storeName);
+                                                             final Aggregator<? super K, ? super V, VR> aggregator,
+                                                             final Windows<W> windows,
+                                                             final Serde<VR> aggValueSerde,
+                                                             final String storeName);
 
     /**
      * Aggregate the values of records in this stream by the grouped key and defined windows.
@@ -734,7 +734,7 @@ public interface KGroupedStream<K, V> {
      * the latest (rolling) aggregate for each key within a window
      */
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                                             final Aggregator<K, V, VR> aggregator,
+                                                             final Aggregator<? super K, ? super V, VR> aggregator,
                                                              final Windows<W> windows,
                                                              final StateStoreSupplier<WindowStore> storeSupplier);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index c587538..9e23c07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -85,8 +85,8 @@ public interface KGroupedTable<K, V> {
      *         containing aggregated values for each key
      */
     <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                               Aggregator<K, V, T> adder,
-                               Aggregator<K, V, T> subtractor,
+                               Aggregator<? super K, ? super V, T> adder,
+                               Aggregator<? super K, ? super V, T> subtractor,
                                Serde<T> aggValueSerde,
                                String storeName);
 
@@ -107,8 +107,8 @@ public interface KGroupedTable<K, V> {
      *         containing aggregated values for each key
      */
     <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                               Aggregator<K, V, T> adder,
-                               Aggregator<K, V, T> subtractor,
+                               Aggregator<? super K, ? super V, T> adder,
+                               Aggregator<? super K, ? super V, T> subtractor,
                                String storeName);
 
     /**
@@ -126,8 +126,8 @@ public interface KGroupedTable<K, V> {
      *         containing aggregated values for each key
      */
     <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                               Aggregator<K, V, T> adder,
-                               Aggregator<K, V, T> subtractor,
+                               Aggregator<? super K, ? super V, T> adder,
+                               Aggregator<? super K, ? super V, T> subtractor,
                                final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 0686be2..245925a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -57,7 +57,7 @@ public interface KStream<K, V> {
      * @return a {@link KStream} that contains only those records that satisfy the given predicate
      * @see #filterNot(Predicate)
      */
-    KStream<K, V> filter(final Predicate<K, V> predicate);
+    KStream<K, V> filter(Predicate<? super K, ? super V> predicate);
 
     /**
      * Create a new {@link KStream} that consists all records of this stream which do <em>not</em> satisfy a predicate.
@@ -67,7 +67,7 @@ public interface KStream<K, V> {
      * @return a {@link KStream} that contains only those records that do <em>not</em> satisfy the given predicate
      * @see #filter(Predicate)
      */
-    KStream<K, V> filterNot(final Predicate<K, V> predicate);
+    KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate);
 
     /**
      * Set a new key (with possibly new type) for each input record.
@@ -98,7 +98,7 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #flatMapValues(ValueMapper)
      */
-    <KR> KStream<KR, V> selectKey(final KeyValueMapper<K, V, KR> mapper);
+    <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 
     /**
      * Transform each record of the input stream into a new record in the output stream
@@ -131,7 +131,7 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #flatMapValues(ValueMapper)
      */
-    <KR, VR> KStream<KR, VR> map(final KeyValueMapper<K, V, KeyValue<KR, VR>> mapper);
+    <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
 
     /**
      * Transform the value of each input record into a new value (with possible new type) of the output record.
@@ -162,7 +162,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #transformValues(ValueTransformerSupplier, String...)
      */
-    <VR> KStream<K, VR> mapValues(final ValueMapper<V, VR> mapper);
+    <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
 
     /**
      * Transform each record of the input stream into zero or more records in the output stream (both key and value type
@@ -205,7 +205,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #transform(TransformerSupplier, String...)
      */
-    <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<K, V, Iterable<KeyValue<KR, VR>>> mapper);
+    <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 
     /**
      * Create a new instance of {@link KStream} by transforming the value of each element in this stream into zero or
@@ -241,7 +241,7 @@ public interface KStream<K, V> {
      * @see #flatMap(KeyValueMapper)
      * @see #mapValues(ValueMapper)
      */
-    <VR> KStream<K, VR> flatMapValues(final ValueMapper<V, Iterable<VR>> processor);
+    <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor);
 
     /**
      * Print the elements of this stream to {@code System.out}.
@@ -370,7 +370,7 @@ public interface KStream<K, V> {
      * @param action an action to perform on each record
      * @see #process(ProcessorSupplier, String...)
      */
-    void foreach(final ForeachAction<K, V> action);
+    void foreach(final ForeachAction<? super K, ? super V> action);
 
     /**
      * Creates an array of {@link KStream} from this stream by branching the elements in the original stream based on
@@ -386,7 +386,7 @@ public interface KStream<K, V> {
      * @return multiple distinct substreams of this {@link KStream}
      */
     @SuppressWarnings("unchecked")
-    KStream<K, V>[] branch(final Predicate<K, V>... predicates);
+    KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates);
 
     /**
      * Materialize this stream to a topic and creates a new instance of {@link KStream} from the topic using default
@@ -420,7 +420,7 @@ public interface KStream<K, V> {
      * @param topic       the topic name
      * @return a {@link KStream} that contains the exact same records as this {@link KStream}
      */
-    KStream<K, V> through(final StreamPartitioner<K, V> partitioner,
+    KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                           final String topic);
 
     /**
@@ -475,7 +475,7 @@ public interface KStream<K, V> {
      */
     KStream<K, V> through(final Serde<K> keySerde,
                           final Serde<V> valSerde,
-                          final StreamPartitioner<K, V> partitioner,
+                          final StreamPartitioner<? super K, ? super V> partitioner,
                           final String topic);
 
     /**
@@ -500,7 +500,7 @@ public interface KStream<K, V> {
      *                    be used
      * @param topic       the topic name
      */
-    void to(final StreamPartitioner<K, V> partitioner,
+    void to(final StreamPartitioner<? super K, ? super V> partitioner,
             final String topic);
 
     /**
@@ -543,7 +543,7 @@ public interface KStream<K, V> {
      */
     void to(final Serde<K> keySerde,
             final Serde<V> valSerde,
-            final StreamPartitioner<K, V> partitioner,
+            final StreamPartitioner<? super K, ? super V> partitioner,
             final String topic);
 
     /**
@@ -623,7 +623,7 @@ public interface KStream<K, V> {
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #process(ProcessorSupplier, String...)
      */
-    <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier,
+    <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier,
                                        final String... stateStoreNames);
 
     /**
@@ -698,7 +698,7 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #transform(TransformerSupplier, String...)
      */
-    <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<V, VR> valueTransformerSupplier,
+    <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                         final String... stateStoreNames);
 
     /**
@@ -761,7 +761,7 @@ public interface KStream<K, V> {
      * @see #foreach(ForeachAction)
      * @see #transform(TransformerSupplier, String...)
      */
-    void process(final ProcessorSupplier<K, V> processorSupplier,
+    void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
                  final String... stateStoreNames);
 
     /**
@@ -792,7 +792,7 @@ public interface KStream<K, V> {
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
      * @see #groupByKey()
      */
-    <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<K, V, KR> selector,
+    <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                        final Serde<KR> keySerde,
                                        final Serde<V> valSerde);
 
@@ -821,7 +821,7 @@ public interface KStream<K, V> {
      * @param <KR>     the key type of the result {@link KGroupedStream}
      * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
      */
-    <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<K, V, KR> selector);
+    <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector);
 
     /**
      * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
@@ -949,7 +949,7 @@ public interface KStream<K, V> {
      * @see #outerJoin(KStream, ValueJoiner, JoinWindows)
      */
     <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
-                                 final ValueJoiner<V, VO, VR> joiner,
+                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows);
 
     /**
@@ -1026,7 +1026,7 @@ public interface KStream<K, V> {
      * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
      */
     <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
-                                 final ValueJoiner<V, VO, VR> joiner,
+                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
@@ -1106,7 +1106,7 @@ public interface KStream<K, V> {
      * @see #outerJoin(KStream, ValueJoiner, JoinWindows)
      */
     <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
-                                     final ValueJoiner<V, VO, VR> joiner,
+                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                      final JoinWindows windows);
 
     /**
@@ -1188,7 +1188,7 @@ public interface KStream<K, V> {
      * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
      */
     <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
-                                     final ValueJoiner<V, VO, VR> joiner,
+                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                      final JoinWindows windows,
                                      final Serde<K> keySerde,
                                      final Serde<V> thisValSerde,
@@ -1269,7 +1269,7 @@ public interface KStream<K, V> {
      * @see #leftJoin(KStream, ValueJoiner, JoinWindows)
      */
     <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
-                                      final ValueJoiner<V, VO, VR> joiner,
+                                      final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                       final JoinWindows windows);
 
     /**
@@ -1352,7 +1352,7 @@ public interface KStream<K, V> {
      * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
      */
     <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
-                                      final ValueJoiner<V, VO, VR> joiner,
+                                      final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                       final JoinWindows windows,
                                       final Serde<K> keySerde,
                                       final Serde<V> thisValueSerde,
@@ -1424,7 +1424,7 @@ public interface KStream<K, V> {
      * @see #leftJoin(KTable, ValueJoiner)
      */
     <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoiner<V, VT, VR> joiner);
+                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);
 
     /**
      * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join.
@@ -1495,7 +1495,7 @@ public interface KStream<K, V> {
      * @see #leftJoin(KTable, ValueJoiner, Serde, Serde)
      */
     <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
-                                 final ValueJoiner<V, VT, VR> joiner,
+                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                  final Serde<K> keySerde,
                                  final Serde<V> valSerde);
 
@@ -1568,7 +1568,7 @@ public interface KStream<K, V> {
      * @see #join(KTable, ValueJoiner)
      */
     <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
-                                     final ValueJoiner<V, VT, VR> joiner);
+                                     final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);
 
     /**
      * Join records of this stream with {@link KTable}'s records using non-windowed left equi join.
@@ -1642,7 +1642,7 @@ public interface KStream<K, V> {
      * @see #join(KTable, ValueJoiner, Serde, Serde)
      */
     <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
-                                     final ValueJoiner<V, VT, VR> joiner,
+                                     final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                      final Serde<K> keySerde,
                                      final Serde<V> valSerde);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 29be3e1..5d6fa6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -48,7 +48,7 @@ public interface KTable<K, V> {
      *
      * @return a {@link KTable} that contains only those records that satisfy the given predicate
      */
-    KTable<K, V> filter(Predicate<K, V> predicate);
+    KTable<K, V> filter(Predicate<? super K, ? super V> predicate);
 
     /**
      * Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate.
@@ -57,7 +57,7 @@ public interface KTable<K, V> {
      *
      * @return a {@link KTable} that contains only those records that do not satisfy the given predicate
      */
-    KTable<K, V> filterNot(Predicate<K, V> predicate);
+    KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate);
 
     /**
      * Create a new instance of {@link KTable} by transforming the value of each element in this stream into a new value in the new stream.
@@ -67,7 +67,7 @@ public interface KTable<K, V> {
      *
      * @return a {@link KTable} that contains records with unmodified keys and new values of different type
      */
-    <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
+    <V1> KTable<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper);
 
 
     /**
@@ -195,7 +195,7 @@ public interface KTable<K, V> {
      * @param storeName    the state store name used for this KTable
      * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
-    KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic, String storeName);
+    KTable<K, V> through(StreamPartitioner<? super K, ? super V> partitioner, String topic, String storeName);
 
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic.
@@ -241,7 +241,7 @@ public interface KTable<K, V> {
      * @param storeName    the state store name used for this KTable
      * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
      */
-    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic, String storeName);
+    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic, String storeName);
 
     /**
      * Materialize this stream to a topic using default serializers specified in the config
@@ -259,7 +259,7 @@ public interface KTable<K, V> {
      *                     if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic        the topic name
      */
-    void to(StreamPartitioner<K, V> partitioner, String topic);
+    void to(StreamPartitioner<? super K, ? super V> partitioner, String topic);
 
     /**
      * Materialize this stream to a topic. If {@code keySerde} provides a
@@ -288,7 +288,7 @@ public interface KTable<K, V> {
      *                     &mdash; otherwise {@link DefaultPartitioner} will be used
      * @param topic        the topic name
      */
-    void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+    void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic);
 
     /**
      * Convert this stream to a new instance of {@link KStream}.
@@ -310,7 +310,7 @@ public interface KTable<K, V> {
      *         the records are no longer treated as updates on a primary-keyed table,
      *         but rather as normal key-value pairs in a record stream
      */
-    <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper);
+    <K1> KStream<K1, V> toStream(KeyValueMapper<? super K, ? super V, ? extends K1> mapper);
 
     /**
      * Combine values of this stream with another {@link KTable} stream's elements of the same key using Inner Join.
@@ -323,7 +323,7 @@ public interface KTable<K, V> {
      * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
      *         one for each matched record-pair with the same key
      */
-    <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
+    <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner);
 
     /**
      * Combine values of this stream with another {@link KTable} stream's elements of the same key using Outer Join.
@@ -336,7 +336,7 @@ public interface KTable<K, V> {
      * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
      *         one for each matched record-pair with the same key
      */
-    <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
+    <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner);
 
     /**
      * Combine values of this stream with another {@link KTable} stream's elements of the same key using Left Join.
@@ -349,7 +349,7 @@ public interface KTable<K, V> {
      * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
      *         one for each matched record-pair with the same key
      */
-    <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
+    <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner);
 
     /**
      * Group the records of this {@link KTable} using the provided {@link KeyValueMapper}.
@@ -364,7 +364,7 @@ public interface KTable<K, V> {
      *
      * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable}
      */
-    <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde);
+    <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde);
 
     /**
      * Group the records of this {@link KTable} using the provided {@link KeyValueMapper} and default serializers and deserializers.
@@ -375,7 +375,7 @@ public interface KTable<K, V> {
      *
      * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable}
      */
-    <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector);
+    <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector);
 
     /**
      * Perform an action on each element of {@link KTable}.
@@ -383,7 +383,7 @@ public interface KTable<K, V> {
      *
      * @param action an action to perform on each element
      */
-    void foreach(ForeachAction<K, V> action);
+    void foreach(ForeachAction<? super K, ? super V> action);
 
     /**
      * Get the name of the local state store used for materializing this {@link KTable}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index a4fc793..e8930f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -100,7 +100,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
-                                      final Aggregator<K, V, T> aggregator,
+                                      final Aggregator<? super K, ? super V, T> aggregator,
                                       final Serde<T> aggValueSerde,
                                       final String storeName) {
         return aggregate(initializer, aggregator, keyValueStore(keySerde, aggValueSerde, storeName));
@@ -108,7 +108,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
-                                      final Aggregator<K, V, T> aggregator,
+                                      final Aggregator<? super K, ? super V, T> aggregator,
                                       final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
@@ -122,7 +122,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     @SuppressWarnings("unchecked")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
-                                                                  final Aggregator<K, V, T> aggregator,
+                                                                  final Aggregator<? super K, ? super V, T> aggregator,
                                                                   final Windows<W> windows,
                                                                   final Serde<T> aggValueSerde,
                                                                   final String storeName) {
@@ -132,7 +132,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     @SuppressWarnings("unchecked")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
-                                                                  final Aggregator<K, V, T> aggregator,
+                                                                  final Aggregator<? super K, ? super V, T> aggregator,
                                                                   final Windows<W> windows,
                                                                   final StateStoreSupplier<WindowStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 4ca69d6..4edfa89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -46,14 +46,14 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    protected final Serde<K> keySerde;
-    protected final Serde<V> valSerde;
+    protected final Serde<? extends K> keySerde;
+    protected final Serde<? extends V> valSerde;
 
     public KGroupedTableImpl(KStreamBuilder topology,
                              String name,
                              String sourceName,
-                             Serde<K> keySerde,
-                             Serde<V> valSerde) {
+                             Serde<? extends K> keySerde,
+                             Serde<? extends V> valSerde) {
         super(topology, name, Collections.singleton(sourceName));
         this.keySerde = keySerde;
         this.valSerde = valSerde;
@@ -61,8 +61,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     @Override
     public <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                                      Aggregator<K, V, T> adder,
-                                      Aggregator<K, V, T> subtractor,
+                                      Aggregator<? super K, ? super V, T> adder,
+                                      Aggregator<? super K, ? super V, T> subtractor,
                                       Serde<T> aggValueSerde,
                                       String storeName) {
         return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, storeName));
@@ -71,16 +71,16 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     @Override
     public <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                                      Aggregator<K, V, T> adder,
-                                      Aggregator<K, V, T> subtractor,
+                                      Aggregator<? super K, ? super V, T> adder,
+                                      Aggregator<? super K, ? super V, T> subtractor,
                                       String storeName) {
         return aggregate(initializer, adder, subtractor, null, storeName);
     }
 
     @Override
     public <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                                      Aggregator<K, V, T> adder,
-                                      Aggregator<K, V, T> subtractor,
+                                      Aggregator<? super K, ? super V, T> adder,
+                                      Aggregator<? super K, ? super V, T> subtractor,
                                       StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(adder, "adder can't be null");
@@ -99,13 +99,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
         String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
-        Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
-        Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
-        Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer();
-        Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+        Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
+        Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+        Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
+        Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
 
-        ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
-        ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
+        ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+        ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
 
         // send the aggregate key-value pairs to the intermediate topic for partitioning
         topology.addInternalTopic(topic);
@@ -168,4 +168,4 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                 storeSupplier);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index fdc14df..082b2e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -28,12 +28,12 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
     private final String storeName;
     private final Initializer<T> initializer;
-    private final Aggregator<K, V, T> aggregator;
+    private final Aggregator<? super K, ? super V, T> aggregator;
 
 
     private boolean sendOldValues = false;
 
-    public KStreamAggregate(String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
+    public KStreamAggregate(String storeName, Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator) {
         this.storeName = storeName;
         this.initializer = initializer;
         this.aggregator = aggregator;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index ff7f9ed..c038cc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -25,9 +25,9 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
-    private final KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper;
+    private final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper;
 
-    KStreamFlatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
+    KStreamFlatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
         this.mapper = mapper;
     }
 
@@ -39,7 +39,7 @@ class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
     private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> {
         @Override
         public void process(K key, V value) {
-            for (KeyValue<K1, V1> newPair : mapper.apply(key, value)) {
+            for (KeyValue<? extends K1, ? extends V1> newPair : mapper.apply(key, value)) {
                 context().forward(newPair.key, newPair.value);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index 97d6b7a..7cb1176 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -24,9 +24,9 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
 
-    private final ValueMapper<V, ? extends Iterable<V1>> mapper;
+    private final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper;
 
-    KStreamFlatMapValues(ValueMapper<V, ? extends Iterable<V1>> mapper) {
+    KStreamFlatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
         this.mapper = mapper;
     }
 
@@ -38,7 +38,7 @@ class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
     private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> {
         @Override
         public void process(K key, V value) {
-            Iterable<V1> newValues = mapper.apply(value);
+            Iterable<? extends V1> newValues = mapper.apply(value);
             for (V1 v : newValues) {
                 context().forward(key, v);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b67fca5..a8e2422 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -107,7 +107,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KStream<K, V> filter(Predicate<K, V> predicate) {
+    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         String name = topology.newName(FILTER_NAME);
 
@@ -117,7 +117,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KStream<K, V> filterNot(final Predicate<K, V> predicate) {
+    public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         String name = topology.newName(FILTER_NAME);
 
@@ -128,35 +128,41 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     @SuppressWarnings("unchecked")
-    public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> mapper) {
+    public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         return new KStreamImpl<>(topology, internalSelectKey(mapper), sourceNodes, true);
     }
 
-    private <K1> String internalSelectKey(final KeyValueMapper<K, V, K1> mapper) {
+    private <K1> String internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         String name = topology.newName(KEY_SELECT_NAME);
-        topology.addProcessor(name, new KStreamMap<>(new KeyValueMapper<K, V, KeyValue<K1, V>>() {
-            @Override
-            public KeyValue<K1, V> apply(K key, V value) {
-                return new KeyValue<>(mapper.apply(key, value), value);
-            }
-        }), this.name);
+        topology.addProcessor(
+            name,
+            new KStreamMap<>(
+                new KeyValueMapper<K, V, KeyValue<K1, V>>() {
+                    @Override
+                    public KeyValue<K1, V> apply(K key, V value) {
+                        return new KeyValue<>(mapper.apply(key, value), value);
+                    }
+                }
+            ),
+            this.name
+        );
         return name;
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
+    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = topology.newName(MAP_NAME);
 
-        topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
+        topology.addProcessor(name, new KStreamMap<K, V, K1, V1>(mapper), this.name);
 
         return new KStreamImpl<>(topology, name, sourceNodes, true);
     }
 
 
     @Override
-    public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+    public <V1> KStream<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = topology.newName(MAPVALUES_NAME);
 
@@ -228,7 +234,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
+    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = topology.newName(FLATMAP_NAME);
 
@@ -238,7 +244,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
+    public <V1> KStream<K, V1> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         String name = topology.newName(FLATMAPVALUES_NAME);
 
@@ -249,11 +255,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     @SuppressWarnings("unchecked")
-    public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
+    public KStream<K, V>[] branch(Predicate<? super K, ? super V>... predicates) {
         if (predicates.length == 0) {
             throw new IllegalArgumentException("you must provide at least one predicate");
         }
-        for (Predicate<K, V> predicate : predicates) {
+        for (Predicate<? super K, ? super V> predicate : predicates) {
             Objects.requireNonNull(predicate, "predicates can't have null values");
         }
         String branchName = topology.newName(BRANCH_NAME);
@@ -296,14 +302,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
         to(keySerde, valSerde, partitioner, topic);
 
         return topology.stream(keySerde, valSerde, topic);
     }
 
     @Override
-    public void foreach(ForeachAction<K, V> action) {
+    public void foreach(ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
         String name = topology.newName(FOREACH_NAME);
 
@@ -316,7 +322,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
+    public KStream<K, V> through(StreamPartitioner<? super K, ? super V> partitioner, String topic) {
         return through(null, null, partitioner, topic);
     }
 
@@ -331,7 +337,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public void to(StreamPartitioner<K, V> partitioner, String topic) {
+    public void to(StreamPartitioner<? super K, ? super V> partitioner, String topic) {
         to(null, null, partitioner, topic);
     }
 
@@ -342,7 +348,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @SuppressWarnings("unchecked")
     @Override
-    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
         Objects.requireNonNull(topic, "topic can't be null");
         String name = topology.newName(SINK_NAME);
 
@@ -358,7 +364,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
+    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier, String... stateStoreNames) {
         Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
         String name = topology.newName(TRANSFORM_NAME);
 
@@ -369,7 +375,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
+    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier, String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
         String name = topology.newName(TRANSFORMVALUES_NAME);
 
@@ -380,7 +386,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
+    public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier, String... stateStoreNames) {
         String name = topology.newName(PROCESSOR_NAME);
 
         topology.addProcessor(name, processorSupplier, this.name);
@@ -390,7 +396,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <V1, R> KStream<K, R> join(
             final KStream<K, V1> other,
-            final ValueJoiner<V, V1, R> joiner,
+            final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
             final JoinWindows windows,
             final Serde<K> keySerde,
             final Serde<V> thisValueSerde,
@@ -402,7 +408,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <V1, R> KStream<K, R> join(
         final KStream<K, V1> other,
-        final ValueJoiner<V, V1, R> joiner,
+        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
         final JoinWindows windows) {
 
         return join(other, joiner, windows, null, null, null);
@@ -411,7 +417,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <V1, R> KStream<K, R> outerJoin(
         final KStream<K, V1> other,
-        final ValueJoiner<V, V1, R> joiner,
+        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
         final JoinWindows windows,
         final Serde<K> keySerde,
         final Serde<V> thisValueSerde,
@@ -423,14 +429,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <V1, R> KStream<K, R> outerJoin(
         final KStream<K, V1> other,
-        final ValueJoiner<V, V1, R> joiner,
+        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
         final JoinWindows windows) {
 
         return outerJoin(other, joiner, windows, null, null, null);
     }
 
     private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
-                                         final ValueJoiner<V, V1, R> joiner,
+                                         final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                          final JoinWindows windows,
                                          final Serde<K> keySerde,
                                          final Serde<V> thisValueSerde,
@@ -516,7 +522,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <V1, R> KStream<K, R> leftJoin(
         final KStream<K, V1> other,
-        final ValueJoiner<V, V1, R> joiner,
+        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
         final JoinWindows windows,
         final Serde<K> keySerde,
         final Serde<V> thisValSerde,
@@ -533,23 +539,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <V1, R> KStream<K, R> leftJoin(
-            KStream<K, V1> other,
-            ValueJoiner<V, V1, R> joiner,
-            JoinWindows windows) {
+        KStream<K, V1> other,
+        ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+        JoinWindows windows) {
 
         return leftJoin(other, joiner, windows, null, null, null);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+    public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
         return join(other, joiner, null, null);
 
     }
 
     @Override
     public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
-                                      final ValueJoiner<V, V1, R> joiner,
+                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                       final Serde<K> keySerde,
                                       final Serde<V> valueSerde) {
         if (repartitionRequired) {
@@ -562,7 +568,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
-                                                    final ValueJoiner<V, V1, R> joiner,
+                                                    final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                                     final boolean leftJoin) {
         Objects.requireNonNull(other, "other KTable can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
@@ -579,12 +585,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
         return leftJoin(other, joiner, null, null);
     }
 
     public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other,
-                                          final ValueJoiner<V, V1, R> joiner,
+                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                           final Serde<K> keySerde,
                                           final Serde<V> valueSerde) {
         if (repartitionRequired) {
@@ -597,12 +603,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector) {
+    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> selector) {
         return groupBy(selector, null, null);
     }
 
     @Override
-    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector,
+    public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> selector,
                                               Serde<K1> keySerde,
                                               Serde<V> valSerde) {
 
@@ -656,7 +662,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
                                                    KStream<K1, V2> other,
-                                                   ValueJoiner<V1, V2, R> joiner,
+                                                   ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
                                                    JoinWindows windows,
                                                    Serde<K1> keySerde,
                                                    Serde<V1> lhsValueSerde,
@@ -681,12 +687,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                                     windows.before + windows.after + 1,
                                                                                     windows.maintainMs());
 
-            final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
+            final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
                 windows.before,
                 windows.after,
                 joiner,
                 leftOuter);
-            final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
+            final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
                 windows.after,
                 windows.before,
                 reverseJoiner(joiner),
@@ -703,7 +709,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             topology.addStateStore(thisWindow, thisWindowStreamName, joinOtherName);
             topology.addStateStore(otherWindow, otherWindowStreamName, joinThisName);
 
-            Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
+            Set<String> allSourceNodes = new HashSet<>(((AbstractStream<K>) lhs).sourceNodes);
             allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
             return new KStreamImpl<>(topology, joinMergeName, allSourceNodes, false);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 41547b9..8d2f1a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -32,10 +32,10 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
     private final long joinBeforeMs;
     private final long joinAfterMs;
 
-    private final ValueJoiner<V1, V2, R> joiner;
+    private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
 
-    KStreamKStreamJoin(String otherWindowName, long joinBeforeMs, long joinAfterMs, ValueJoiner<V1, V2, R> joiner, boolean outer) {
+    KStreamKStreamJoin(String otherWindowName, long joinBeforeMs, long joinAfterMs, ValueJoiner<? super V1, ? super V2, ? extends R> joiner, boolean outer) {
         this.otherWindowName = otherWindowName;
         this.joinBeforeMs = joinBeforeMs;
         this.joinAfterMs = joinAfterMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
index 1027b96..3fa9d8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -26,10 +26,10 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
     private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
-    private final ValueJoiner<V1, V2, R> joiner;
+    private final ValueJoiner<? super V1, ? super V2, R> joiner;
     private final boolean leftJoin;
 
-    KStreamKTableJoin(final KTableImpl<K, ?, V2> table, final ValueJoiner<V1, V2, R> joiner, final boolean leftJoin) {
+    KStreamKTableJoin(final KTableImpl<K, ?, V2> table, final ValueJoiner<? super V1, ? super V2, R> joiner, final boolean leftJoin) {
         valueGetterSupplier = table.valueGetterSupplier();
         this.joiner = joiner;
         this.leftJoin = leftJoin;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index a40449b..c3814a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -25,9 +25,9 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 class KStreamMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
-    private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper;
+    private final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper;
 
-    public KStreamMap(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
+    public KStreamMap(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
         this.mapper = mapper;
     }
 
@@ -39,7 +39,7 @@ class KStreamMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
     private class KStreamMapProcessor extends AbstractProcessor<K, V> {
         @Override
         public void process(K key, V value) {
-            KeyValue<K1, V1> newPair = mapper.apply(key, value);
+            KeyValue<? extends K1, ? extends V1> newPair = mapper.apply(key, value);
             context().forward(newPair.key, newPair.value);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index af100a2..f7fa9f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -27,9 +27,9 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
-    private final TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier;
+    private final TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier;
 
-    public KStreamTransform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
+    public KStreamTransform(TransformerSupplier<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> transformerSupplier) {
         this.transformerSupplier = transformerSupplier;
     }
 
@@ -40,9 +40,9 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
     public static class KStreamTransformProcessor<K1, V1, K2, V2> extends AbstractProcessor<K1, V1> {
 
-        private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
+        private final Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer;
 
-        public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) {
+        public KStreamTransformProcessor(Transformer<? super K1, ? super V1, ? extends KeyValue<? extends K2, ? extends V2>> transformer) {
             this.transformer = transformer;
         }
 
@@ -54,7 +54,7 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
         @Override
         public void process(K1 key, V1 value) {
-            KeyValue<K2, V2> pair = transformer.transform(key, value);
+            KeyValue<? extends K2, ? extends V2> pair = transformer.transform(key, value);
 
             if (pair != null)
                 context().forward(pair.key, pair.value);
@@ -62,7 +62,7 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
         @Override
         public void punctuate(long timestamp) {
-            KeyValue<K2, V2> pair = transformer.punctuate(timestamp);
+            KeyValue<? extends K2, ? extends V2> pair = transformer.punctuate(timestamp);
 
             if (pair != null)
                 context().forward(pair.key, pair.value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 25fdda9..43d2846 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -36,11 +36,11 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
     private final String storeName;
     private final Windows<W> windows;
     private final Initializer<T> initializer;
-    private final Aggregator<K, V, T> aggregator;
+    private final Aggregator<? super K, ? super V, T> aggregator;
 
     private boolean sendOldValues = false;
 
-    public KStreamWindowAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<K, V, T> aggregator) {
+    public KStreamWindowAggregate(Windows<W> windows, String storeName, Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index a414200..e9e6bfc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -29,12 +29,12 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
 
     private final String storeName;
     private final Initializer<T> initializer;
-    private final Aggregator<K, V, T> add;
-    private final Aggregator<K, V, T> remove;
+    private final Aggregator<? super K, ? super V, T> add;
+    private final Aggregator<? super K, ? super V, T> remove;
 
     private boolean sendOldValues = false;
 
-    public KTableAggregate(String storeName, Initializer<T> initializer, Aggregator<K, V, T> add, Aggregator<K, V, T> remove) {
+    public KTableAggregate(String storeName, Initializer<T> initializer, Aggregator<? super K, ? super V, T> add, Aggregator<? super K, ? super V, T> remove) {
         this.storeName = storeName;
         this.initializer = initializer;
         this.add = add;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 059a36f..a434a5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -25,12 +25,12 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
 
     private final KTableImpl<K, ?, V> parent;
-    private final Predicate<K, V> predicate;
+    private final Predicate<? super K, ? super V> predicate;
     private final boolean filterNot;
 
     private boolean sendOldValues = false;
 
-    public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterNot) {
+    public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<? super K, ? super V> predicate, boolean filterNot) {
         this.parent = parent;
         this.predicate = predicate;
         this.filterNot = filterNot;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 06f5945..30e2712 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -90,7 +90,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> filter(Predicate<K, V> predicate) {
+    public KTable<K, V> filter(Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         String name = topology.newName(FILTER_NAME);
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, false);
@@ -100,7 +100,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> filterNot(final Predicate<K, V> predicate) {
+    public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         String name = topology.newName(FILTER_NAME);
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true);
@@ -111,7 +111,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+    public <V1> KTable<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper) {
         Objects.requireNonNull(mapper);
         String name = topology.newName(MAPVALUES_NAME);
         KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper);
@@ -180,7 +180,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public void foreach(final ForeachAction<K, V> action) {
+    public void foreach(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
         String name = topology.newName(FOREACH_NAME);
         KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
@@ -195,7 +195,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @Override
     public KTable<K, V> through(Serde<K> keySerde,
                                 Serde<V> valSerde,
-                                StreamPartitioner<K, V> partitioner,
+                                StreamPartitioner<? super K, ? super V> partitioner,
                                 String topic,
                                 final String storeName) {
         Objects.requireNonNull(storeName, "storeName can't be null");
@@ -210,7 +210,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic, final String storeName) {
+    public KTable<K, V> through(StreamPartitioner<? super K, ? super V> partitioner, String topic, final String storeName) {
         return through(null, null, partitioner, topic, storeName);
     }
 
@@ -225,7 +225,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public void to(StreamPartitioner<K, V> partitioner, String topic) {
+    public void to(StreamPartitioner<? super K, ? super V> partitioner, String topic) {
         to(null, null, partitioner, topic);
     }
 
@@ -235,7 +235,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic) {
         this.toStream().to(keySerde, valSerde, partitioner, topic);
     }
 
@@ -254,27 +254,27 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper) {
+    public <K1> KStream<K1, V> toStream(KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         return toStream().selectKey(mapper);
     }
 
     @Override
-    public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+    public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
         return doJoin(other, joiner, false, false);
     }
 
     @Override
-    public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+    public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
         return doJoin(other, joiner, true, true);
     }
 
     @Override
-    public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
+    public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
         return doJoin(other, joiner, true, false);
     }
 
     @SuppressWarnings("unchecked")
-    private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, ValueJoiner<V, V1, R> joiner, final boolean leftOuter, final boolean rightOuter) {
+    private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner, final boolean leftOuter, final boolean rightOuter) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
@@ -320,14 +320,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
                                                   Serde<K1> keySerde,
                                                   Serde<V1> valueSerde) {
 
         Objects.requireNonNull(selector, "selector can't be null");
         String selectName = topology.newName(SELECT_NAME);
 
-        KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
+        KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<K, V, K1, V1>(this, selector);
 
         // select the aggregate key and values (old and new), it would require parent to send old values
         topology.addProcessor(selectName, selectSupplier, this.name);
@@ -337,7 +337,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector) {
+    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
         return this.groupBy(selector, null, null);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index 5e441aa..3ad31cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -25,13 +25,13 @@ abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessor
     protected final KTableImpl<K, ?, V2> table2;
     protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
     protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    protected final ValueJoiner<V1, V2, R> joiner;
+    protected final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
 
     protected boolean sendOldValues = false;
 
     KTableKTableAbstractJoin(KTableImpl<K, ?, V1> table1,
                              KTableImpl<K, ?, V2> table2,
-                             ValueJoiner<V1, V2, R> joiner) {
+                             ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
         this.table1 = table1;
         this.table2 = table2;
         this.valueGetterSupplier1 = table1.valueGetterSupplier();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 49f6715..9f86d7e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 
 class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
-    KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
+    KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
         super(table1, table2, joiner);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 5f5cad6..cfdaf61 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 
 class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
-    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
+    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
         super(table1, table2, joiner);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 2bfd8a5..5b32415 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 
 class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
-    KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
+    KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
         super(table1, table2, joiner);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 8aeadcc..17fa5cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
 
-    KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
+    KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
         super(table1, table2, joiner);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index daabb00..ef60fd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -26,11 +26,11 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
 
     private final KTableImpl<K, ?, V> parent;
-    private final ValueMapper<V, V1> mapper;
+    private final ValueMapper<? super V, ? extends V1> mapper;
 
     private boolean sendOldValues = false;
 
-    public KTableMapValues(KTableImpl<K, ?, V> parent, ValueMapper<V, V1> mapper) {
+    public KTableMapValues(KTableImpl<K, ?, V> parent, ValueMapper<? super V, ? extends V1> mapper) {
         this.parent = parent;
         this.mapper = mapper;
     }


Mime
View raw message