kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-4481: relax streams api type constraints
Date Wed, 11 Jan 2017 17:11:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 75469a3b6 -> a95170f82


http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 42bb13a..24cb82c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -32,9 +32,9 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSupplier<K, V, KeyValue<K1, V1>> {
 
     private final KTableImpl<K, ?, V> parent;
-    private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper;
+    private final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> mapper;
 
-    public KTableRepartitionMap(KTableImpl<K, ?, V> parent, KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
+    public KTableRepartitionMap(KTableImpl<K, ?, V> parent, KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> mapper) {
         this.parent = parent;
         this.mapper = mapper;
     }
@@ -82,8 +82,8 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
                 throw new StreamsException("Record key for the grouping KTable should not be null.");
 
             // if the value is null, we do not need to forward its selected key-value further
-            KeyValue<K1, V1> newPair = change.newValue == null ? null : mapper.apply(key, change.newValue);
-            KeyValue<K1, V1> oldPair = change.oldValue == null ? null : mapper.apply(key, change.oldValue);
+            KeyValue<? extends K1, ? extends V1> newPair = change.newValue == null ? null : mapper.apply(key, change.newValue);
+            KeyValue<? extends K1, ? extends V1> oldPair = change.oldValue == null ? null : mapper.apply(key, change.oldValue);
 
             // if the selected repartition key or value is null, skip
             // forward oldPair first, to be consistent with reduce and aggregate

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index b055d52..211ed64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -507,7 +507,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      */
-    public synchronized final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
+    public synchronized final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(topic, "topic must not be null");
         if (nodeFactories.containsKey(name))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 8b0dcdf..ec20d46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -27,7 +27,7 @@ public interface RecordCollector {
     <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer);
 
     <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                     StreamPartitioner<K, V> partitioner);
+                     StreamPartitioner<? super K, ? super V> partitioner);
 
     void flush();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index d733e66..afb8999 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -60,7 +60,7 @@ public class RecordCollectorImpl implements RecordCollector {
 
     @Override
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                            StreamPartitioner<K, V> partitioner) {
+                            StreamPartitioner<? super K, ? super V> partitioner) {
         checkForException();
         byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
         byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index a660b15..b9367ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -41,7 +41,7 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
         }
 
         @Override
