kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7456: Serde Inheritance in DSL (#5521)
Date Mon, 01 Oct 2018 23:25:10 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d568f73  KAFKA-7456: Serde Inheritance in DSL (#5521)
d568f73 is described below

commit d568f73fc6ece3d29989413174eee0195d3d0a4a
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Mon Oct 1 16:24:12 2018 -0700

    KAFKA-7456: Serde Inheritance in DSL (#5521)
    
    Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../kafka/streams/kstream/WindowedSerdes.java      |   4 +-
 .../streams/kstream/internals/AbstractStream.java  |  44 ++-
 .../internals/GroupedStreamAggregateBuilder.java   |  19 +-
 .../kstream/internals/InternalStreamsBuilder.java  |  33 ++-
 .../kstream/internals/KGroupedStreamImpl.java      |  29 +-
 .../kstream/internals/KGroupedTableImpl.java       |  48 ++--
 .../streams/kstream/internals/KStreamImpl.java     | 303 ++++++++++-----------
 .../streams/kstream/internals/KTableImpl.java      | 148 +++++-----
 .../kstream/internals/KTableKTableJoinMerger.java  |  14 +-
 .../internals/SessionWindowedKStreamImpl.java      |  46 ++--
 .../kstream/internals/TimeWindowedKStreamImpl.java |  40 +--
 .../GroupedTableOperationRepartitionNode.java      |   5 +-
 .../internals/graph/KTableKTableJoinNode.java      |   2 +-
 .../internals/graph/ProcessorParameters.java       |   6 +-
 .../kafka/streams/kstream/WindowedSerdesTest.java  |  23 +-
 .../kstream/internals/AbstractStreamTest.java      |  11 +-
 .../streams/kstream/internals/KStreamImplTest.java | 123 +++++++++
 .../streams/kstream/internals/KStreamMapTest.java  |  21 +-
 .../streams/kstream/internals/KTableImplTest.java  |  77 ++++++
 .../kstream/internals/graph/StreamsGraphTest.java  |   4 +-
 20 files changed, 594 insertions(+), 406 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
index 6a851a1..2474860 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
@@ -24,7 +24,7 @@ public class WindowedSerdes {
     static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
         // Default constructor needed for reflection object creation
         public TimeWindowedSerde() {
-            super(new TimeWindowedSerializer<T>(), new TimeWindowedDeserializer<T>());
+            super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>());
         }
 
         public TimeWindowedSerde(final Serde<T> inner) {
@@ -35,7 +35,7 @@ public class WindowedSerdes {
     static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
         // Default constructor needed for reflection object creation
         public SessionWindowedSerde() {
-            super(new SessionWindowedSerializer<T>(), new SessionWindowedDeserializer<T>());
+            super(new SessionWindowedSerializer<>(), new SessionWindowedDeserializer<>());
         }
 
         public SessionWindowedSerde(final Serde<T> inner) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index a0724eb..e870751 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
@@ -31,32 +32,48 @@ import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
 
-public abstract class AbstractStream<K> {
+/*
+ * Any classes (KTable, KStream, etc) extending this class should follow the serde specification precedence ordering as:
+ *
+ * 1) Overridden values via control objects (e.g. Materialized, Serialized, Consumed, etc)
+ * 2) Serdes that can be inferred from the operator itself (e.g. groupBy().count(), where value serde can default to `LongSerde`).
+ * 3) Serde inherited from parent operator if possible (note if the key / value types have been changed, then the corresponding serde cannot be inherited).
+ * 4) Default serde specified in the config.
+ */
+public abstract class AbstractStream<K, V> {
 
-    protected final InternalStreamsBuilder builder;
     protected final String name;
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valSerde;
     protected final Set<String> sourceNodes;
     protected final StreamsGraphNode streamsGraphNode;
+    protected final InternalStreamsBuilder builder;
 
     // This copy-constructor will allow to extend KStream
     // and KTable APIs with new methods without impacting the public interface.
-    public AbstractStream(final AbstractStream<K> stream) {
-        this.builder = stream.builder;
+    public AbstractStream(final AbstractStream<K, V> stream) {
         this.name = stream.name;
+        this.builder = stream.builder;
+        this.keySerde = stream.keySerde;
+        this.valSerde = stream.valSerde;
         this.sourceNodes = stream.sourceNodes;
         this.streamsGraphNode = stream.streamsGraphNode;
     }
 
-    AbstractStream(final InternalStreamsBuilder builder,
-                   final String name,
+    AbstractStream(final String name,
+                   final Serde<K> keySerde,
+                   final Serde<V> valSerde,
                    final Set<String> sourceNodes,
-                   final StreamsGraphNode streamsGraphNode) {
+                   final StreamsGraphNode streamsGraphNode,
+                   final InternalStreamsBuilder builder) {
         if (sourceNodes == null || sourceNodes.isEmpty()) {
             throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
         }
 
-        this.builder = builder;
         this.name = name;
+        this.builder = builder;
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
         this.sourceNodes = sourceNodes;
         this.streamsGraphNode = streamsGraphNode;
     }
@@ -67,7 +84,7 @@ public abstract class AbstractStream<K> {
         return builder.internalTopologyBuilder;
     }
 
-    Set<String> ensureJoinableWith(final AbstractStream<K> other) {
+    Set<String> ensureJoinableWith(final AbstractStream<K, ?> other) {
         final Set<String> allSourceNodes = new HashSet<>();
         allSourceNodes.addAll(sourceNodes);
         allSourceNodes.addAll(other.sourceNodes);
@@ -122,4 +139,13 @@ public abstract class AbstractStream<K> {
             }
         };
     }
+
+    // for testing only
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valSerde;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index a191c5a..9791db6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -63,11 +63,12 @@ class GroupedStreamAggregateBuilder<K, V> {
         this.streamsGraphNode = streamsGraphNode;
     }
 
-
-    <KR, T> KTable<KR, T> build(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
-                                final String functionName,
+    <KR, T> KTable<KR, T> build(final String functionName,
                                 final StoreBuilder<? extends StateStore> storeBuilder,
-                                final boolean isQueryable) {
+                                final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
+                                final boolean isQueryable,
+                                final Serde<KR> keySerde,
+                                final Serde<T> valSerde) {
 
         final String aggFunctionName = builder.newProcessorName(functionName);
 
@@ -95,13 +96,15 @@ class GroupedStreamAggregateBuilder<K, V> {
 
         builder.addGraphNode(parentNode, statefulProcessorNode);
 
-        return new KTableImpl<>(builder,
-                                aggFunctionName,
-                                aggregateSupplier,
+        return new KTableImpl<>(aggFunctionName,
+                                keySerde,
+                                valSerde,
                                 sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName),
                                 storeBuilder.name(),
                                 isQueryable,
-                                statefulProcessorNode);
+                                aggregateSupplier,
+                                statefulProcessorNode,
+                                builder);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 49f49d0..8f76740 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -79,30 +79,33 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     public <K, V> KStream<K, V> stream(final Collection<String> topics,
                                        final ConsumedInternal<K, V> consumed) {
         final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, topics, consumed);
 
-        final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name,
-                                                                              topics,
-                                                                              consumed);
         addGraphNode(root, streamSourceNode);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name), false, streamSourceNode);
+        return new KStreamImpl<>(name,
+                                 consumed.keySerde(),
+                                 consumed.valueSerde(),
+                                 Collections.singleton(name),
+                                 false,
+                                 streamSourceNode,
+                                 this);
     }
 
     public <K, V> KStream<K, V> stream(final Pattern topicPattern,
                                        final ConsumedInternal<K, V> consumed) {
         final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
-
-        final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name,
-                                                                                      topicPattern,
-                                                                                      consumed);
+        final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
 
         addGraphNode(root, streamPatternSourceNode);
 
-        return new KStreamImpl<>(this,
-                                 name,
+        return new KStreamImpl<>(name,
+                                 consumed.keySerde(),
+                                 consumed.valueSerde(),
                                  Collections.singleton(name),
                                  false,
-                                 streamPatternSourceNode);
+                                 streamPatternSourceNode,
+                                 this);
     }
 
     @SuppressWarnings("unchecked")
@@ -129,15 +132,15 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
         addGraphNode(root, tableSourceNode);
 
-        return new KTableImpl<>(this,
-                                name,
-                                processorSupplier,
+        return new KTableImpl<>(name,
                                 consumed.keySerde(),
                                 consumed.valueSerde(),
                                 Collections.singleton(source),
                                 storeBuilder.name(),
                                 materialized.isQueryable(),
-                                tableSourceNode);
+                                processorSupplier,
+                                tableSourceNode,
+                                this);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 5d4f9f3..da2eeb6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -36,24 +36,22 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import java.util.Objects;
 import java.util.Set;
 
-class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
+class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedStream<K, V> {
 
     static final String REDUCE_NAME = "KSTREAM-REDUCE-";
     static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
 
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
     private final boolean repartitionRequired;
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
 
-    KGroupedStreamImpl(final InternalStreamsBuilder builder,
-                       final String name,
-                       final Set<String> sourceNodes,
+    KGroupedStreamImpl(final String name,
                        final Serde<K> keySerde,
                        final Serde<V> valSerde,
+                       final Set<String> sourceNodes,
                        final boolean repartitionRequired,
-                       final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
+                       final StreamsGraphNode streamsGraphNode,
+                       final InternalStreamsBuilder builder) {
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
             builder,
             keySerde,
@@ -63,8 +61,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
             name,
             streamsGraphNode
         );
-        this.keySerde = keySerde;
-        this.valSerde = valSerde;
         this.repartitionRequired = repartitionRequired;
     }
 
@@ -189,14 +185,15 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         );
     }
 
-    private <KR, T> KTable<KR, T> doAggregate(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
-                                              final String functionName,
-                                              final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+    private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier,
+                                         final String functionName,
+                                         final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
         return aggregateBuilder.build(
-            aggregateSupplier,
             functionName,
             new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
-            materializedInternal.isQueryable()
-        );
+            aggregateSupplier,
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde(),
+            materializedInternal.valueSerde());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 08fb605..6ec3c0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * The implementation class of {@link KGroupedTable}.
@@ -41,14 +42,12 @@ import java.util.Objects;
  * @param <K> the key type
  * @param <V> the value type
  */
-public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroupedTable<K, V> {
+public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGroupedTable<K, V> {
 
     private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    protected final Serde<K> keySerde;
-    protected final Serde<V> valSerde;
     private final Initializer<Long> countInitializer = () -> 0L;
 
     private final Aggregator<K, V, Long> countAdder = (aggKey, value, aggregate) -> aggregate + 1L;
@@ -57,16 +56,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     KGroupedTableImpl(final InternalStreamsBuilder builder,
                       final String name,
-                      final String sourceName,
+                      final Set<String> sourceNodes,
                       final Serde<K> keySerde,
                       final Serde<V> valSerde,
                       final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, Collections.singleton(sourceName), streamsGraphNode);
-        this.keySerde = keySerde;
-        this.valSerde = valSerde;
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
     }
 
-    @SuppressWarnings("unchecked")
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                          final String functionName,
                                          final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
@@ -75,9 +71,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         final String funcName = builder.newProcessorName(functionName);
         final String topic = materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
-        final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName,
-                                                                       sourceName,
-                                                                       topic);
+        final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, topic);
 
         // the passed in StreamsGraphNode must be the parent of the repartition node
         builder.addGraphNode(this.streamsGraphNode, repartitionNode);
@@ -90,34 +84,34 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         builder.addGraphNode(repartitionNode, statefulProcessorNode);
 
         // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(builder,
-                                funcName,
-                                aggregateSupplier,
+        return new KTableImpl<>(funcName,
+                                materialized.keySerde(),
+                                materialized.valueSerde(),
                                 Collections.singleton(sourceName),
                                 materialized.storeName(),
                                 materialized.isQueryable(),
-                                statefulProcessorNode);
+                                aggregateSupplier,
+                                statefulProcessorNode,
+                                builder);
     }
 
-    @SuppressWarnings("unchecked")
     private <T> StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
                                                                final String functionName,
-                                                               final ProcessorSupplier aggregateSupplier) {
+                                                               final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
 
-        final ProcessorParameters aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
+        final ProcessorParameters<K, Change<V>> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName);
 
         return StatefulProcessorNode.statefulProcessorNodeBuilder()
             .withNodeName(functionName)
             .withProcessorParameters(aggregateFunctionProcessorParams)
-            .withStoreBuilder(new KeyValueStoreMaterializer(materialized).materialize()).build();
+            .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize()).build();
     }
 
