kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [3/4] kafka git commit: KAFKA-3336: Unify Serializer and Deserializer into Serialization
Date Thu, 17 Mar 2016 22:42:05 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 884933b..b293496 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -17,9 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -93,9 +92,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
 
-    private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
-    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
-
     public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
         super(topology, name, sourceNodes);
     }
@@ -199,18 +195,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public KStream<K, V> through(String topic,
-                                 Serializer<K> keySerializer,
-                                 Serializer<V> valSerializer,
-                                 Deserializer<K> keyDeserializer,
-                                 Deserializer<V> valDeserializer) {
-        to(topic, keySerializer, valSerializer);
+                                 Serde<K> keySerde,
+                                 Serde<V> valSerde) {
+        to(topic, keySerde, valSerde);
 
-        return topology.stream(keyDeserializer, valDeserializer, topic);
+        return topology.stream(keySerde, valSerde, topic);
     }
 
     @Override
     public KStream<K, V> through(String topic) {
-        return through(topic, null, null, null, null);
+        return through(topic, null, null);
     }
 
     @Override
@@ -220,10 +214,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @SuppressWarnings("unchecked")
     @Override
-    public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
+    public void to(String topic, Serde<K> keySerde, Serde<V> valSerde) {
         String name = topology.newName(SINK_NAME);
         StreamPartitioner<K, V> streamPartitioner = null;
 
+        Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
+        Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
+
         if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
             WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
             streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
@@ -265,16 +262,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerializer,
-            Serializer<V> thisValueSerializer,
-            Serializer<V1> otherValueSerializer,
-            Deserializer<K> keyDeserializer,
-            Deserializer<V> thisValueDeserializer,
-            Deserializer<V1> otherValueDeserializer) {
-
-        return join(other, joiner, windows,
-                keySerializer, thisValueSerializer, otherValueSerializer,
-                keyDeserializer, thisValueDeserializer, otherValueDeserializer, false);
+            Serde<K> keySerde,
+            Serde<V> thisValueSerde,
+            Serde<V1> otherValueSerde) {
+
+        return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false);
     }
 
     @Override
@@ -282,16 +274,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerializer,
-            Serializer<V> thisValueSerializer,
-            Serializer<V1> otherValueSerializer,
-            Deserializer<K> keyDeserializer,
-            Deserializer<V> thisValueDeserializer,
-            Deserializer<V1> otherValueDeserializer) {
-
-        return join(other, joiner, windows,
-                keySerializer, thisValueSerializer, otherValueSerializer,
-                keyDeserializer, thisValueDeserializer, otherValueDeserializer, true);
+            Serde<K> keySerde,
+            Serde<V> thisValueSerde,
+            Serde<V1> otherValueSerde) {
+
+        return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true);
     }
 
     @SuppressWarnings("unchecked")