-        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) {
+        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner) {
 
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index ccb2cda..5494674 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -146,7 +146,7 @@ public class StreamsMetadataState {
      */
     public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
                                                                final K key,
-                                                               final StreamPartitioner<K, ?> partitioner) {
+                                                               final StreamPartitioner<? super K, ?> partitioner) {
         Objects.requireNonNull(storeName, "storeName can't be null");
         Objects.requireNonNull(key, "key can't be null");
         Objects.requireNonNull(partitioner, "partitioner can't be null");
@@ -204,7 +204,7 @@ public class StreamsMetadataState {
 
     private <K> StreamsMetadata getStreamsMetadataForKey(final String storeName,
                                                          final K key,
-                                                         final StreamPartitioner<K, ?> partitioner,
+                                                         final StreamPartitioner<? super K, ?> partitioner,
                                                          final SourceTopicsInfo sourceTopicsInfo) {
 
         final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index fe86874..33354ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -58,10 +59,12 @@ import static org.hamcrest.core.Is.is;
 
 @RunWith(Parameterized.class)
 public class KStreamRepartitionJoinTest {
+
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final ValueJoiner<Object, Object, String> TOSTRING_JOINER = MockValueJoiner.instance(":");
     private final MockTime mockTime = CLUSTER.time;
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
 
@@ -70,7 +73,6 @@ public class KStreamRepartitionJoinTest {
     private KStream<Long, Integer> streamOne;
     private KStream<Integer, String> streamTwo;
     private KStream<Integer, String> streamFour;
-    private ValueJoiner<Integer, String, String> valueJoiner;
     private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> keyMapper;
 
     private final List<String>
@@ -109,13 +111,6 @@ public class KStreamRepartitionJoinTest {
         streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
         streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
 
-        valueJoiner = new ValueJoiner<Integer, String, String>() {
-            @Override
-            public String apply(final Integer value1, final String value2) {
-                return value1 + ":" + value2;
-            }
-        };
-
         keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
     }
 
@@ -213,18 +208,11 @@ public class KStreamRepartitionJoinTest {
 
     private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception {
 
-        final ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
-            @Override
-            public String apply(final String value1, final Integer value2) {
-                return value1 + ":" + value2;
-            }
-        };
-
         final String output = "join-rhs-stream-mapped-" + testNo;
         CLUSTER.createTopic(output);
         streamTwo
             .join(streamOne.map(keyMapper),
-                joiner,
+                TOSTRING_JOINER,
                 getJoinWindow(),
                 Serdes.Integer(),
                 Serdes.String(),
@@ -243,7 +231,7 @@ public class KStreamRepartitionJoinTest {
         final String outputTopic = "left-join-" + testNo;
         CLUSTER.createTopic(outputTopic);
         map1.leftJoin(map2,
-            valueJoiner,
+            TOSTRING_JOINER,
             getJoinWindow(),
             Serdes.Integer(),
             Serdes.Integer(),
@@ -262,24 +250,17 @@ public class KStreamRepartitionJoinTest {
         final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
 
         final KStream<Integer, String> join = map1.join(map2,
-            valueJoiner,
+            TOSTRING_JOINER,
             getJoinWindow(),
             Serdes.Integer(),
             Serdes.Integer(),
             Serdes.String());
 
-        final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-            @Override
-            public String apply(final String value1, final String value2) {
-                return value1 + ":" + value2;
-            }
-        };
-
         final String topic = "map-join-join-" + testNo;
         CLUSTER.createTopic(topic);
         join.map(kvMapper)
             .join(streamFour.map(kvMapper),
-                joiner,
+                TOSTRING_JOINER,
                 getJoinWindow(),
                 Serdes.Integer(),
                 Serdes.String(),
@@ -418,7 +399,7 @@ public class KStreamRepartitionJoinTest {
                         final String outputTopic) {
         CLUSTER.createTopic(outputTopic);
         lhs.join(rhs,
-            valueJoiner,
+            TOSTRING_JOINER,
             getJoinWindow(),
             Serdes.Integer(),
             Serdes.Integer(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 52decf4..a154744 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -164,7 +164,7 @@ public class KStreamBuilderTest {
         assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
 
         final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
-        mapped.leftJoin(table, MockValueJoiner.STRING_JOINER).groupByKey().count("count");
+        mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
         assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 82e8c6c..62dd1d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -92,7 +92,7 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullInitializerOnAggregate() throws Exception {
-        groupedStream.aggregate(null, MockAggregator.STRING_ADDER, Serdes.String(), "store");
+        groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String(), "store");
     }
 
     @Test(expected = NullPointerException.class)
@@ -103,12 +103,12 @@ public class KGroupedStreamImplTest {
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreNameOnAggregate() throws Exception {
         String storeName = null;
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, Serdes.String(), storeName);
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), storeName);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception {
-        groupedStream.aggregate(null, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), "store");
+        groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), "store");
     }
 
     @Test(expected = NullPointerException.class)
@@ -118,19 +118,19 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullWindowsOnWindowedAggregate() throws Exception {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, null, Serdes.String(), "store");
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Serdes.String(), "store");
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreNameOnWindowedAggregate() throws Exception {
         String storeName = null;
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), storeName);
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), storeName);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
         StateStoreSupplier storeSupplier = null;
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), storeSupplier);
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), storeSupplier);
     }
 
     @Test
@@ -265,7 +265,7 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() throws Exception {
-        groupedStream.aggregate(null, MockAggregator.STRING_ADDER, new Merger<String, String>() {
+        groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
                 return null;
@@ -286,7 +286,7 @@ public class KGroupedStreamImplTest {
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() throws Exception {
         groupedStream.aggregate(MockInitializer.STRING_INIT,
-                                MockAggregator.STRING_ADDER,
+                                MockAggregator.TOSTRING_ADDER,
                                 null,
                                 SessionWindows.with(10),
                                 Serdes.String(),
@@ -295,7 +295,7 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() throws Exception {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, new Merger<String, String>() {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
                 return null;
@@ -305,7 +305,7 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullStoreNameWhenAggregatingSessionWindows() throws Exception {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, new Merger<String, String>() {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
                 return null;
@@ -315,7 +315,7 @@ public class KGroupedStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() throws Exception {
-        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, new Merger<String, String>() {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
                 return null;
@@ -337,4 +337,4 @@ public class KGroupedStreamImplTest {
     public void shouldNotAcceptNullStoreStoreSupplierNameWhenCountingSessionWindows() throws Exception {
         groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 85e2073..5ed61e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -18,10 +18,12 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
@@ -52,22 +54,22 @@ public class KGroupedTableImplTest {
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreNameOnAggregate() throws Exception {
         String storeName = null;
-        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, storeName);
+        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, storeName);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullInitializerOnAggregate() throws Exception {
-        groupedTable.aggregate(null, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "store");
+        groupedTable.aggregate(null, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "store");
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullAdderOnAggregate() throws Exception {
-        groupedTable.aggregate(MockInitializer.STRING_INIT, null, MockAggregator.STRING_REMOVER, "store");
+        groupedTable.aggregate(MockInitializer.STRING_INIT, null, MockAggregator.TOSTRING_REMOVER, "store");
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullSubtractorOnAggregate() throws Exception {
-        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, null, "store");
+        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, "store");
     }
 
     @Test(expected = NullPointerException.class)
@@ -96,9 +98,17 @@ public class KGroupedTableImplTest {
     public void shouldReduce() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         final String topic = "input";
-        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Integer(), topic, "store")
-                .groupBy(MockKeyValueMapper.<String, Integer>NoOpKeyValueMapper())
-                .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
+        final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
+            new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
+                @Override
+                public KeyValue<String, Integer> apply(String key, Number value) {
+                    return KeyValue.pair(key, value.intValue());
+                }
+            };
+
+        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+            .groupBy(intProjection)
+            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
 
         final Map<String, Integer> results = new HashMap<>();
         reduced.foreach(new ForeachAction<String, Integer>() {
@@ -111,21 +121,21 @@ public class KGroupedTableImplTest {
 
         final KStreamTestDriver driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
         driver.setTime(10L);
-        driver.process(topic, "A", 1);
-        driver.process(topic, "B", 2);
+        driver.process(topic, "A", 1.1);
+        driver.process(topic, "B", 2.2);
         driver.flushState();
 
         assertEquals(Integer.valueOf(1), results.get("A"));
         assertEquals(Integer.valueOf(2), results.get("B"));
 
-        driver.process(topic, "A", 2);
-        driver.process(topic, "B", 1);
-        driver.process(topic, "A", 5);
-        driver.process(topic, "B", 6);
+        driver.process(topic, "A", 2.6);
+        driver.process(topic, "B", 1.3);
+        driver.process(topic, "A", 5.7);
+        driver.process(topic, "B", 6.2);
         driver.flushState();
 
         assertEquals(Integer.valueOf(5), results.get("A"));
         assertEquals(Integer.valueOf(6), results.get("B"));
 
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 4e396ba..7e828ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -95,4 +95,26 @@ public class KStreamBranchTest {
         assertEquals(1, processors[1].processed.size());
         assertEquals(2, processors[2].processed.size());
     }
+
+    @Test
+    public void testTypeVariance() throws Exception {
+        Predicate<Number, Object> positive = new Predicate<Number, Object>() {
+            @Override
+            public boolean test(Number key, Object value) {
+                return key.doubleValue() > 0;
+            }
+        };
+
+        Predicate<Number, Object> negative = new Predicate<Number, Object>() {
+            @Override
+            public boolean test(Number key, Object value) {
+                return key.doubleValue() < 0;
+            }
+        };
+
+        @SuppressWarnings("unchecked")
+        final KStream<Integer, String>[] branches = new KStreamBuilder()
+            .<Integer, String>stream("empty")
+            .branch(positive, negative);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 962df07..347b05b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
-
 import org.junit.After;
 import org.junit.Test;
 
@@ -89,4 +88,20 @@ public class KStreamFilterTest {
 
         assertEquals(5, processor.processed.size());
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testTypeVariance() throws Exception {
+        Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>() {
+            @Override
+            public boolean test(Number key, Object value) {
+                return false;
+            }
+        };
+
+        new KStreamBuilder()
+            .<Integer, String>stream("empty")
+            .filter(numberKeyPredicate)
+            .filterNot(numberKeyPredicate)
+            .to("nirvana");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 724b08b..41e0c5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -49,13 +49,13 @@ public class KStreamFlatMapTest {
     public void testFlatMap() {
         KStreamBuilder builder = new KStreamBuilder();
 
-        KeyValueMapper<Integer, String, Iterable<KeyValue<String, String>>> mapper =
-            new KeyValueMapper<Integer, String, Iterable<KeyValue<String, String>>>() {
+        KeyValueMapper<Number, Object, Iterable<KeyValue<String, String>>> mapper =
+            new KeyValueMapper<Number, Object, Iterable<KeyValue<String, String>>>() {
                 @Override
-                public Iterable<KeyValue<String, String>> apply(Integer key, String value) {
+                public Iterable<KeyValue<String, String>> apply(Number key, Object value) {
                     ArrayList<KeyValue<String, String>> result = new ArrayList<>();
-                    for (int i = 0; i < key; i++) {
-                        result.add(KeyValue.pair(Integer.toString(key * 10 + i), value));
+                    for (int i = 0; i < key.intValue(); i++) {
+                        result.add(KeyValue.pair(Integer.toString(key.intValue() * 10 + i), value.toString()));
                     }
                     return result;
                 }
@@ -84,4 +84,4 @@ public class KStreamFlatMapTest {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index ff4a581..f4dd08a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -27,7 +27,6 @@ import org.junit.After;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Locale;
 
 import static org.junit.Assert.assertEquals;
 
@@ -49,29 +48,29 @@ public class KStreamFlatMapValuesTest {
     public void testFlatMapValues() {
         KStreamBuilder builder = new KStreamBuilder();
 
-        ValueMapper<String, Iterable<String>> mapper =
-            new ValueMapper<String, Iterable<String>>() {
+        ValueMapper<Number, Iterable<String>> mapper =
+            new ValueMapper<Number, Iterable<String>>() {
                 @Override
-                public Iterable<String> apply(String value) {
+                public Iterable<String> apply(Number value) {
                     ArrayList<String> result = new ArrayList<String>();
-                    result.add(value.toLowerCase(Locale.ROOT));
-                    result.add(value);
+                    result.add("v" + value);
+                    result.add("V" + value);
                     return result;
                 }
             };
 
         final int[] expectedKeys = {0, 1, 2, 3};
 
-        KStream<Integer, String> stream;
+        KStream<Integer, Integer> stream;
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.Integer(), topicName);
         stream.flatMapValues(mapper).process(processor);
 
         driver = new KStreamTestDriver(builder);
         for (int expectedKey : expectedKeys) {
-            driver.process(topicName, expectedKey, "V" + expectedKey);
+            driver.process(topicName, expectedKey, expectedKey);
         }
 
         assertEquals(8, processor.processed.size());
@@ -82,4 +81,4 @@ public class KStreamFlatMapValuesTest {
             assertEquals(expected[i], processor.processed.get(i));
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index 0bc5e77..0597dc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -94,4 +94,16 @@ public class KStreamForeachTest {
             assertEquals(expectedRecord, actualRecord);
         }
     }
+
+    @Test
+    public void testTypeVariance() throws Exception {
+        ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
+            @Override
+            public void apply(Number key, Object value) {}
+        };
+
+        new KStreamBuilder()
+            .<Integer, String>stream("emptyTopic")
+            .foreach(consume);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 8c96ecb..5271404 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -236,7 +236,7 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullOtherStreamOnJoin() throws Exception {
-        testStream.join(null, MockValueJoiner.STRING_JOINER, JoinWindows.of(10));
+        testStream.join(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10));
     }
 
     @Test(expected = NullPointerException.class)
@@ -246,12 +246,12 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinWindowsOnJoin() throws Exception {
-        testStream.join(testStream, MockValueJoiner.STRING_JOINER, null);
+        testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTableOnTableJoin() throws Exception {
-        testStream.leftJoin((KTable) null, MockValueJoiner.STRING_JOINER);
+        testStream.leftJoin((KTable) null, MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 7432f1d..e11da4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -79,7 +79,7 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -177,7 +177,7 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.outerJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -278,7 +278,7 @@ public class KStreamKStreamJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -505,7 +505,7 @@ public class KStreamKStreamJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(0).after(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(0).after(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -614,7 +614,7 @@ public class KStreamKStreamJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(0).before(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(0).before(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 890d8f6..b3ba4b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -81,7 +81,7 @@ public class KStreamKStreamLeftJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -171,7 +171,7 @@ public class KStreamKStreamLeftJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index b8142fb..94f68dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -76,7 +76,7 @@ public class KStreamKTableJoinTest {
         processor = new MockProcessorSupplier<>();
         stream = builder.stream(intSerde, stringSerde, topic1);
         table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
-        stream.join(table, MockValueJoiner.STRING_JOINER).process(processor);
+        stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
         final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 94e7888..569ea5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -76,7 +76,7 @@ public class KStreamKTableLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         stream = builder.stream(intSerde, stringSerde, topic1);
         table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
-        stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor);
+        stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index d843f93..f6d1b8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -80,4 +80,19 @@ public class KStreamMapTest {
             assertEquals(expected[i], processor.processed.get(i));
         }
     }
+
+    @Test
+    public void testTypeVariance() throws Exception {
+        KeyValueMapper<Number, Object, KeyValue<Number, String>> stringify = new KeyValueMapper<Number, Object, KeyValue<Number, String>>() {
+            @Override
+            public KeyValue<Number, String> apply(Number key, Object value) {
+                return KeyValue.pair(key, key + ":" + value);
+            }
+        };
+
+        new KStreamBuilder()
+            .<Integer, String>stream("numbers")
+            .map(stringify)
+            .to("strings");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 802b019..dd06199 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -50,10 +50,10 @@ public class KStreamMapValuesTest {
     public void testFlatMapValues() {
         KStreamBuilder builder = new KStreamBuilder();
 
-        ValueMapper<String, Integer> mapper =
-            new ValueMapper<String, Integer>() {
+        ValueMapper<CharSequence, Integer> mapper =
+            new ValueMapper<CharSequence, Integer>() {
                 @Override
-                public Integer apply(String value) {
+                public Integer apply(CharSequence value) {
                     return value.length();
                 }
             };

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 1bd870e..94daa3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -21,6 +21,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -55,15 +56,15 @@ public class KStreamSelectKeyTest {
     public void testSelectKey() {
         KStreamBuilder builder = new KStreamBuilder();
 
-        final Map<Integer, String> keyMap = new HashMap<>();
+        final Map<Number, String> keyMap = new HashMap<>();
         keyMap.put(1, "ONE");
         keyMap.put(2, "TWO");
         keyMap.put(3, "THREE");
 
 
-        KeyValueMapper<String, Integer, String> selector = new KeyValueMapper<String, Integer, String>() {
+        KeyValueMapper<Object, Number, String> selector = new KeyValueMapper<Object, Number, String>() {
             @Override
-            public String apply(String key, Integer value) {
+            public String apply(Object key, Number value) {
                 return keyMap.get(value);
             }
         };
@@ -91,4 +92,15 @@ public class KStreamSelectKeyTest {
 
     }
 
-}
\ No newline at end of file
+    @Test
+    public void testTypeVariance() throws Exception {
+        ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
+            @Override
+            public void apply(Number key, Object value) {}
+        };
+
+        new KStreamBuilder()
+            .<Integer, String>stream("empty")
+            .foreach(consume);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index cbb2764..2d4a530 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -52,10 +52,10 @@ public class KStreamTransformTest {
     public void testTransform() {
         KStreamBuilder builder = new KStreamBuilder();
 
-        TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>> transformerSupplier =
-            new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
-                public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
-                    return new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
+        TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
+            new TransformerSupplier<Number, Number, KeyValue<Integer, Integer>>() {
+                public Transformer<Number, Number, KeyValue<Integer, Integer>> get() {
+                    return new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
 
                         private int total = 0;
 
@@ -64,9 +64,9 @@ public class KStreamTransformTest {
                         }
 
                         @Override
-                        public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
-                            total += value;
-                            return KeyValue.pair(key * 2, total);
+                        public KeyValue<Integer, Integer> transform(Number key, Number value) {
+                            total += value.intValue();
+                            return KeyValue.pair(key.intValue() * 2, total);
                         }
 
                         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 600a0b4..557388d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -51,10 +51,10 @@ public class KStreamTransformValuesTest {
     public void testTransform() {
         KStreamBuilder builder = new KStreamBuilder();
 
-        ValueTransformerSupplier<Integer, Integer> valueTransformerSupplier =
-            new ValueTransformerSupplier<Integer, Integer>() {
-                public ValueTransformer<Integer, Integer> get() {
-                    return new ValueTransformer<Integer, Integer>() {
+        ValueTransformerSupplier<Number, Integer> valueTransformerSupplier =
+            new ValueTransformerSupplier<Number, Integer>() {
+                public ValueTransformer<Number, Integer> get() {
+                    return new ValueTransformer<Number, Integer>() {
 
                         private int total = 0;
 
@@ -63,8 +63,8 @@ public class KStreamTransformValuesTest {
                         }
 
                         @Override
-                        public Integer transform(Integer value) {
-                            total += value;
+                        public Integer transform(Number value) {
+                            total += value.intValue();
                             return total;
                         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 2175dd5..960ef16 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -76,7 +76,7 @@ public class KStreamWindowAggregateTest {
                 stream1.groupByKey(strSerde,
                                    strSerde)
                     .aggregate(MockInitializer.STRING_INIT,
-                               MockAggregator.STRING_ADDER,
+                               MockAggregator.TOSTRING_ADDER,
                                TimeWindows.of(10).advanceBy(5),
                                strSerde, "topic1-Canonized");
 
@@ -175,7 +175,7 @@ public class KStreamWindowAggregateTest {
             KTable<Windowed<String>, String> table1 =
                 stream1.groupByKey(strSerde, strSerde)
                     .aggregate(MockInitializer.STRING_INIT,
-                               MockAggregator.STRING_ADDER,
+                               MockAggregator.TOSTRING_ADDER,
                                TimeWindows.of(10).advanceBy(5),
                                strSerde, "topic1-Canonized");
 
@@ -186,7 +186,7 @@ public class KStreamWindowAggregateTest {
             KTable<Windowed<String>, String> table2 =
                 stream2.groupByKey(strSerde, strSerde)
                     .aggregate(MockInitializer.STRING_INIT,
-                               MockAggregator.STRING_ADDER,
+                               MockAggregator.TOSTRING_ADDER,
                                TimeWindows.of(10).advanceBy(5),
                                strSerde, "topic2-Canonized");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index d8ec89b..a43cab6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -79,8 +79,8 @@ public class KTableAggregateTest {
                 stringSerde,
                 stringSerde
         ).aggregate(MockInitializer.STRING_INIT,
-                MockAggregator.STRING_ADDER,
-                MockAggregator.STRING_REMOVER,
+                MockAggregator.TOSTRING_ADDER,
+                MockAggregator.TOSTRING_REMOVER,
                 stringSerde,
                 "topic1-Canonized");
 
@@ -128,8 +128,8 @@ public class KTableAggregateTest {
             stringSerde,
             stringSerde
         ).aggregate(MockInitializer.STRING_INIT,
-            MockAggregator.STRING_ADDER,
-            MockAggregator.STRING_REMOVER,
+            MockAggregator.TOSTRING_ADDER,
+            MockAggregator.TOSTRING_REMOVER,
             stringSerde,
             "topic1-Canonized");
 
@@ -170,8 +170,8 @@ public class KTableAggregateTest {
                 stringSerde
         )
                 .aggregate(MockInitializer.STRING_INIT,
-                MockAggregator.STRING_ADDER,
-                MockAggregator.STRING_REMOVER,
+                MockAggregator.TOSTRING_ADDER,
+                MockAggregator.TOSTRING_REMOVER,
                 stringSerde,
                 "topic1-Canonized");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 277d6d2..9746d7c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -313,4 +313,20 @@ public class KTableFilterTest {
         proc1.checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
         proc2.checkEmptyAndClearProcessResult();
     }
+
+    @Test
+    public void testTypeVariance() throws Exception {
+        Predicate<Number, Object> numberKeyPredicate = new Predicate<Number, Object>() {
+            @Override
+            public boolean test(Number key, Object value) {
+                return false;
+            }
+        };
+
+        new KStreamBuilder()
+            .<Integer, String>table("empty", "emptyStore")
+            .filter(numberKeyPredicate)
+            .filterNot(numberKeyPredicate)
+            .to("nirvana");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 791fa28..f2a16ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -105,4 +105,16 @@ public class KTableForeachTest {
             assertEquals(expectedRecord, actualRecord);
         }
     }
+
+    @Test
+    public void testTypeVariance() throws Exception {
+        ForeachAction<Number, Object> consume = new ForeachAction<Number, Object>() {
+            @Override
+            public void apply(Number key, Object value) {}
+        };
+
+        new KStreamBuilder()
+            .<Integer, String>table("emptyTopic", "emptyStore")
+            .foreach(consume);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 49dcbd0..37ebdb9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -349,7 +349,7 @@ public class KTableImplTest {
 
         KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
                 .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
-                .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1");
+                .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");
 
 
         KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1
@@ -436,7 +436,7 @@ public class KTableImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullOtherTableOnJoin() throws Exception {
-        table.join(null, MockValueJoiner.STRING_JOINER);
+        table.join(null, MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
@@ -446,7 +446,7 @@ public class KTableImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullOtherTableOnOuterJoin() throws Exception {
-        table.outerJoin(null, MockValueJoiner.STRING_JOINER);
+        table.outerJoin(null, MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
@@ -461,7 +461,7 @@ public class KTableImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullOtherTableOnLeftJoin() throws Exception {
-        table.leftJoin(null, MockValueJoiner.STRING_JOINER);
+        table.leftJoin(null, MockValueJoiner.TOSTRING_JOINER);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 05cc785..2eb522c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -82,7 +82,7 @@ public class KTableKTableJoinTest {
         processor = new MockProcessorSupplier<>();
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
+        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(processor);
 
         final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -180,7 +180,7 @@ public class KTableKTableJoinTest {
 
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
+        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
         builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
@@ -264,7 +264,7 @@ public class KTableKTableJoinTest {
 
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
+        joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 551fa1e..cbf1da4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -81,7 +81,7 @@ public class KTableKTableLeftJoinTest {
 
         KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
+        KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         MockProcessorSupplier<Integer, String> processor;
         processor = new MockProcessorSupplier<>();
         joined.toStream().process(processor);
@@ -176,7 +176,7 @@ public class KTableKTableLeftJoinTest {
 
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
+        joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
         builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
@@ -257,7 +257,7 @@ public class KTableKTableLeftJoinTest {
 
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        joined = table1.leftJoin(table2, MockValueJoiner.STRING_JOINER);
+        joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
@@ -368,14 +368,14 @@ public class KTableKTableLeftJoinTest {
         final KTable<Long, String> seven = one.mapValues(mapper);
 
 
-        final KTable<Long, String> eight = six.leftJoin(seven, MockValueJoiner.STRING_JOINER);
+        final KTable<Long, String> eight = six.leftJoin(seven, MockValueJoiner.TOSTRING_JOINER);
 
-        aggTable.leftJoin(one, MockValueJoiner.STRING_JOINER)
-                .leftJoin(two, MockValueJoiner.STRING_JOINER)
-                .leftJoin(three, MockValueJoiner.STRING_JOINER)
-                .leftJoin(four, MockValueJoiner.STRING_JOINER)
-                .leftJoin(five, MockValueJoiner.STRING_JOINER)
-                .leftJoin(eight, MockValueJoiner.STRING_JOINER)
+        aggTable.leftJoin(one, MockValueJoiner.TOSTRING_JOINER)
+                .leftJoin(two, MockValueJoiner.TOSTRING_JOINER)
+                .leftJoin(three, MockValueJoiner.TOSTRING_JOINER)
+                .leftJoin(four, MockValueJoiner.TOSTRING_JOINER)
+                .leftJoin(five, MockValueJoiner.TOSTRING_JOINER)
+                .leftJoin(eight, MockValueJoiner.TOSTRING_JOINER)
                 .mapValues(mapper);
 
         final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, 250);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 8673240..02e4e60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -82,7 +82,7 @@ public class KTableKTableOuterJoinTest {
         processor = new MockProcessorSupplier<>();
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
+        joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -182,7 +182,7 @@ public class KTableKTableOuterJoinTest {
 
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
+        joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
         builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
@@ -270,7 +270,7 @@ public class KTableKTableOuterJoinTest {
 
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
-        joined = table1.outerJoin(table2, MockValueJoiner.STRING_JOINER);
+        joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 1d5b8a9..096f1c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -66,10 +66,10 @@ public class KTableMapValuesTest {
         String topic1 = "topic1";
 
         KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
-        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
+        KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
             @Override
-            public Integer apply(String value) {
-                return new Integer(value);
+            public Integer apply(CharSequence value) {
+                return value.charAt(0) - 48;
             }
         });
 
@@ -78,10 +78,10 @@ public class KTableMapValuesTest {
 
         driver = new KStreamTestDriver(builder, stateDir);
 
-        driver.process(topic1, "A", "01");
-        driver.process(topic1, "B", "02");
-        driver.process(topic1, "C", "03");
-        driver.process(topic1, "D", "04");
+        driver.process(topic1, "A", "1");
+        driver.process(topic1, "B", "2");
+        driver.process(topic1, "C", "3");
+        driver.process(topic1, "D", "4");
         driver.flushState();
         assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index d907506..3301744 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -231,10 +231,10 @@ public class ProcessorTopologyTest {
         assertNull(driver.readOutput(topic));
     }
 
-    protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) {
-        return new StreamPartitioner<K, V>() {
+    protected StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
+        return new StreamPartitioner<Object, Object>() {
             @Override
-            public Integer partition(K key, V value, int numPartitions) {
+            public Integer partition(Object key, Object value, int numPartitions) {
                 return partition;
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 03256eb..b54ce34 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -60,9 +60,9 @@ public class RecordCollectorTest {
     private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
     private final StringSerializer stringSerializer = new StringSerializer();
 
-    private final StreamPartitioner<String, String> streamPartitioner = new StreamPartitioner<String, String>() {
+    private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() {
         @Override
-        public Integer partition(String key, String value, int numPartitions) {
+        public Integer partition(String key, Object value, int numPartitions) {
             return Integer.parseInt(key) % numPartitions;
         }
     };

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 0e36c45..591e866 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -101,7 +101,7 @@ public class SmokeTestClient extends SmokeTestUtil {
             }
         });
 
-        data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
+        data.process(SmokeTestUtil.printProcessorSupplier("data"));
 
         // min
         KGroupedStream<String, Integer>
@@ -127,7 +127,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, intSerde, "min");
 
         KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
-        minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
+        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
 
         // max
         groupedData.aggregate(
@@ -149,7 +149,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, intSerde, "max");
 
         KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
-        maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
+        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
 
         // sum
         groupedData.aggregate(
@@ -172,7 +172,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
 
         KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
-        sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
+        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
 
         // cnt
         groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2)), "uwin-cnt")
@@ -181,7 +181,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, longSerde, "cnt");
 
         KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
-        cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
+        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
 
         // dif
         maxTable.join(minTable,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index 7ff738f..36660fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -37,14 +37,14 @@ public class SmokeTestUtil {
     public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
     public final static int END = Integer.MAX_VALUE;
 
-    public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic) {
+    public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, false);
     }
 
-    public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic, final boolean printOffset) {
-        return new ProcessorSupplier<String, T>() {
-            public Processor<String, T> get() {
-                return new AbstractProcessor<String, T>() {
+    public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final boolean printOffset) {
+        return new ProcessorSupplier<Object, Object>() {
+            public Processor<Object, Object> get() {
+                return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
                     private ProcessorContext context;
 
@@ -56,7 +56,7 @@ public class SmokeTestUtil {
                     }
 
                     @Override
-                    public void process(String key, T value) {
+                    public void process(Object key, Object value) {
                         if (printOffset) System.out.println(">>> " + context.offset());
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 68a80d1..69474b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -213,7 +213,7 @@ public class KeyValueStoreTestDriver<K, V> {
             }
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
-                                    StreamPartitioner<K1, V1> partitioner) {
+                                    StreamPartitioner<? super K1, ? super V1> partitioner) {
                 // ignore partitioner
                 send(record, keySerializer, valueSerializer);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 94b5794..82e524e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -51,7 +51,7 @@ public class StoreChangeLoggerTest {
 
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
-                                          StreamPartitioner<K1, V1> partitioner) {
+                                          StreamPartitioner<? super K1, ? super V1> partitioner) {
                     // ignore partitioner
                     send(record, keySerializer, valueSerializer);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index f51cc0e..c12c612 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -253,7 +253,7 @@ public class KStreamTestDriver {
 
         @Override
         public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                                StreamPartitioner<K, V> partitioner) {
+                                StreamPartitioner<? super K, ? super V> partitioner) {
             // The serialization is skipped.
             process(record.topic(), record.key(), record.value());
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockAggregator.java b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
index e8bb10b..d8890f6 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
@@ -21,23 +21,15 @@ import org.apache.kafka.streams.kstream.Aggregator;
 
 public class MockAggregator {
 
-    private static class StringAdd implements Aggregator<String, String, String> {
-
-        @Override
-        public String apply(String aggKey, String value, String aggregate) {
-            return aggregate + "+" + value;
-        }
-    }
-
-    private static class StringRemove implements Aggregator<String, String, String> {
-
-        @Override
-        public String apply(String aggKey, String value, String aggregate) {
-            return aggregate + "-" + value;
-        }
+    public final static Aggregator<Object, Object, String> TOSTRING_ADDER = toStringInstance("+");
+    public final static Aggregator<Object, Object, String> TOSTRING_REMOVER = toStringInstance("-");
+
+    public static <K, V> Aggregator<K, V, String> toStringInstance(final String sep) {
+        return new Aggregator<K, V, String>() {
+            @Override
+            public String apply(K aggKey, V value, String aggregate) {
+                return aggregate + sep + value;
+            }
+        };
     }
-
-    public final static Aggregator<String, String, String> STRING_ADDER = new StringAdd();
-
-    public final static Aggregator<String, String, String> STRING_REMOVER = new StringRemove();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
index 4d44166..4d2dd69 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java
@@ -21,13 +21,14 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 
 public class MockValueJoiner {
 
-    private static class StringJoin implements ValueJoiner<String, String, String> {
+    public final static ValueJoiner<Object, Object, String> TOSTRING_JOINER = instance("+");
 
-        @Override
-        public String apply(String value1, String value2) {
-            return value1 + "+" + value2;
-        }
-    };
-
-    public final static ValueJoiner<String, String, String> STRING_JOINER = new StringJoin();
-}
\ No newline at end of file
+    public static <V1, V2> ValueJoiner<V1, V2, String> instance(final String separator) {
+        return new ValueJoiner<V1, V2, String>() {
+            @Override
+            public String apply(V1 value1, V2 value2) {
+                return value1 + separator + value2;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a95170f8/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index c32ed08..0fe4d63 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -33,7 +33,7 @@ public class NoOpRecordCollector implements RecordCollector {
     }
 
     @Override
-    public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) {
+    public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner) {
         // no-op
     }
 


Mime
View raw message