-    @SuppressWarnings("unchecked")
-    private GroupedTableOperationRepartitionNode createRepartitionNode(final String sinkName,
-                                                                       final String sourceName,
-                                                                       final String topic) {
+    private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final String sinkName,
+                                                                             final String sourceName,
+                                                                             final String topic) {
 
-        return GroupedTableOperationRepartitionNode.groupedTableOperationNodeBuilder()
+        return GroupedTableOperationRepartitionNode.<K, V>groupedTableOperationNodeBuilder()
             .withRepartitionTopic(topic)
             .withSinkName(sinkName)
             .withSourceName(sourceName)
@@ -151,7 +145,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     @Override
     public KTable<K, V> reduce(final Reducer<V> adder,
                                final Reducer<V> subtractor) {
-        return reduce(adder, subtractor, Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde));
+        return reduce(adder, subtractor, Materialized.with(keySerde, valSerde));
     }
 
     @Override
@@ -176,7 +170,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     @Override
     public KTable<K, Long> count() {
-        return count(Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>with(keySerde, Serdes.Long()));
+        return count(Materialized.with(keySerde, Serdes.Long()));
     }
 
     @Override
@@ -206,7 +200,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> adder,
                                       final Aggregator<? super K, ? super V, T> subtractor) {
-        return aggregate(initializer, adder, subtractor, Materialized.<K, T, KeyValueStore<Bytes, byte[]>>with(keySerde, null));
+        return aggregate(initializer, adder, subtractor, Materialized.with(keySerde, null));
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 42c20a5..2a3bc8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -59,7 +59,7 @@ import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
 
-public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
+public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K, V> {
 
     static final String SOURCE_NAME = "KSTREAM-SOURCE-";
 
@@ -113,12 +113,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private final boolean repartitionRequired;
 
-    KStreamImpl(final InternalStreamsBuilder builder,
-                final String name,
+    KStreamImpl(final String name,
+                final Serde<K> keySerde,
+                final Serde<V> valueSerde,
                 final Set<String> sourceNodes,
                 final boolean repartitionRequired,
-                final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
+                final StreamsGraphNode streamsGraphNode,
+                final InternalStreamsBuilder builder) {
+        super(name, keySerde, valueSerde, sourceNodes, streamsGraphNode, builder);
         this.repartitionRequired = repartitionRequired;
     }
 
@@ -129,12 +131,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name);
-        final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name,
-                                                                                                      processorParameters,
-                                                                                                      repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
         builder.addGraphNode(this.streamsGraphNode, filterProcessorNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterProcessorNode);
+        return new KStreamImpl<>(name,
+                                 keySerde,
+                                 valSerde,
+                                 sourceNodes,
+                                 repartitionRequired,
+                                 filterProcessorNode,
+                                 builder);
     }
 
     @Override
@@ -143,46 +149,41 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final String name = builder.newProcessorName(FILTER_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name);
-
-
-        final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name,
-                                                                                                         processorParameters,
-                                                                                                         repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
 
         builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterNotProcessorNode);