@@ -299,26 +286,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerializer,
-            Serializer<V> thisValueSerializer,
-            Serializer<V1> otherValueSerializer,
-            Deserializer<K> keyDeserializer,
-            Deserializer<V> thisValueDeserializer,
-            Deserializer<V1> otherValueDeserializer,
+            Serde<K> keySerde,
+            Serde<V> thisValueSerde,
+            Serde<V1> otherValueSerde,
             boolean outer) {
 
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
         StateStoreSupplier thisWindow = Stores.create(windows.name() + "-this")
-                .withKeys(keySerializer, keyDeserializer)
-                .withValues(thisValueSerializer, thisValueDeserializer)
+                .withKeys(keySerde)
+                .withValues(thisValueSerde)
                 .persistent()
                 .windowed(windows.maintainMs(), windows.segments, true)
                 .build();
 
         StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
-                .withKeys(keySerializer, keyDeserializer)
-                .withValues(otherValueSerializer, otherValueDeserializer)
+                .withKeys(keySerde)
+                .withValues(otherValueSerde)
                 .persistent()
                 .windowed(windows.maintainMs(), windows.segments, true)
                 .build();
@@ -354,16 +338,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerializer,
-            Serializer<V1> otherValueSerializer,
-            Deserializer<K> keyDeserializer,
-            Deserializer<V1> otherValueDeserializer) {
+            Serde<K> keySerde,
+            Serde<V1> otherValueSerde) {
 
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
         StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
-                .withKeys(keySerializer, keyDeserializer)
-                .withValues(otherValueSerializer, otherValueDeserializer)
+                .withKeys(keySerde)
+                .withValues(otherValueSerde)
                 .persistent()
                 .windowed(windows.maintainMs(), windows.segments, true)
                 .build();
@@ -397,18 +379,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
                                                                  Windows<W> windows,
-                                                                 Serializer<K> keySerializer,
-                                                                 Serializer<V> aggValueSerializer,
-                                                                 Deserializer<K> keyDeserializer,
-                                                                 Deserializer<V> aggValueDeserializer) {
+                                                                 Serde<K> keySerde,
+                                                                 Serde<V> aggValueSerde) {
 
         String reduceName = topology.newName(REDUCE_NAME);
 
         KStreamWindowReduce<K, V, W> reduceSupplier = new KStreamWindowReduce<>(windows, windows.name(), reducer);
 
         StateStoreSupplier reduceStore = Stores.create(windows.name())
-                .withKeys(keySerializer, keyDeserializer)
-                .withValues(aggValueSerializer, aggValueDeserializer)
+                .withKeys(keySerde)
+                .withValues(aggValueSerde)
                 .persistent()
                 .windowed(windows.maintainMs(), windows.segments, false)
                 .build();
@@ -423,10 +403,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public KTable<K, V> reduceByKey(Reducer<V> reducer,
-                                    Serializer<K> keySerializer,
-                                    Serializer<V> aggValueSerializer,
-                                    Deserializer<K> keyDeserializer,
-                                    Deserializer<V> aggValueDeserializer,
+                                    Serde<K> keySerde,
+                                    Serde<V> aggValueSerde,
                                     String name) {
 
         String reduceName = topology.newName(REDUCE_NAME);
@@ -434,8 +412,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         KStreamReduce<K, V> reduceSupplier = new KStreamReduce<>(name, reducer);
 
         StateStoreSupplier reduceStore = Stores.create(name)
-                .withKeys(keySerializer, keyDeserializer)
-                .withValues(aggValueSerializer, aggValueDeserializer)
+                .withKeys(keySerde)
+                .withValues(aggValueSerde)
                 .persistent()
                 .build();
 
@@ -451,18 +429,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
                                                                        Aggregator<K, V, T> aggregator,
                                                                        Windows<W> windows,
-                                                                       Serializer<K> keySerializer,
-                                                                       Serializer<T> aggValueSerializer,
-                                                                       Deserializer<K> keyDeserializer,
-                                                                       Deserializer<T> aggValueDeserializer) {
+                                                                       Serde<K> keySerde,
+                                                                       Serde<T> aggValueSerde) {
 
         String aggregateName = topology.newName(AGGREGATE_NAME);
 
         KStreamAggProcessorSupplier<K, Windowed<K>, V, T> aggregateSupplier = new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator);
 
         StateStoreSupplier aggregateStore = Stores.create(windows.name())
-                .withKeys(keySerializer, keyDeserializer)
-                .withValues(aggValueSerializer, aggValueDeserializer)
+                .withKeys(keySerde)
+                .withValues(aggValueSerde)
                 .persistent()
                 .windowed(windows.maintainMs(), windows.segments, false)
                 .build();
@@ -478,10 +454,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                            Aggregator<K, V, T> aggregator,
-                                           Serializer<K> keySerializer,
-                                           Serializer<T> aggValueSerializer,
-                                           Deserializer<K> keyDeserializer,
-                                           Deserializer<T> aggValueDeserializer,
+                                           Serde<K> keySerde,
+                                           Serde<T> aggValueSerde,
                                            String name) {
 
         String aggregateName = topology.newName(AGGREGATE_NAME);
@@ -489,8 +463,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier = new KStreamAggregate<>(name, initializer, aggregator);
 
         StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerializer, keyDeserializer)
-                .withValues(aggValueSerializer, aggValueDeserializer)
+                .withKeys(keySerde)
+                .withValues(aggValueSerde)
                 .persistent()
                 .build();
 
@@ -504,8 +478,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
-                                                                   Serializer<K> keySerializer,
-                                                                   Deserializer<K> keyDeserializer) {
+                                                                   Serde<K> keySerde) {
         return this.aggregateByKey(
                 new Initializer<Long>() {
                     @Override
@@ -518,13 +491,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                     public Long apply(K aggKey, V value, Long aggregate) {
                         return aggregate + 1L;
                     }
-                }, windows, keySerializer, LONG_SERIALIZER, keyDeserializer, LONG_DESERIALIZER);
+                }, windows, keySerde, Serdes.Long());
     }
 
     @Override
-    public KTable<K, Long> countByKey(Serializer<K> keySerializer,
-                                      Deserializer<K> keyDeserializer,
-                                      String name) {
+    public KTable<K, Long> countByKey(Serde<K> keySerde,
+                                      String name) {
         return this.aggregateByKey(
                 new Initializer<Long>() {
                     @Override
@@ -537,6 +509,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                     public Long apply(K aggKey, V value, Long aggregate) {
                         return aggregate + 1L;
                     }
-                }, keySerializer, LONG_SERIALIZER, keyDeserializer, LONG_DESERIALIZER, name);
+                }, keySerde, Serdes.Long(), name);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index d63fcc8..496a476 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStream;
@@ -77,15 +75,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
-    private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
-    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
-
     public final ProcessorSupplier<?, ?> processorSupplier;
 
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valSerializer;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valDeserializer;
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
 
     private boolean sendOldValues = false;
 
@@ -93,23 +86,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                       String name,
                       ProcessorSupplier<?, ?> processorSupplier,
                       Set<String> sourceNodes) {
-        this(topology, name, processorSupplier, sourceNodes, null, null, null, null);
+        this(topology, name, processorSupplier, sourceNodes, null, null);
     }
 
     public KTableImpl(KStreamBuilder topology,
                       String name,
                       ProcessorSupplier<?, ?> processorSupplier,
                       Set<String> sourceNodes,
-                      Serializer<K> keySerializer,
-                      Serializer<V> valSerializer,
-                      Deserializer<K> keyDeserializer,
-                      Deserializer<V> valDeserializer) {
+                      Serde<K> keySerde,
+                      Serde<V> valSerde) {
         super(topology, name, sourceNodes);
         this.processorSupplier = processorSupplier;
-        this.keySerializer = keySerializer;
-        this.valSerializer = valSerializer;
-        this.keyDeserializer = keyDeserializer;
-        this.valDeserializer = valDeserializer;
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
     }
 
     @Override