+        return new KStreamImpl<>(name,
+                                 keySerde,
+                                 valSerde,
+                                 sourceNodes,
+                                 repartitionRequired,
+                                 filterNotProcessorNode,
+                                 builder);
     }
 
     @Override
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
 
-
         final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper);
 
         selectKeyProcessorNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode);
-        return new KStreamImpl<>(builder, selectKeyProcessorNode.nodeName(), sourceNodes, true, selectKeyProcessorNode);
+
+        // key serde cannot be preserved
+        return new KStreamImpl<>(selectKeyProcessorNode.nodeName(), null, valSerde, sourceNodes, true, selectKeyProcessorNode, builder);
     }
 
 
     private <K1> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         final String name = builder.newProcessorName(KEY_SELECT_NAME);
 
-
-        final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>(
-            (KeyValueMapper<K, V, KeyValue<K1, V>>) (key, value) -> new KeyValue<>(mapper.apply(key, value), value));
-
+        final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
 
         final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
 
-        return new ProcessorGraphNode<>(
-            name,
-            processorParameters,
-            repartitionRequired
-        );
-
+        return new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
     }
 
     @Override
@@ -192,14 +193,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMap<>(mapper), name);
 
-        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name,
-                                                                                                   processorParameters,
-                                                                                                   true);
+        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name, processorParameters, true);
 
         mapProcessorNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, true, mapProcessorNode);
+        // key and value serde cannot be preserved
+        return new KStreamImpl<>(name,
+                                 null,
+                                 null,
+                                 sourceNodes,
+                                 true,
+                                 mapProcessorNode,
+                                 builder);
     }
 
 
@@ -214,15 +220,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final String name = builder.newProcessorName(MAPVALUES_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
+        final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
 
-
-        final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name,
-                                                                                                         processorParameters,
-                                                                                                         repartitionRequired);
         mapValuesProcessorNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, mapValuesProcessorNode);
+        // value serde cannot be preserved
+        return new KStreamImpl<>(name,
+                                 keySerde,
+                                 null,
+                                 sourceNodes,
+                                 repartitionRequired,
+                                 mapValuesProcessorNode,
+                                 builder);
     }
 
     @Override
@@ -232,31 +242,30 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final String name = builder.newProcessorName(PRINTING_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name);
+        final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name, processorParameters, false);
 
-
-        final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name,
-                                                                                            processorParameters,
-                                                                                            false);
         builder.addGraphNode(this.streamsGraphNode, printNode);
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> flatMap(
-        final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+    public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         final String name = builder.newProcessorName(FLATMAP_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name);
-
-
-        final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name,
-                                                                                              processorParameters,
-                                                                                              true);
+        final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name, processorParameters, true);
         flatMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, flatMapNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, true, flatMapNode);
+        // key and value serde cannot be preserved
+        return new KStreamImpl<>(name,
+                                 null,
+                                 null,
+                                 sourceNodes,
+                                 true,
+                                 flatMapNode,
+                                 builder);
     }
 
     @Override
@@ -270,15 +279,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final String name = builder.newProcessorName(FLATMAPVALUES_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
+        final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
 
-
-        final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name,
-                                                                                                    processorParameters,
-                                                                                                    repartitionRequired);
         flatMapValuesNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, flatMapValuesNode);
+        // value serde cannot be preserved
+        return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, flatMapValuesNode, builder);
     }
 
     @Override
@@ -298,25 +305,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME);
         }
 
-
         final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName);
-
-        final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName,
-                                                                             processorParameters,
-                                                                             false);
+        final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName, processorParameters, false);
         builder.addGraphNode(this.streamsGraphNode, branchNode);
 
         final KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
 
         for (int i = 0; i < predicates.length; i++) {
             final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]);
-
-            final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i],
-                                                                                      innerProcessorParameters,
-                                                                                      repartitionRequired);
+            final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i], innerProcessorParameters, repartitionRequired);
 
             builder.addGraphNode(branchNode, branchChildNode);
-            branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired, branchChildNode);
+            branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valSerde, sourceNodes, repartitionRequired, branchChildNode, builder);
         }
 
         return branchChildren;
@@ -347,22 +347,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         mergeNode.setMergeNode(true);
         builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode);
-        return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning, mergeNode);
-    }
 
-    @Override
-    public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
-        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
-        to(topic, producedInternal);
-        return builder.stream(
-            Collections.singleton(topic),
-            new ConsumedInternal<>(
-                producedInternal.keySerde(),
-                producedInternal.valueSerde(),
-                new FailOnInvalidTimestamp(),
-                null
-            )
-        );
+        // drop the serde as we cannot safely use either one to represent both streams
+        return new KStreamImpl<>(name, null, null, allSourceNodes, requireRepartitioning, mergeNode, builder);
     }
 
     @Override
@@ -375,7 +362,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             name
         );
 
-
         final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name,
                                                                                               processorParameters,
                                                                                               repartitionRequired);
@@ -394,12 +380,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name,
                                                                                            processorParameters,
-                                                                                           repartitionRequired
-        );
+                                                                                           repartitionRequired);
 
         builder.addGraphNode(this.streamsGraphNode, peekNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired, peekNode);
+        return new KStreamImpl<>(name, keySerde, valSerde, sourceNodes, repartitionRequired, peekNode, builder);
     }
 
     @Override
@@ -408,6 +393,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
+    public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        to(topic, producedInternal);
+        return builder.stream(
+            Collections.singleton(topic),
+            new ConsumedInternal<>(
+                producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde,
+                producedInternal.valueSerde() != null ? producedInternal.valueSerde() : valSerde,
+                new FailOnInvalidTimestamp(),
+                null
+            )
+        );
+    }
+
+    @Override
     public void to(final String topic) {
         to(topic, Produced.with(null, null, null));
     }
@@ -431,7 +431,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         to(topicExtractor, new ProducedInternal<>(produced));
     }
 
-    @SuppressWarnings("unchecked")
     private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
         final String name = builder.newProcessorName(SINK_NAME);
 
@@ -461,8 +460,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         transformNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
-
-        return new KStreamImpl<>(builder, name, sourceNodes, true, transformNode);
+        // cannot inherit key and value serde
+        return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
     }
 
     @Override
@@ -496,7 +495,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         transformNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, transformNode);
+        // cannot inherit value serde
+        return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, transformNode, builder);
     }
 
     @Override
@@ -565,11 +565,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
 
         if (joinThis.repartitionRequired) {
-            joinThis = joinThis.repartitionForJoin(joined.keySerde(), joined.valueSerde());
+            joinThis = joinThis.repartitionForJoin(joined);
         }
 
         if (joinOther.repartitionRequired) {
-            joinOther = joinOther.repartitionForJoin(joined.keySerde(), joined.otherValueSerde());
+            joinOther = joinOther.repartitionForJoin(Joined.with(joined.keySerde(), joined.otherValueSerde(), joined.valueSerde()));
         }
 
         joinThis.ensureJoinableWith(joinOther);
@@ -587,17 +587,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
      * Repartition a stream. This is required on join operations occurring after
      * an operation that changes the key, i.e, selectKey, map(..), flatMap(..).
      *
-     * @param keySerde Serdes for serializing the keys
-     * @param valSerde Serdes for serializing the values
+     * @param joined joined control object
      * @return a new {@link KStreamImpl}
      */
-    private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde,
-                                                 final Serde<V> valSerde) {
-
+    private KStreamImpl<K, V> repartitionForJoin(final Joined<K, V, ?> joined) {
+        final Serde<K> repartitionKeySerde = joined.keySerde() != null ? joined.keySerde() : keySerde;
+        final Serde<V> repartitionValueSerde = joined.valueSerde() != null ? joined.valueSerde() : valSerde;
         final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
         final String repartitionedSourceName = createRepartitionedSource(builder,
-                                                                         keySerde,
-                                                                         valSerde,
+                                                                         repartitionKeySerde,
+                                                                         repartitionValueSerde,
                                                                          null,
                                                                          name,
                                                                          optimizableRepartitionNodeBuilder);
@@ -605,7 +604,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
         builder.addGraphNode(this.streamsGraphNode, optimizableRepartitionNode);
 
-        return new KStreamImpl<>(builder, repartitionedSourceName, Collections.singleton(repartitionedSourceName), false, optimizableRepartitionNode);
+        return new KStreamImpl<>(repartitionedSourceName, repartitionKeySerde, repartitionValueSerde, Collections.singleton(repartitionedSourceName), false, optimizableRepartitionNode, builder);
     }
 
     static <K1, V1> String createRepartitionedSource(final InternalStreamsBuilder builder,
@@ -678,18 +677,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined.keySerde(), joined.valueSerde());
-            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false);
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined);
+            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, false);
         } else {
-            return doStreamTableJoin(other, joiner, false);
+            return doStreamTableJoin(other, joiner, joined, false);
         }
     }
 
     @Override
-    public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
-                                              final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
-                                              final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return globalTableJoin(globalTable, keyMapper, joiner, true);
+    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+        return leftJoin(other, joiner, Joined.with(null, null, null));
+    }
+
+    @Override
+    public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
+                                            final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
+                                            final Joined<K, V, VT> joined) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+        Objects.requireNonNull(joined, "joined can't be null");
+        if (repartitionRequired) {
+            final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined);
+            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, true);
+        } else {
+            return doStreamTableJoin(other, joiner, joined, true);
+        }
     }
 
     @Override
@@ -699,6 +711,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return globalTableJoin(globalTable, keyMapper, joiner, false);
     }
 
+    @Override
+    public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
+                                              final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
+                                              final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+        return globalTableJoin(globalTable, keyMapper, joiner, true);
+    }
+
     private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1, V1> globalTable,
                                                         final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper,
                                                         final ValueJoiner<? super V, ? super V1, ? extends V2> joiner,
@@ -724,17 +743,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                                         null);
         builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, false, streamTableJoinNode);
+        // do not have serde for joined result
+        return new KStreamImpl<>(name, keySerde, null, sourceNodes, false, streamTableJoinNode, builder);
     }
 
     @SuppressWarnings("unchecked")
     private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
                                                     final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
+                                                    final Joined<K, V, V1> joined,
                                                     final boolean leftJoin) {
         Objects.requireNonNull(other, "other KTable can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, V1>) other);
 
         final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
@@ -743,7 +764,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             leftJoin
         );
 
-
         final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
         final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(
             name,
@@ -754,27 +774,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
-        return new KStreamImpl<>(builder, name, allSourceNodes, false, streamTableJoinNode);
-    }
-
-    @Override
-    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return leftJoin(other, joiner, Joined.with(null, null, null));
-    }
-
-    @Override
-    public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
-                                            final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
-                                            final Joined<K, V, VT> joined) {
-        Objects.requireNonNull(other, "other can't be null");
-        Objects.requireNonNull(joiner, "joiner can't be null");
-        Objects.requireNonNull(joined, "joined can't be null");
-        if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(joined.keySerde(), joined.valueSerde());
-            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true);
-        } else {
-            return doStreamTableJoin(other, joiner, true);
-        }
+        // do not have serde for joined result
+        return new KStreamImpl<>(name, joined.keySerde() != null ? joined.keySerde() : keySerde, null, allSourceNodes, false, streamTableJoinNode, builder);
     }
 
     @Override
@@ -792,16 +793,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         selectKeyMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode);
-        return new KGroupedStreamImpl<>(
-            builder,
-            selectKeyMapNode.nodeName(),
-            sourceNodes,
-            serializedInternal.keySerde(),
-            serializedInternal.valueSerde(),
-            true,
-            selectKeyMapNode
-        );
-
+        return new KGroupedStreamImpl<>(selectKeyMapNode.nodeName(),
+                                        serializedInternal.keySerde(),
+                                        serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde,
+                                        sourceNodes,
+                                        true,
+                                        selectKeyMapNode,
+                                        builder);
     }
 
     @Override
@@ -812,14 +810,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) {
         final SerializedInternal<K, V> serializedInternal = new SerializedInternal<>(serialized);
-        return new KGroupedStreamImpl<>(builder,
-                                        this.name,
+        return new KGroupedStreamImpl<>(this.name,
+                                        serializedInternal.keySerde() != null ? serializedInternal.keySerde() : keySerde,
+                                        serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde,
                                         sourceNodes,
-                                        serializedInternal.keySerde(),
-                                        serializedInternal.valueSerde(),
                                         this.repartitionRequired,
-                                        streamsGraphNode);
-
+                                        streamsGraphNode,
+                                        builder);
     }
 
     @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