@@ -143,18 +132,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public KTable<K, V> through(String topic,
-                                Serializer<K> keySerializer,
-                                Serializer<V> valSerializer,
-                                Deserializer<K> keyDeserializer,
-                                Deserializer<V> valDeserializer) {
-        to(topic, keySerializer, valSerializer);
+                                Serde<K> keySerde,
+                                Serde<V> valSerde) {
+        to(topic, keySerde, valSerde);
 
-        return topology.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic);
+        return topology.table(keySerde, valSerde, topic);
     }
 
     @Override
     public KTable<K, V> through(String topic) {
-        return through(topic, null, null, null, null);
+        return through(topic, null, null);
     }
 
     @Override
@@ -163,8 +150,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
-        this.toStream().to(topic, keySerializer, valSerializer);
+    public void to(String topic, Serde<K> keySerde, Serde<V> valSerde) {
+        this.toStream().to(topic, keySerde, valSerde);
     }
 
     @Override
@@ -255,12 +242,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                                Aggregator<K1, V1, T> add,
                                                Aggregator<K1, V1, T> remove,
                                                KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                               Serializer<K1> keySerializer,
-                                               Serializer<V1> valueSerializer,
-                                               Serializer<T> aggValueSerializer,
-                                               Deserializer<K1> keyDeserializer,
-                                               Deserializer<V1> valueDeserializer,
-                                               Deserializer<T> aggValueDeserializer,
+                                               Serde<K1> keySerde,
+                                               Serde<V1> valueSerde,
+                                               Serde<T> aggValueSerde,
                                                String name) {
 
         String selectName = topology.newName(SELECT_NAME);
@@ -270,16 +254,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         String topic = name + REPARTITION_TOPIC_SUFFIX;
 
-        ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
-        ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
+        ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer());
+        ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer());
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
 
         ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, add, remove);
 
         StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerializer, keyDeserializer)
-                .withValues(aggValueSerializer, aggValueDeserializer)
+                .withKeys(keySerde)
+                .withValues(aggValueSerde)
                 .persistent()
                 .build();
 
@@ -289,10 +273,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         // send the aggregate key-value pairs to the intermediate topic for partitioning
         topology.addInternalTopic(topic);
-        topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
+        topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName);
 
         // read the intermediate topic
-        topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
+        topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
 
         // aggregate the values with the aggregator and local store
         topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
@@ -304,10 +288,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector,
-                                       Serializer<K1> keySerializer,
-                                       Serializer<V> valueSerializer,
-                                       Deserializer<K1> keyDeserializer,
-                                       Deserializer<V> valueDeserializer,
+                                       Serde<K1> keySerde,
+                                       Serde<V> valueSerde,
                                        String name) {
         return this.aggregate(
                 new Initializer<Long>() {
@@ -332,17 +314,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                         return new KeyValue<>(selector.apply(key, value), value);
                     }
                 },
-                keySerializer, valueSerializer, LONG_SERIALIZER, keyDeserializer, valueDeserializer, LONG_DESERIALIZER, name);
+                keySerde, valueSerde, Serdes.Long(), name);
     }
 
     @Override
     public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
                                           Reducer<V1> removeReducer,
                                           KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                          Serializer<K1> keySerializer,
-                                          Serializer<V1> valueSerializer,
-                                          Deserializer<K1> keyDeserializer,
-                                          Deserializer<V1> valueDeserializer,
+                                          Serde<K1> keySerde,
+                                          Serde<V1> valueSerde,
                                           String name) {
 
         String selectName = topology.newName(SELECT_NAME);
@@ -352,16 +332,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         String topic = name + REPARTITION_TOPIC_SUFFIX;
 
-        ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
-        ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
+        ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerde.serializer());
+        ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueSerde.deserializer());
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
 
         ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, addReducer, removeReducer);
 
         StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerializer, keyDeserializer)
-                .withValues(valueSerializer, valueDeserializer)
+                .withKeys(keySerde)
+                .withValues(valueSerde)
                 .persistent()
                 .build();
 
@@ -371,10 +351,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         // send the aggregate key-value pairs to the intermediate topic for partitioning
         topology.addInternalTopic(topic);
-        topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
+        topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, selectName);
 
         // read the intermediate topic
-        topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
+        topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
 
         // aggregate the values with the aggregator and local store
         topology.addProcessor(reduceName, aggregateSupplier, sourceName);