@@ -851,7 +848,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             this.rightOuter = rightOuter;
         }
 
-        @SuppressWarnings("unchecked")
         public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
                                                    final KStream<K1, V2> other,
                                                    final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
@@ -874,13 +870,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
             final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name());
 
-            final ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamName);
+            final ProcessorParameters<K1, V1> thisWindowStreamProcessorParams = new ProcessorParameters<>(thisWindowedStream, thisWindowStreamName);
             final ProcessorGraphNode<K1, V1> thisWindowedStreamsNode = new ProcessorGraphNode<>(thisWindowStreamName, thisWindowStreamProcessorParams);
             builder.addGraphNode(thisStreamsGraphNode, thisWindowedStreamsNode);
 
             final KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.name());
 
-            final ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamName);
+            final ProcessorParameters<K1, V2> otherWindowStreamProcessorParams = new ProcessorParameters<>(otherWindowedStream, otherWindowStreamName);
             final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);
             builder.addGraphNode(otherStreamsGraphNode, otherWindowedStreamsNode);
 
@@ -902,11 +898,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
             final KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
 
-            final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
+            final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K1, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
 
-            final ProcessorParameters joinThisProcessorParams = new ProcessorParameters(joinThis, joinThisName);
-            final ProcessorParameters joinOtherProcessorParams = new ProcessorParameters(joinOther, joinOtherName);
-            final ProcessorParameters joinMergeProcessorParams = new ProcessorParameters(joinMerge, joinMergeName);
+            final ProcessorParameters<K1, V1> joinThisProcessorParams = new ProcessorParameters<>(joinThis, joinThisName);
+            final ProcessorParameters<K1, V2> joinOtherProcessorParams = new ProcessorParameters<>(joinOther, joinOtherName);
+            final ProcessorParameters<K1, R> joinMergeProcessorParams = new ProcessorParameters<>(joinMerge, joinMergeName);
 
             joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams)
                        .withJoinThisProcessorParameters(joinThisProcessorParams)
@@ -922,9 +918,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
             builder.addGraphNode(Arrays.asList(thisStreamsGraphNode, otherStreamsGraphNode), joinGraphNode);
 
-            final Set<String> allSourceNodes = new HashSet<>(((AbstractStream<K>) lhs).sourceNodes);
+            final Set<String> allSourceNodes = new HashSet<>(((KStreamImpl<K1, V1>) lhs).sourceNodes);
             allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
-            return new KStreamImpl<>(builder, joinMergeName, allSourceNodes, false, joinGraphNode);
+
+            // do not have serde for joined result;
+            // also for key serde we do not inherit from either since we cannot tell if these two serdes are different
+            return new KStreamImpl<>(joinMergeName, joined.keySerde(), null, allSourceNodes, false, joinGraphNode, builder);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ea5c304..c5b2970 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -57,7 +57,7 @@ import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchU
  * @param <S> the source's (parent's) value type
  * @param <V> the value type
  */
-public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
+public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<K, V> {
 
     static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
@@ -87,38 +87,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     private final boolean isQueryable;
 
     private boolean sendOldValues = false;
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
 
-    public KTableImpl(final InternalStreamsBuilder builder,
-                      final String name,
-                      final ProcessorSupplier<?, ?> processorSupplier,
-                      final Set<String> sourceNodes,
-                      final String queryableStoreName,
-                      final boolean isQueryable,
-                      final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
-        this.processorSupplier = processorSupplier;
-        this.queryableStoreName = queryableStoreName;
-        this.keySerde = null;
-        this.valSerde = null;
-        this.isQueryable = isQueryable;
-    }
-
-    public KTableImpl(final InternalStreamsBuilder builder,
-                      final String name,
-                      final ProcessorSupplier<?, ?> processorSupplier,
+    public KTableImpl(final String name,
                       final Serde<K> keySerde,
                       final Serde<V> valSerde,
                       final Set<String> sourceNodes,
                       final String queryableStoreName,
                       final boolean isQueryable,
-                      final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
+                      final ProcessorSupplier<?, ?> processorSupplier,
+                      final StreamsGraphNode streamsGraphNode,
+                      final InternalStreamsBuilder builder) {
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
         this.processorSupplier = processorSupplier;
         this.queryableStoreName = queryableStoreName;
-        this.keySerde = keySerde;
-        this.valSerde = valSerde;
         this.isQueryable = isQueryable;
     }
 
@@ -159,18 +140,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         builder.addGraphNode(this.streamsGraphNode, tableNode);
 
-
-        return new KTableImpl<>(
-            builder,
-            name,
-            processorSupplier,
-            this.keySerde,
-            this.valSerde,
-            sourceNodes,
-            shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
-            shouldMaterialize,
-            tableNode
-        );
+        // we can inherit parent key and value serde if user do not provide specific overrides, more specifically:
+        // we preserve the key following the order of 1) materialized, 2) parent
+        // we preserve the value following the order of 1) materialized, 2) parent
+        return new KTableImpl<>(name,
+                                materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
+                                materializedInternal != null && materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : valSerde,
+                                sourceNodes,
+                                shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
+                                shouldMaterialize,
+                                processorSupplier,
+                                tableNode,
+                                builder);
     }
 
     @Override
@@ -234,14 +215,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         builder.addGraphNode(this.streamsGraphNode, tableNode);
 
+        // don't inherit parent value serde, since this operation may change the value type, more specifically:
+        // we preserve the key following the order of 1) materialized, 2) parent, 3) null
+        // we preserve the value following the order of 1) materialized, 2) null
         return new KTableImpl<>(
-            builder,
             name,
-            processorSupplier,
+            materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
+            materializedInternal != null ? materializedInternal.valueSerde() : null,
             sourceNodes,
             shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName,
             shouldMaterialize,
-            tableNode
+            processorSupplier,
+            tableNode,
+            builder
         );
     }
 
@@ -325,14 +311,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         builder.addGraphNode(this.streamsGraphNode, tableNode);
 
+        // don't inherit parent value serde, since this operation may change the value type, more specifically:
+        // we preserve the key following the order of 1) materialized, 2) parent, 3) null
+        // we preserve the value following the order of 1) materialized, 2) null
         return new KTableImpl<>(
-            builder,
             name,
-            processorSupplier,
+            materialized != null && materialized.keySerde() != null ? materialized.keySerde() : keySerde,
+            materialized != null ? materialized.valueSerde() : null,
             sourceNodes,
             shouldMaterialize ? materialized.storeName() : this.queryableStoreName,
             shouldMaterialize,
-            tableNode);
+            processorSupplier,
+            tableNode,
+            builder);
     }
 
     @Override
@@ -352,7 +343,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         builder.addGraphNode(this.streamsGraphNode, toStreamNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, false, toStreamNode);
+        // we can inherit parent key and value serde
+        return new KStreamImpl<>(name, keySerde, valSerde, sourceNodes, false, toStreamNode, builder);
     }
 
     @Override
@@ -382,15 +374,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         builder.addGraphNode(streamsGraphNode, node);
 
         return new KTableImpl<K, S, V>(
-            builder,
             name,
-            suppressionSupplier,
             keySerde,
             valSerde,
             Collections.singleton(this.name),
             null,
             false,
-            node
+            suppressionSupplier,
+            node,
+            builder
         );
     }
 
@@ -475,7 +467,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         final String joinMergeName = builder.newProcessorName(MERGE_NAME);
 
         return buildJoin(
-            (AbstractStream<K>) other,
+            (AbstractStream<K, VO>) other,
             joiner,
             leftOuter,
             rightOuter,
@@ -485,8 +477,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         );
     }
 
-    @SuppressWarnings("unchecked")
-    private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other,
+    private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K, V1> other,
                                            final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                            final boolean leftOuter,
                                            final boolean rightOuter,
@@ -520,29 +511,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
         }
 
-        final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
-            new KTableImpl<K, V, R>(
-                builder,
-                joinThisName,
-                joinThis,
-                sourceNodes,
-                this.queryableStoreName,
-                false,
-                null
-            ),
-            new KTableImpl<K, V1, R>(
-                builder,
-                joinOtherName,
-                joinOther,
-                ((KTableImpl<K, ?, ?>) other).sourceNodes,
-                ((KTableImpl<K, ?, ?>) other).queryableStoreName,
-                false,
-                null
-            ),
-            internalQueryableName
-        );
+        final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(joinThis, joinOther, internalQueryableName);
 
-        final KTableKTableJoinNode.KTableKTableJoinNodeBuilder kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
+        final KTableKTableJoinNode.KTableKTableJoinNodeBuilder<K, Change<V>, Change<V1>, Change<R>> kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
 
         // only materialize if specified in Materialized
         if (materializedInternal != null) {
@@ -550,9 +521,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         }
         kTableJoinNodeBuilder.withNodeName(joinMergeName);
 
-        final ProcessorParameters joinThisProcessorParameters = new ProcessorParameters(joinThis, joinThisName);
-        final ProcessorParameters joinOtherProcessorParameters = new ProcessorParameters(joinOther, joinOtherName);
-        final ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName);
+        final ProcessorParameters<K, Change<V>> joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
+        final ProcessorParameters<K, Change<V1>> joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, joinOtherName);
+        final ProcessorParameters<K, Change<R>> joinMergeProcessorParameters = new ProcessorParameters<>(joinMerge, joinMergeName);
 
         kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
                              .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
@@ -562,23 +533,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                              .withOtherJoinSideNodeName(((KTableImpl) other).name)
                              .withThisJoinSideNodeName(name);
 
-        final KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build();
+        final KTableKTableJoinNode<K, Change<V>, Change<V1>, Change<R>> kTableKTableJoinNode = kTableJoinNodeBuilder.build();
         builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
 
-        return new KTableImpl<>(
-            builder,
+        // we can inherit parent key serde if user do not provide specific overrides
+        return new KTableImpl<K, Change<R>, R>(
             joinMergeName,
-            joinMerge,
+            materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
+            materializedInternal != null ? materializedInternal.valueSerde() : null,
             allSourceNodes,
             internalQueryableName,
             internalQueryableName != null,
-            kTableKTableJoinNode
+            joinMerge,
+            kTableKTableJoinNode,
+            builder
         );
     }
 
     @Override
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) {
-        return this.groupBy(selector, Serialized.with(null, null));
+        return groupBy(selector, Serialized.with(null, null));
     }
 
     @Override
@@ -592,20 +566,20 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
 
         // select the aggregate key and values (old and new), it would require parent to send old values
-        final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(
-            selectName,
-            processorParameters,
-            false
-        );
+        final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(selectName, processorParameters, false);
 
         builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
 
         this.enableSendingOldValues();
+
         final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized);
+
+        // we cannot inherit parent key and value serdes since both of them may have changed;
+        // we can only inherit from what serialized specified here
         return new KGroupedTableImpl<>(
             builder,
             selectName,
-            this.name,
+            sourceNodes,
             serializedInternal.keySerde(),
             serializedInternal.valueSerde(),
             groupByMapNode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 5c464b9..2ed70bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -27,13 +27,13 @@ import java.util.Set;
 
 class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
 
-    private final KTableImpl<K, ?, V> parent1;
-    private final KTableImpl<K, ?, V> parent2;
+    private final KTableProcessorSupplier<K, ?, V> parent1;
+    private final KTableProcessorSupplier<K, ?, V> parent2;
     private final String queryableName;
     private boolean sendOldValues = false;
 
-    KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1,
-                           final KTableImpl<K, ?, V> parent2,
+    KTableKTableJoinMerger(final KTableProcessorSupplier<K, ?, V> parent1,
+                           final KTableProcessorSupplier<K, ?, V> parent2,
                            final String queryableName) {
         this.parent1 = parent1;
         this.parent2 = parent2;
@@ -55,13 +55,13 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
             return new KTableValueGetterSupplier<K, V>() {
 
                 public KTableValueGetter<K, V> get() {
-                    return parent1.valueGetterSupplier().get();
+                    return parent1.view().get();
                 }
 
                 @Override
                 public String[] storeNames() {
-                    final String[] storeNames1 = parent1.valueGetterSupplier().storeNames();
-                    final String[] storeNames2 = parent2.valueGetterSupplier().storeNames();
+                    final String[] storeNames1 = parent1.view().storeNames();
+                    final String[] storeNames2 = parent2.view().storeNames();
                     final Set<String> stores = new HashSet<>(storeNames1.length + storeNames2.length);
                     Collections.addAll(stores, storeNames1);
                     Collections.addAll(stores, storeNames2);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 98076e0..e185f4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
@@ -40,10 +41,8 @@ import java.util.Set;
 import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
 import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
 
-public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implements SessionWindowedKStream<K, V> {
+public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements SessionWindowedKStream<K, V> {
     private final SessionWindows windows;
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
     private final Merger<K, Long> countMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
 
@@ -55,11 +54,9 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
                                final Serde<V> valSerde,
                                final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
                                final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
         Objects.requireNonNull(windows, "windows can't be null");
         this.windows = windows;
-        this.keySerde = keySerde;
-        this.valSerde = valSerde;
         this.aggregateBuilder = aggregateBuilder;
     }
 
@@ -92,16 +89,17 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         }
 
         return aggregateBuilder.build(
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
             new KStreamSessionWindowAggregate<>(
-                windows, materializedInternal.storeName(),
+                windows,
+                materializedInternal.storeName(),
                 aggregateBuilder.countInitializer,
                 aggregateBuilder.countAggregator,
-                countMerger
-            ),
-            AGGREGATE_NAME,
-            materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+                countMerger),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @Override
@@ -125,6 +123,8 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         }
 
         return aggregateBuilder.build(
+            REDUCE_NAME,
+            materialize(materializedInternal),
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
@@ -132,10 +132,9 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
                 reduceAggregator,
                 mergerForAggregator(reduceAggregator)
             ),
-            REDUCE_NAME,
-            materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @Override
@@ -160,18 +159,19 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
+
         return aggregateBuilder.build(
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
                 initializer,
                 aggregator,
-                sessionMerger
-            ),
-            AGGREGATE_NAME,
-            materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+                sessionMerger),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @SuppressWarnings("deprecation") // continuing to support SessionWindows#maintainMs in fallback mode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 5c5cfb2..2ee8f7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -40,11 +41,9 @@ import java.util.Set;
 import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
 import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
 
-public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements TimeWindowedKStream<K, V> {
+public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
 
     private final Windows<W> windows;
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
 
     TimeWindowedKStreamImpl(final Windows<W> windows,
@@ -55,9 +54,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
                             final Serde<V> valSerde,
                             final boolean repartitionRequired,
                             final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
-        this.valSerde = valSerde;
-        this.keySerde = keySerde;
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
         this.windows = Objects.requireNonNull(windows, "windows can't be null");
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name, streamsGraphNode);
     }
@@ -92,19 +89,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         }
 
         return aggregateBuilder.build(
-            new KStreamWindowAggregate<>(
-                windows,
-                materializedInternal.storeName(),
-                aggregateBuilder.countInitializer,
-                aggregateBuilder.countAggregator
-            ),
             AGGREGATE_NAME,
             materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
-
     @Override
     public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                   final Aggregator<? super K, ? super V, VR> aggregator) {
@@ -124,15 +116,12 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
             materializedInternal.withKeySerde(keySerde);
         }
         return aggregateBuilder.build(
-            new KStreamWindowAggregate<>(
-                windows,
-                materializedInternal.storeName(),
-                initializer,
-                aggregator
-            ),
             AGGREGATE_NAME,
             materialize(materializedInternal),
-            materializedInternal.isQueryable());
+            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @Override
@@ -156,11 +145,12 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         }
 
         return aggregateBuilder.build(
-            new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer),
             REDUCE_NAME,
             materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+            new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