@@ -421,7 +401,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         synchronized (source) {
             if (!source.isMaterialized()) {
                 StateStoreSupplier storeSupplier =
-                        new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+                        new KTableStoreSupplier<>(source.topic, keySerde, valSerde, null);
                 // mark this state as non internal hence it is read directly from a user topic
                 topology.addStateStore(storeSupplier, false, name);
                 source.materialize();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
index ffd5cf0..af3c0d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -17,14 +17,13 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
 import org.apache.kafka.streams.state.internals.RocksDBStore;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * A KTable storage. It stores all entries in a local RocksDB database.
@@ -35,15 +34,15 @@ import org.apache.kafka.streams.state.Serdes;
 public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
-    private final Serdes<K, V> serdes;
+    private final StateSerdes<K, V> serdes;
     private final Time time;
 
     protected KTableStoreSupplier(String name,
-                                  Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
-                                  Serializer<V> valSerializer, Deserializer<V> valDeserializer,
+                                  Serde<K> keySerde,
+                                  Serde<V> valSerde,
                                   Time time) {
         this.name = name;
-        this.serdes = new Serdes<>(name, keySerializer, keyDeserializer, valSerializer, valDeserializer);
+        this.serdes = new StateSerdes<>(name, keySerde, valSerde);
         this.time = time;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index e9d5252..fdcff19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.processor;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
 
 import java.io.File;
@@ -43,32 +42,18 @@ public interface ProcessorContext {
     TaskId taskId();
 
     /**
-     * Returns the key serializer
+     * Returns the default key serde
      *
      * @return the key serializer
      */
-    Serializer<?> keySerializer();
+    Serde<?> keySerde();
 
     /**
-     * Returns the value serializer
+     * Returns the default value serde
      *
      * @return the value serializer
      */
-    Serializer<?> valueSerializer();
-
-    /**
-     * Returns the key deserializer
-     *
-     * @return the key deserializer
-     */
-    Deserializer<?> keyDeserializer();
-
-    /**
-     * Returns the value deserializer
-     *
-     * @return the value deserializer
-     */
-    Deserializer<?> valueDeserializer();
+    Serde<?> valueSerde();
 
     /**
      * Returns the state directory for the partition.

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ab7122b..7f5d645 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -194,8 +194,8 @@ public class TopologyBuilder {
 
     /**
      * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the source used to reference this node when
@@ -214,10 +214,10 @@ public class TopologyBuilder {
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
      * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
@@ -242,8 +242,8 @@ public class TopologyBuilder {
 
     /**
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
-     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the sink
@@ -262,8 +262,8 @@ public class TopologyBuilder {
     /**
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using
      * the supplied partitioner.
-     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      * <p>
      * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
@@ -293,10 +293,10 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
      * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
@@ -316,10 +316,10 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
      * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param partitioner the function that should be used to determine the partition for each message processed by the sink
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index f6e43d0..888b89e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -17,11 +17,10 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -37,10 +36,8 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     private final RecordCollector collector;
     private final ProcessorStateManager stateMgr;
 
-    private final Serializer<?> keySerializer;
-    private final Serializer<?> valSerializer;
-    private final Deserializer<?> keyDeserializer;
-    private final Deserializer<?> valDeserializer;
+    private final Serde<?> keySerde;
+    private final Serde<?> valSerde;
 
     private boolean initialized;
 
@@ -57,10 +54,8 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
         this.collector = collector;
         this.stateMgr = stateMgr;
 
-        this.keySerializer = config.keySerializer();
-        this.valSerializer = config.valueSerializer();
-        this.keyDeserializer = config.keyDeserializer();
-        this.valDeserializer = config.valueDeserializer();
+        this.keySerde = config.keySerde();
+        this.valSerde = config.valueSerde();
 
         this.initialized = false;
     }
@@ -89,23 +84,13 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public Serializer<?> keySerializer() {
-        return this.keySerializer;
-    }
-
-    @Override
-    public Serializer<?> valueSerializer() {
-        return this.valSerializer;
-    }
-
-    @Override
-    public Deserializer<?> keyDeserializer() {
-        return this.keyDeserializer;
+    public Serde<?> keySerde() {
+        return this.keySerde;
     }
 
     @Override
-    public Deserializer<?> valueDeserializer() {
-        return this.valDeserializer;
+    public Serde<?> valueSerde() {
+        return this.valSerde;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 7ab59ee..ffc72fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -49,8 +49,8 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     @Override
     public void init(ProcessorContext context) {
         this.context = context;
-        if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerializer();
-        if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerializer();
+        if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerde().serializer();
+        if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerde().serializer();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index fa4afaf..1868c1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -47,8 +47,8 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         this.context = context;
 
         // if serializers are null, get the default ones from the context
-        if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keyDeserializer();
-        if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueDeserializer();
+        if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
+        if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 0bcae18..3ad06e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -35,10 +34,8 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private final StreamsMetrics metrics;
     private final ProcessorStateManager stateMgr;
 
-    private final Serializer<?> keySerializer;
-    private final Serializer<?> valSerializer;
-    private final Deserializer<?> keyDeserializer;
-    private final Deserializer<?> valDeserializer;
+    private final Serde<?> keySerde;
+    private final Serde<?> valSerde;
 
     private boolean initialized;
 
@@ -52,10 +49,8 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
         this.metrics = metrics;
         this.stateMgr = stateMgr;
 
-        this.keySerializer = config.keySerializer();
-        this.valSerializer = config.valueSerializer();
-        this.keyDeserializer = config.keyDeserializer();
-        this.valDeserializer = config.valueDeserializer();
+        this.keySerde = config.keySerde();
+        this.valSerde = config.valueSerde();
 
         this.initialized = false;
     }
@@ -84,23 +79,13 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     }
 
     @Override
-    public Serializer<?> keySerializer() {
-        return this.keySerializer;
+    public Serde<?> keySerde() {
+        return this.keySerde;
     }
 
     @Override
-    public Serializer<?> valueSerializer() {
-        return this.valSerializer;
-    }
-
-    @Override
-    public Deserializer<?> keyDeserializer() {
-        return this.keyDeserializer;
-    }
-
-    @Override
-    public Deserializer<?> valueDeserializer() {
-        return this.valDeserializer;
+    public Serde<?> valueSerde() {
+        return this.valSerde;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 491c812..e1a518d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -100,7 +100,6 @@ public class StreamThread extends Thread {
 
     private long lastClean;
     private long lastCommit;
-    private long recordsProcessed;
     private Throwable rebalanceException = null;
 
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
@@ -201,7 +200,6 @@ public class StreamThread extends Thread {
 
         this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
         this.lastCommit = time.milliseconds();
-        this.recordsProcessed = 0;
         this.time = time;
 
         this.sensors = new StreamsMetricsImpl(metrics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
deleted file mode 100644
index e925312..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-/**
- * Factory for creating serializers / deserializers for state stores in Kafka Streams.
- *
- * @param <K> key type of serdes
- * @param <V> value type of serdes
- */
-public final class Serdes<K, V> {
-
-    public static <K, V> Serdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
-        Serializer<K> keySerializer = serializer(keyClass);
-        Deserializer<K> keyDeserializer = deserializer(keyClass);
-        Serializer<V> valueSerializer = serializer(valueClass);
-        Deserializer<V> valueDeserializer = deserializer(valueClass);
-        return new Serdes<>(topic, keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
-    }
-
-    @SuppressWarnings("unchecked")
-    static <T> Serializer<T> serializer(Class<T> type) {
-        if (String.class.isAssignableFrom(type)) return (Serializer<T>) new StringSerializer();
-        if (Integer.class.isAssignableFrom(type)) return (Serializer<T>) new IntegerSerializer();
-        if (Long.class.isAssignableFrom(type)) return (Serializer<T>) new LongSerializer();
-        if (byte[].class.isAssignableFrom(type)) return (Serializer<T>) new ByteArraySerializer();
-        throw new IllegalArgumentException("Unknown class for built-in serializer");
-    }
-
-    @SuppressWarnings("unchecked")
-    static <T> Deserializer<T> deserializer(Class<T> type) {
-        if (String.class.isAssignableFrom(type)) return (Deserializer<T>) new StringDeserializer();
-        if (Integer.class.isAssignableFrom(type)) return (Deserializer<T>) new IntegerDeserializer();
-        if (Long.class.isAssignableFrom(type)) return (Deserializer<T>) new LongDeserializer();
-        if (byte[].class.isAssignableFrom(type)) return (Deserializer<T>) new ByteArrayDeserializer();
-        throw new IllegalArgumentException("Unknown class for built-in serializer");
-    }
-
-    private final String topic;
-    private Serializer<K> keySerializer;
-    private Serializer<V> valueSerializer;
-    private Deserializer<K> keyDeserializer;
-    private Deserializer<V> valueDeserializer;
-
-    /**
-     * Create a context for serialization using the specified serializers and deserializers which
-     * <em>must</em> match the key and value types used as parameters for this object.
-     *
-     * @param topic the name of the topic
-     * @param keySerializer the serializer for keys; may be null
-     * @param keyDeserializer the deserializer for keys; may be null
-     * @param valueSerializer the serializer for values; may be null
-     * @param valueDeserializer the deserializer for values; may be null
-     */
-    @SuppressWarnings("unchecked")
-    public Serdes(String topic,
-            Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
-            Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
-        this.topic = topic;
-
-        if (keySerializer == null)
-            throw new NullPointerException();
-        if (keyDeserializer == null)
-            throw new NullPointerException();
-        if (valueSerializer == null)
-            throw new NullPointerException();
-        if (valueDeserializer == null)
-            throw new NullPointerException();
-
-        this.keySerializer = keySerializer;
-        this.keyDeserializer = keyDeserializer;
-        this.valueSerializer = valueSerializer;
-        this.valueDeserializer = valueDeserializer;
-    }
-
-    public Deserializer<K> keyDeserializer() {
-        return keyDeserializer;
-    }
-
-    public Serializer<K> keySerializer() {
-        return keySerializer;
-    }
-
-    public Deserializer<V> valueDeserializer() {
-        return valueDeserializer;
-    }
-
-    public Serializer<V> valueSerializer() {
-        return valueSerializer;
-    }
-
-    public String topic() {
-        return topic;
-    }
-
-    public K keyFrom(byte[] rawKey) {
-        return keyDeserializer.deserialize(topic, rawKey);
-    }
-
-    public V valueFrom(byte[] rawValue) {
-        return valueDeserializer.deserialize(topic, rawValue);
-    }
-
-    public byte[] rawKey(K key) {
-        return keySerializer.serialize(topic, key);
-    }
-
-    public byte[] rawValue(V value) {
-        return valueSerializer.serialize(topic, value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
new file mode 100644
index 0000000..1a41a16
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Factory for creating serializers / deserializers for state stores in Kafka Streams.
+ *
+ * @param <K> key type of serdes
+ * @param <V> value type of serdes
+ */
+public final class StateSerdes<K, V> {
+
+    public static <K, V> StateSerdes<K, V> withBuiltinTypes(String topic, Class<K> keyClass, Class<V> valueClass) {
+        return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
+    }
+
+    private final String topic;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+
+    /**
+     * Create a context for serialization using the specified serializers and deserializers which
+     * <em>must</em> match the key and value types used as parameters for this object; the state changelog topic
+     * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not
+     * need to provide the topic name any more.
+     *
+     * @param topic the name of the topic
+     * @param keySerde the serde for keys; cannot be null
+     * @param valueSerde the serde for values; cannot be null
+     */
+    @SuppressWarnings("unchecked")
+    public StateSerdes(String topic,
+                       Serde<K> keySerde,
+                       Serde<V> valueSerde) {
+        this.topic = topic;
+
+        if (keySerde == null)
+            throw new IllegalArgumentException("key serde cannot be null");
+        if (valueSerde == null)
+            throw new IllegalArgumentException("value serde cannot be null");
+
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    public Deserializer<K> keyDeserializer() {
+        return keySerde.deserializer();
+    }
+
+    public Serializer<K> keySerializer() {
+        return keySerde.serializer();
+    }
+
+    public Deserializer<V> valueDeserializer() {
+        return valueSerde.deserializer();
+    }
+
+    public Serializer<V> valueSerializer() {
+        return valueSerde.serializer();
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public K keyFrom(byte[] rawKey) {
+        return keySerde.deserializer().deserialize(topic, rawKey);
+    }
+
+    public V valueFrom(byte[] rawValue) {
+        return valueSerde.deserializer().deserialize(topic, rawValue);
+    }
+
+    public byte[] rawKey(K key) {
+        return keySerde.serializer().serialize(topic, key);
+    }
+
+    public byte[] rawValue(V value) {
+        return valueSerde.serializer().serialize(topic, value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index e803832..33df13f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -16,22 +16,16 @@
  */
 package org.apache.kafka.streams.state;
 
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 
+import java.nio.ByteBuffer;
+
 /**
  * Factory for creating state stores in Kafka Streams.
  */
@@ -46,13 +40,12 @@ public class Stores {
     public static StoreFactory create(final String name) {
         return new StoreFactory() {
             @Override
-            public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer, final Deserializer<K> keyDeserializer) {
+            public <K> ValueFactory<K> withKeys(final Serde<K> keySerde) {
                 return new ValueFactory<K>() {
                     @Override
-                    public <V> KeyValueFactory<K, V> withValues(final Serializer<V> valueSerializer,
-                                                                final Deserializer<V> valueDeserializer) {
-                        final Serdes<K, V> serdes =
-                                new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
+                    public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) {
+                        final StateSerdes<K, V> serdes =
+                                new StateSerdes<>(name, keySerde, valueSerde);
                         return new KeyValueFactory<K, V>() {
                             @Override
                             public InMemoryKeyValueFactory<K, V> inMemory() {
@@ -116,7 +109,7 @@ public class Stores {
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<String> withStringKeys() {
-            return withKeys(new StringSerializer(), new StringDeserializer());
+            return withKeys(Serdes.String());
         }
 
         /**
@@ -125,7 +118,7 @@ public class Stores {
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<Integer> withIntegerKeys() {
-            return withKeys(new IntegerSerializer(), new IntegerDeserializer());
+            return withKeys(Serdes.Integer());
         }
 
         /**
@@ -134,7 +127,25 @@ public class Stores {
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<Long> withLongKeys() {
-            return withKeys(new LongSerializer(), new LongDeserializer());
+            return withKeys(Serdes.Long());
+        }
+
+        /**
+         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Double}s.
+         *
+         * @return the interface used to specify the type of values; never null
+         */
+        public ValueFactory<Double> withDoubleKeys() {
+            return withKeys(Serdes.Double());
+        }
+
+        /**
+         * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link ByteBuffer}.
+         *
+         * @return the interface used to specify the type of values; never null
+         */
+        public ValueFactory<ByteBuffer> withByteBufferKeys() {
+            return withKeys(Serdes.ByteBuffer());
         }
 
         /**
@@ -143,30 +154,26 @@ public class Stores {
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<byte[]> withByteArrayKeys() {
-            return withKeys(new ByteArraySerializer(), new ByteArrayDeserializer());
+            return withKeys(Serdes.ByteArray());
         }
 
         /**
-         * Begin to create a {@link KeyValueStore} by specifying the keys will be either {@link String}, {@link Integer},
-         * {@link Long}, or {@code byte[]}.
+         * Begin to create a {@link KeyValueStore} by specifying the keys.
          *
-         * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serializers and
-         *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
-         *            {@code byte[].class})
+         * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes
          * @return the interface used to specify the type of values; never null
          */
         public <K> ValueFactory<K> withKeys(Class<K> keyClass) {
-            return withKeys(Serdes.serializer(keyClass), Serdes.deserializer(keyClass));
+            return withKeys(Serdes.serdeFrom(keyClass));
         }
 
         /**
          * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
          *
-         * @param keySerializer the serializer for keys; may not be null
-         * @param keyDeserializer the deserializer for keys; may not be null
+         * @param keySerde the serialization factory for keys; may not be null
          * @return the interface used to specify the type of values; never null
          */
-        public abstract <K> ValueFactory<K> withKeys(Serializer<K> keySerializer, Deserializer<K> keyDeserializer);
+        public abstract <K> ValueFactory<K> withKeys(Serde<K> keySerde);
     }
 
     /**
@@ -181,7 +188,7 @@ public class Stores {
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, String> withStringValues() {
-            return withValues(new StringSerializer(), new StringDeserializer());
+            return withValues(Serdes.String());
         }
 
         /**
@@ -190,7 +197,7 @@ public class Stores {
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, Integer> withIntegerValues() {
-            return withValues(new IntegerSerializer(), new IntegerDeserializer());
+            return withValues(Serdes.Integer());
         }
 
         /**
@@ -199,7 +206,25 @@ public class Stores {
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, Long> withLongValues() {
-            return withValues(new LongSerializer(), new LongDeserializer());
+            return withValues(Serdes.Long());
+        }
+
+        /**
+         * Use {@link Double} values.
+         *
+         * @return the interface used to specify the remaining key-value store options; never null
+         */
+        public KeyValueFactory<K, Double> withDoubleValues() {
+            return withValues(Serdes.Double());
+        }
+
+        /**
+         * Use {@link ByteBuffer} for values.
+         *
+         * @return the interface used to specify the remaining key-value store options; never null
+         */
+        public KeyValueFactory<K, ByteBuffer> withByteBufferValues() {
+            return withValues(Serdes.ByteBuffer());
         }
 
         /**
@@ -208,30 +233,26 @@ public class Stores {
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, byte[]> withByteArrayValues() {
-            return withValues(new ByteArraySerializer(), new ByteArrayDeserializer());
+            return withValues(Serdes.ByteArray());
         }
 
         /**
-         * Use values of the specified type, which must be either {@link String}, {@link Integer}, {@link Long}, or {@code byte[]}
-         * .
+         * Use values of the specified type.
          *
-         * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serializers and
-         *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
-         *            {@code byte[].class})
+         * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serdes
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public <V> KeyValueFactory<K, V> withValues(Class<V> valueClass) {
-            return withValues(Serdes.serializer(valueClass), Serdes.deserializer(valueClass));
+            return withValues(Serdes.serdeFrom(valueClass));
         }
 
         /**
          * Use the specified serializer and deserializer for the values.
          *
-         * @param valueSerializer the serializer for value; may not be null
-         * @param valueDeserializer the deserializer for values; may not be null
+         * @param valueSerde the serialization factory for values; may not be null
          * @return the interface used to specify the remaining key-value store options; never null
          */
-        public abstract <V> KeyValueFactory<K, V> withValues(Serializer<V> valueSerializer, Deserializer<V> valueDeserializer);
+        public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde);
     }
 
     /**
@@ -240,7 +261,7 @@ public class Stores {
      * @param <K> the type of keys
      * @param <V> the type of values
      */
-    public static interface KeyValueFactory<K, V> {
+    public interface KeyValueFactory<K, V> {
         /**
          * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
          * read to restore the entries if they are lost.
@@ -264,7 +285,7 @@ public class Stores {
      * @param <K> the type of keys
      * @param <V> the type of values
      */
-    public static interface InMemoryKeyValueFactory<K, V> {
+    public interface InMemoryKeyValueFactory<K, V> {
         /**
          * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
          * equivalent to not placing a limit on the number of entries.
@@ -288,7 +309,7 @@ public class Stores {
      * @param <K> the type of keys
      * @param <V> the type of values
      */
-    public static interface PersistentKeyValueFactory<K, V> {
+    public interface PersistentKeyValueFactory<K, V> {
 
         /**
          * Set the persistent store as a windowed key-value store

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
index c6bbb23..66e1338 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
@@ -25,11 +25,11 @@ public class WindowStoreUtils {
 
     public static final int TIMESTAMP_SIZE = 8;
     public static final int SEQNUM_SIZE = 4;
-    public static final Serdes<byte[], byte[]> INNER_SERDES = Serdes.withBuiltinTypes("", byte[].class, byte[].class);
+    public static final StateSerdes<byte[], byte[]> INNER_SERDES = StateSerdes.withBuiltinTypes("", byte[].class, byte[].class);
     @SuppressWarnings("unchecked")
     public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
 
-    public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, Serdes<K, ?> serdes) {
+    public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
         byte[] serializedKey = serdes.rawKey(key);
 
         ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
@@ -40,7 +40,7 @@ public class WindowStoreUtils {
         return buf.array();
     }
 
-    public static <K> K keyFromBinaryKey(byte[] binaryKey, Serdes<K, ?> serdes) {
+    public static <K> K keyFromBinaryKey(byte[] binaryKey, StateSerdes<K, ?> serdes) {
         byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
 
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index d25faa8..32116dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -22,20 +22,20 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
 
 public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
 
     private final KeyValueStore<K, V> inner;
-    private final Serdes<K, V> serdes;
+    private final StateSerdes<K, V> serdes;
     private final String storeName;
 
     private StoreChangeLogger<K, V> changeLogger;
     private StoreChangeLogger.ValueGetter<K, V> getter;
 
-    public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) {
+    public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, final StateSerdes<K, V> serdes) {
         this.storeName = storeName;
         this.inner = inner;
         this.serdes = serdes;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index b96a103..4054d68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.Iterator;
 import java.util.List;
@@ -45,9 +45,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
     private final Time time;
-    private final Serdes<K, V> serdes;
+    private final StateSerdes<K, V> serdes;
 
-    public InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+    public InMemoryKeyValueStoreSupplier(String name, StateSerdes<K, V> serdes, Time time) {
         this.name = name;
         this.time = time;
         this.serdes = serdes;
@@ -67,7 +67,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         private final NavigableMap<K, V> map;
 
         private boolean loggingEnabled = false;
-        private Serdes<K, V> serdes = null;
+        private StateSerdes<K, V> serdes = null;
 
         public MemoryStore(String name) {
             super();
@@ -75,7 +75,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
             this.map = new TreeMap<>();
         }
 
-        public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) {
+        public KeyValueStore<K, V> enableLogging(StateSerdes<K, V> serdes) {
             this.loggingEnabled = true;
             this.serdes = serdes;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 9b7936a..1c2241f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
@@ -32,10 +32,10 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
     private final int capacity;
-    private final Serdes<K, V> serdes;
+    private final StateSerdes<K, V> serdes;
     private final Time time;
 
-    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
+    public InMemoryLRUCacheStoreSupplier(String name, int capacity, StateSerdes<K, V> serdes, Time time) {
         this.name = name;
         this.capacity = capacity;
         this.serdes = serdes;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index bd03f03..a5aaa06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -43,7 +43,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     protected EldestEntryRemovalListener<K, V> listener;
 
     private boolean loggingEnabled = false;
-    private Serdes<K, V> serdes = null;
+    private StateSerdes<K, V> serdes = null;
 
     // this is used for extended MemoryNavigableLRUCache only
     public MemoryLRUCache() {}
@@ -69,7 +69,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
         };
     }
 
-    public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) {
+    public KeyValueStore<K, V> enableLogging(StateSerdes<K, V> serdes) {
         this.loggingEnabled = true;
         this.serdes = serdes;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 3a4c351..ec10c3f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
@@ -33,10 +33,10 @@ import org.apache.kafka.streams.state.Serdes;
 public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
-    private final Serdes<K, V> serdes;
+    private final StateSerdes<K, V> serdes;
     private final Time time;
 
-    public RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+    public RocksDBKeyValueStoreSupplier(String name, StateSerdes<K, V> serdes, Time time) {
         this.name = name;
         this.serdes = serdes;
         this.time = time;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index c295aea..3045856 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompactionStyle;
@@ -68,7 +68,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private final FlushOptions fOptions;
 
     private ProcessorContext context;
-    private Serdes<K, V> serdes;
+    private StateSerdes<K, V> serdes;
     protected File dbDir;
     private RocksDB db;
 
@@ -92,11 +92,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         return this;
     }
 
-    public RocksDBStore(String name, Serdes<K, V> serdes) {
+    public RocksDBStore(String name, StateSerdes<K, V> serdes) {
         this(name, DB_FILE_DIR, serdes);
     }
 
-    public RocksDBStore(String name, String parentDir, Serdes<K, V> serdes) {
+    public RocksDBStore(String name, String parentDir, StateSerdes<K, V> serdes) {
         this.name = name;
         this.parentDir = parentDir;
         this.serdes = serdes;
@@ -399,9 +399,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
         private final RocksIterator iter;
-        private final Serdes<K, V> serdes;
+        private final StateSerdes<K, V> serdes;
 
-        public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+        public RocksDbIterator(RocksIterator iter, StateSerdes<K, V> serdes) {
             this.iter = iter;
             this.serdes = serdes;
         }
@@ -463,7 +463,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         private final Comparator<byte[]> comparator = new LexicographicComparator();
         byte[] rawToKey;
 
-        public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+        public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes,
                                     K from, K to) {
             super(iter, serdes);
             iter.seek(serdes.rawKey(from));

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index b1605a3..61c2e5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.WindowStoreUtils;
@@ -61,15 +61,15 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
-        private final Serdes<?, V> serdes;
+        private final StateSerdes<?, V> serdes;
         private final KeyValueIterator<byte[], byte[]>[] iterators;
         private int index = 0;
 
-        RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
+        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) {
             this(serdes, WindowStoreUtils.NO_ITERATORS);
         }
 
-        RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
+        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
             this.serdes = serdes;
             this.iterators = iterators;
         }
@@ -114,7 +114,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private final long segmentInterval;
     private final boolean retainDuplicates;
     private final Segment[] segments;
-    private final Serdes<K, V> serdes;
+    private final StateSerdes<K, V> serdes;
     private final SimpleDateFormat formatter;
     private final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
 
@@ -125,7 +125,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private boolean loggingEnabled = false;
     private StoreChangeLogger<byte[], byte[]> changeLogger = null;
 
-    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
+    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, StateSerdes<K, V> serdes) {
         this.name = name;
 
         // The segment interval must be greater than MIN_SEGMENT_INTERVAL


Mime
View raw message