index 97fb69d..4d1b67d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -102,11 +102,10 @@ public class GroupedTableOperationRepartitionNode<K, V> extends BaseRepartitionN
 
     }
 
-    public static GroupedTableOperationRepartitionNodeBuilder groupedTableOperationNodeBuilder() {
-        return new GroupedTableOperationRepartitionNodeBuilder();
+    public static <K1, V1> GroupedTableOperationRepartitionNodeBuilder<K1, V1> groupedTableOperationNodeBuilder() {
+        return new GroupedTableOperationRepartitionNodeBuilder<>();
     }
 
-
     public static final class GroupedTableOperationRepartitionNodeBuilder<K, V> {
 
         private Serde<K> keySerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index b63b66d..41c27ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -99,7 +99,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
                "} " + super.toString();
     }
 
-    public static <K, V, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
+    public static <K, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
         return new KTableKTableJoinNodeBuilder<>();
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index eb3d9f6..4b767b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -28,17 +28,17 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
  */
 public class ProcessorParameters<K, V> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    private final ProcessorSupplier<? super K, ? super V> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final ProcessorSupplier<? super K, ? super V> processorSupplier,
                                final String processorName) {
 
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<? super K, ? super V> processorSupplier() {
         return processorSupplier;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
index 4360d08..ff180d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
@@ -17,16 +17,38 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 public class WindowedSerdesTest {
 
     private final String topic = "sample";
 
     @Test
+    public void shouldWrapForTimeWindowedSerde() {
+        final Serde<Windowed<String>> serde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+        assertTrue(serde.serializer() instanceof TimeWindowedSerializer);
+        assertTrue(serde.deserializer() instanceof TimeWindowedDeserializer);
+        assertTrue(((TimeWindowedSerializer) serde.serializer()).innerSerializer() instanceof StringSerializer);
+        assertTrue(((TimeWindowedDeserializer) serde.deserializer()).innerDeserializer() instanceof StringDeserializer);
+    }
+
+    @Test
+    public void shouldWrapForSessionWindowedSerde() {
+        final Serde<Windowed<String>> serde = WindowedSerdes.sessionWindowedSerdeFrom(String.class);
+        assertTrue(serde.serializer() instanceof SessionWindowedSerializer);
+        assertTrue(serde.deserializer() instanceof SessionWindowedDeserializer);
+        assertTrue(((SessionWindowedSerializer) serde.serializer()).innerSerializer() instanceof StringSerializer);
+        assertTrue(((SessionWindowedDeserializer) serde.deserializer()).innerDeserializer() instanceof StringDeserializer);
+    }
+
+    @Test
     public void testTimeWindowSerdeFrom() {
         final Windowed<Integer> timeWindowed = new Windowed<>(10, new TimeWindow(0, Long.MAX_VALUE));
         final Serde<Windowed<Integer>> timeWindowedSerde = WindowedSerdes.timeWindowedSerdeFrom(Integer.class);
@@ -43,5 +65,4 @@ public class WindowedSerdesTest {
         final Windowed<Integer> windowed = sessionWindowedSerde.deserializer().deserialize(topic, bytes);
         Assert.assertEquals(sessionWindowed, windowed);
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index d98fd79..b360cec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -96,7 +96,7 @@ public class AbstractStreamTest {
         assertTrue(supplier.theCapturedProcessor().processed.size() <= expectedKeys.length);
     }
 
-    private class ExtendedKStream<K, V> extends AbstractStream<K> {
+    private class ExtendedKStream<K, V> extends AbstractStream<K, V> {
 
         ExtendedKStream(final KStream<K, V> stream) {
             super((KStreamImpl<K, V>) stream);
@@ -104,11 +104,12 @@ public class AbstractStreamTest {
 
         KStream<K, V> randomFilter() {
             final String name = builder.newProcessorName("RANDOM-FILTER-");
-            final ProcessorGraphNode processorNode = new ProcessorGraphNode(name,
-                                                                            new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
-                                                                            false);
+            final ProcessorGraphNode<K, V> processorNode = new ProcessorGraphNode<>(
+                name,
+                new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
+                false);
             builder.addGraphNode(this.streamsGraphNode, processorNode);
-            return new KStreamImpl<>(builder, name, sourceNodes, false, processorNode);
+            return new KStreamImpl<K, V>(name, null, null, sourceNodes, false, processorNode, builder);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index bce7fc8..119a7b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -32,12 +33,17 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -76,6 +82,8 @@ public class KStreamImplTest {
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
+    private Serde<String> mySerde = new Serdes.StringSerde();
+
     @Before
     public void before() {
         builder = new StreamsBuilder();
@@ -181,6 +189,121 @@ public class KStreamImplTest {
     }
 
     @Test
+    public void shouldPreserveSerdesForOperators() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> stream1 = builder.stream(Collections.singleton("topic-1"), stringConsumed);
+        final KTable<String, String> table1 = builder.table("topic-2", stringConsumed);
+        final GlobalKTable<String, String> table2 = builder.globalTable("topic-2", stringConsumed);
+        final ConsumedInternal<String, String> consumedInternal = new ConsumedInternal<>(stringConsumed);
+
+        final KeyValueMapper<String, String, String> selector = (key, value) -> key;
+        final KeyValueMapper<String, String, Iterable<KeyValue<String, String>>> flatSelector = (key, value) -> Collections.singleton(new KeyValue<>(key, value));
+        final ValueMapper<String, String> mapper = value -> value;
+        final ValueMapper<String, Iterable<String>> flatMapper = Collections::singleton;
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1;
+        final TransformerSupplier<String, String, KeyValue<String, String>> transformerSupplier = () -> new Transformer<String, String, KeyValue<String, String>>() {
+            @Override
+            public void init(final ProcessorContext context) {}
+
+            @Override
+            public KeyValue<String, String> transform(final String key, final String value) {
+                return new KeyValue<>(key, value);
+            }
+
+            @Override
+            public void close() {}
+        };
+        final ValueTransformerSupplier<String, String> valueTransformerSupplier = () -> new ValueTransformer<String, String>() {
+            @Override
+            public void init(final ProcessorContext context) {}
+
+            @Override
+            public String transform(final String value) {
+                return value;
+            }
+
+            @Override
+            public void close() {}
+        };
+
+        assertEquals(((AbstractStream) stream1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
+
+        assertEquals(((AbstractStream) stream1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
+
+        assertNull(((AbstractStream) stream1.selectKey(selector)).keySerde());
+        assertEquals(((AbstractStream) stream1.selectKey(selector)).valueSerde(), consumedInternal.valueSerde());
+
+        assertNull(((AbstractStream) stream1.map(KeyValue::new)).keySerde());
+        assertNull(((AbstractStream) stream1.map(KeyValue::new)).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.mapValues(mapper)).keySerde(), consumedInternal.keySerde());
+        assertNull(((AbstractStream) stream1.mapValues(mapper)).valueSerde());
+
+        assertNull(((AbstractStream) stream1.flatMap(flatSelector)).keySerde());
+        assertNull(((AbstractStream) stream1.flatMap(flatSelector)).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.flatMapValues(flatMapper)).keySerde(), consumedInternal.keySerde());
+        assertNull(((AbstractStream) stream1.flatMapValues(flatMapper)).valueSerde());
+
+        assertNull(((AbstractStream) stream1.transform(transformerSupplier)).keySerde());
+        assertNull(((AbstractStream) stream1.transform(transformerSupplier)).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.transformValues(valueTransformerSupplier)).keySerde(), consumedInternal.keySerde());
+        assertNull(((AbstractStream) stream1.transformValues(valueTransformerSupplier)).valueSerde());
+
+        assertNull(((AbstractStream) stream1.merge(stream1)).keySerde());
+        assertNull(((AbstractStream) stream1.merge(stream1)).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.through("topic-3")).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.through("topic-3")).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) stream1.groupByKey()).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.groupByKey()).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) stream1.groupByKey(Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.groupByKey(Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) stream1.groupBy(selector)).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.groupBy(selector)).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) stream1.groupBy(selector, Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.groupBy(selector, Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertNull(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.join(table1, joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.join(table1, joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.join(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.join(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).valueSerde(), null);
+
+        assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).valueSerde(), null);
+
+        assertEquals(((AbstractStream) stream1.join(table2, selector, joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.join(table2, selector, joiner)).valueSerde(), null);
+
+        assertEquals(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).valueSerde(), null);
+    }
+
+    @Test
     public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 2692c17..10a27b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -43,20 +42,11 @@ public class KStreamMapTest {
     @Test
     public void testMap() {
         final StreamsBuilder builder = new StreamsBuilder();
-
-        final KeyValueMapper<Integer, String, KeyValue<String, Integer>> mapper =
-            new KeyValueMapper<Integer, String, KeyValue<String, Integer>>() {
-                @Override
-                public KeyValue<String, Integer> apply(final Integer key, final String value) {
-                    return KeyValue.pair(value, key);
-                }
-            };
-
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
         final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
         final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
-        stream.map(mapper).process(supplier);
+        stream.map((key, value) -> KeyValue.pair(value, key)).process(supplier);
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
             for (final int expectedKey : expectedKeys) {
@@ -75,16 +65,9 @@ public class KStreamMapTest {
 
     @Test
     public void testTypeVariance() {
-        final KeyValueMapper<Number, Object, KeyValue<Number, String>> stringify = new KeyValueMapper<Number, Object, KeyValue<Number, String>>() {
-            @Override
-            public KeyValue<Number, String> apply(final Number key, final Object value) {
-                return KeyValue.pair(key, key + ":" + value);
-            }
-        };
-
         new StreamsBuilder()
             .<Integer, String>stream("numbers")
-            .map(stringify)
+            .map((key, value) -> KeyValue.pair(key, key + ":" + value))
             .to("strings");
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index eb586e6..6e666c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
@@ -28,13 +30,17 @@ import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -62,6 +68,7 @@ import static org.junit.Assert.assertNull;
 
 public class KTableImplTest {
 
+    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
     private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
     private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@@ -70,6 +77,8 @@ public class KTableImplTest {
     private StreamsBuilder builder;
     private KTable<String, String> table;
 
+    private Serde<String> mySerde = new Serdes.StringSerde();
+
     @Before
     public void setUp() {
         builder = new StreamsBuilder();
@@ -126,6 +135,74 @@ public class KTableImplTest {
     }
 
     @Test
+    public void shouldPreserveSerdesForOperators() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, String> table1 = builder.table("topic-2", stringConsumed);
+        final ConsumedInternal<String, String> consumedInternal = new ConsumedInternal<>(stringConsumed);
+
+        final KeyValueMapper<String, String, String> selector = (key, value) -> key;
+        final ValueMapper<String, String> mapper = value -> value;
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1;
+        final ValueTransformerWithKeySupplier<String, String, String> valueTransformerWithKeySupplier = () -> new ValueTransformerWithKey<String, String, String>() {
+            @Override
+            public void init(final ProcessorContext context) {}
+
+            @Override
+            public String transform(final String key, final String value) {
+                return value;
+            }
+
+            @Override
+            public void close() {}
+        };
+
+        assertEquals(((AbstractStream) table1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.mapValues(mapper)).keySerde(), consumedInternal.keySerde());
+        assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde());
+        assertEquals(((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.toStream()).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.toStream()).valueSerde(), consumedInternal.valueSerde());
+        assertNull(((AbstractStream) table1.toStream(selector)).keySerde());
+        assertEquals(((AbstractStream) table1.toStream(selector)).valueSerde(), consumedInternal.valueSerde());
+
+        assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), consumedInternal.keySerde());
+        assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
+        assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde(), null);
+        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde(), null);
+        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.join(table1, joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+    }
+
+    @Test
     public void testValueGetter() {
         final StreamsBuilder builder = new StreamsBuilder();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index d65f27e..75e9f51 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -38,9 +38,7 @@ import static org.junit.Assert.assertEquals;
 
 public class StreamsGraphTest {
 
-    final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
-    
-
+    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
 
     // Test builds topology in succesive manner but only graph node not yet processed written to topology
 


Mime
View raw message