kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [4/4] kafka git commit: KAFKA-3336: Unify Serializer and Deserializer into Serialization
Date Thu, 17 Mar 2016 22:42:06 GMT
KAFKA-3336: Unify Serializer and Deserializer into Serialization

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Michael G. Noll, Ismael Juma

Closes #1066 from guozhangwang/K3336


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dea0719e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dea0719e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dea0719e

Branch: refs/heads/trunk
Commit: dea0719e99211684775780f5da8b93835d7a5dac
Parents: f57dabb
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Mar 17 15:41:59 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Mar 17 15:41:59 2016 -0700

----------------------------------------------------------------------
 .../serialization/DoubleDeserializer.java       |  46 +++++
 .../common/serialization/DoubleSerializer.java  |  46 +++++
 .../kafka/common/serialization/Serde.java       |  26 +++
 .../kafka/common/serialization/Serdes.java      | 193 +++++++++++++++++++
 .../common/serialization/SerializationTest.java | 115 ++++++++---
 .../examples/pageview/PageViewTypedDemo.java    |  39 ++--
 .../examples/pageview/PageViewUntypedDemo.java  |  20 +-
 .../kafka/streams/examples/pipe/PipeDemo.java   |   9 +-
 .../examples/wordcount/WordCountDemo.java       |  20 +-
 .../wordcount/WordCountProcessorDemo.java       |   9 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  75 +++----
 .../apache/kafka/streams/kstream/KStream.java   | 133 +++++--------
 .../kafka/streams/kstream/KStreamBuilder.java   |  38 ++--
 .../apache/kafka/streams/kstream/KTable.java    |  51 ++---
 .../streams/kstream/internals/KStreamImpl.java  | 134 +++++--------
 .../streams/kstream/internals/KTableImpl.java   |  94 ++++-----
 .../kstream/internals/KTableStoreSupplier.java  |  13 +-
 .../streams/processor/ProcessorContext.java     |  25 +--
 .../streams/processor/TopologyBuilder.java      |  24 +--
 .../internals/ProcessorContextImpl.java         |  35 +---
 .../streams/processor/internals/SinkNode.java   |   4 +-
 .../streams/processor/internals/SourceNode.java |   4 +-
 .../processor/internals/StandbyContextImpl.java |  33 +---
 .../processor/internals/StreamThread.java       |   2 -
 .../org/apache/kafka/streams/state/Serdes.java  | 136 -------------
 .../apache/kafka/streams/state/StateSerdes.java | 108 +++++++++++
 .../org/apache/kafka/streams/state/Stores.java  | 109 ++++++-----
 .../kafka/streams/state/WindowStoreUtils.java   |   6 +-
 .../internals/InMemoryKeyValueLoggedStore.java  |   6 +-
 .../InMemoryKeyValueStoreSupplier.java          |  10 +-
 .../InMemoryLRUCacheStoreSupplier.java          |   6 +-
 .../streams/state/internals/MemoryLRUCache.java |   6 +-
 .../internals/RocksDBKeyValueStoreSupplier.java |   6 +-
 .../streams/state/internals/RocksDBStore.java   |  14 +-
 .../state/internals/RocksDBWindowStore.java     |  12 +-
 .../internals/RocksDBWindowStoreSupplier.java   |   6 +-
 .../state/internals/StoreChangeLogger.java      |  10 +-
 .../apache/kafka/streams/StreamsConfigTest.java |  10 -
 .../kstream/internals/KStreamBranchTest.java    |   8 +-
 .../kstream/internals/KStreamFilterTest.java    |  10 +-
 .../kstream/internals/KStreamFlatMapTest.java   |   8 +-
 .../internals/KStreamFlatMapValuesTest.java     |   8 +-
 .../kstream/internals/KStreamImplTest.java      |  24 +--
 .../internals/KStreamKStreamJoinTest.java       |  33 ++--
 .../internals/KStreamKStreamLeftJoinTest.java   |  26 +--
 .../internals/KStreamKTableLeftJoinTest.java    |  20 +-
 .../kstream/internals/KStreamMapTest.java       |  11 +-
 .../kstream/internals/KStreamMapValuesTest.java |  10 +-
 .../kstream/internals/KStreamTransformTest.java |   9 +-
 .../internals/KStreamTransformValuesTest.java   |   8 +-
 .../internals/KStreamWindowAggregateTest.java   |  33 ++--
 .../kstream/internals/KTableAggregateTest.java  |  22 +--
 .../kstream/internals/KTableFilterTest.java     |  28 ++-
 .../kstream/internals/KTableImplTest.java       |  37 ++--
 .../kstream/internals/KTableKTableJoinTest.java |  24 +--
 .../internals/KTableKTableLeftJoinTest.java     |  30 ++-
 .../internals/KTableKTableOuterJoinTest.java    |  24 +--
 .../kstream/internals/KTableMapValuesTest.java  |  31 ++-
 .../kstream/internals/KTableSourceTest.java     |  26 +--
 .../WindowedStreamPartitionerTest.java          |  10 +-
 .../internals/ProcessorTopologyTest.java        |   7 +-
 .../processor/internals/StandbyTaskTest.java    |   7 +-
 .../internals/StreamPartitionAssignorTest.java  |   7 +-
 .../processor/internals/StreamTaskTest.java     |   7 +-
 .../processor/internals/StreamThreadTest.java   |   7 +-
 .../streams/smoketest/SmokeTestClient.java      |  69 +++----
 .../streams/smoketest/SmokeTestDriver.java      |  22 +--
 .../kafka/streams/smoketest/SmokeTestUtil.java  |  81 +-------
 .../streams/state/KeyValueStoreTestDriver.java  |  72 +------
 .../internals/InMemoryKeyValueStoreTest.java    |   8 +-
 .../internals/InMemoryLRUCacheStoreTest.java    |   8 +-
 .../internals/RocksDBKeyValueStoreTest.java     |   8 +-
 .../state/internals/RocksDBWindowStoreTest.java |  49 +++--
 .../state/internals/StoreChangeLoggerTest.java  |   6 +-
 .../apache/kafka/test/KStreamTestDriver.java    |  13 +-
 .../apache/kafka/test/MockProcessorContext.java |  55 ++----
 76 files changed, 1184 insertions(+), 1315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
new file mode 100644
index 0000000..ed4f323
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.util.Map;
+
+public class DoubleDeserializer implements Deserializer<Double> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    @Override
+    public Double deserialize(String topic, byte[] data) {
+        if (data == null)
+            return null;
+        if (data.length != 8) {
+            throw new SerializationException("Size of data received by Deserializer is not 8");
+        }
+
+        long value = 0;
+        for (byte b : data) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return Double.longBitsToDouble(value);
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
new file mode 100644
index 0000000..9d01342
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+public class DoubleSerializer implements Serializer<Double> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    @Override
+    public byte[] serialize(String topic, Double data) {
+        if (data == null)
+            return null;
+
+        long bits = Double.doubleToLongBits(data);
+        return new byte[] {
+            (byte) (bits >>> 56),
+            (byte) (bits >>> 48),
+            (byte) (bits >>> 40),
+            (byte) (bits >>> 32),
+            (byte) (bits >>> 24),
+            (byte) (bits >>> 16),
+            (byte) (bits >>> 8),
+            (byte) bits
+        };
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
new file mode 100644
index 0000000..cc7944e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.serialization;
+
+/**
+ * The interface for wrapping a serializer and deserializer for the given data type.
+ *
+ * @param <T>
+ */
+public interface Serde<T> {
+
+    Serializer<T> serializer();
+
+    Deserializer<T> deserializer();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
new file mode 100644
index 0000000..f27f74f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.serialization;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Factory for creating serializers / deserializers.
+ */
+public class Serdes {
+
+    static public final class LongSerde implements Serde<Long> {
+        @Override
+        public Serializer<Long> serializer() {
+            return new LongSerializer();
+        }
+
+        @Override
+        public Deserializer<Long> deserializer() {
+            return new LongDeserializer();
+        }
+    }
+
+    static public final class IntegerSerde implements Serde<Integer> {
+        @Override
+        public Serializer<Integer> serializer() {
+            return new IntegerSerializer();
+        }
+
+        @Override
+        public Deserializer<Integer> deserializer() {
+            return new IntegerDeserializer();
+        }
+    }
+
+    static public final class DoubleSerde implements Serde<Double> {
+        @Override
+        public Serializer<Double> serializer() {
+            return new DoubleSerializer();
+        }
+
+        @Override
+        public Deserializer<Double> deserializer() {
+            return new DoubleDeserializer();
+        }
+    }
+
+    static public final class StringSerde implements Serde<String> {
+        @Override
+        public Serializer<String> serializer() {
+            return new StringSerializer();
+        }
+
+        @Override
+        public Deserializer<String> deserializer() {
+            return new StringDeserializer();
+        }
+    }
+
+    static public final class ByteBufferSerde implements Serde<ByteBuffer> {
+        @Override
+        public Serializer<ByteBuffer> serializer() {
+            return new ByteBufferSerializer();
+        }
+
+        @Override
+        public Deserializer<ByteBuffer> deserializer() {
+            return new ByteBufferDeserializer();
+        }
+    }
+
+    static public final class ByteArraySerde implements Serde<byte[]> {
+        @Override
+        public Serializer<byte[]> serializer() {
+            return new ByteArraySerializer();
+        }
+
+        @Override
+        public Deserializer<byte[]> deserializer() {
+            return new ByteArrayDeserializer();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    static public <T> Serde<T> serdeFrom(Class<T> type) {
+        if (String.class.isAssignableFrom(type)) {
+            return (Serde<T>) String();
+        }
+
+        if (Integer.class.isAssignableFrom(type)) {
+            return (Serde<T>) Integer();
+        }
+
+        if (Long.class.isAssignableFrom(type)) {
+            return (Serde<T>) Long();
+        }
+
+        if (Double.class.isAssignableFrom(type)) {
+            return (Serde<T>) Double();
+        }
+
+        if (byte[].class.isAssignableFrom(type)) {
+            return (Serde<T>) ByteArray();
+        }
+
+        if (ByteBufferSerde.class.isAssignableFrom(type)) {
+            return (Serde<T>) ByteBuffer();
+        }
+
+        // TODO: we can also serializes objects of type T using generic Java serialization by default
+        throw new IllegalArgumentException("Unknown class for built-in serializer");
+    }
+
+    /**
+     * Construct a serde object from separate serializer and deserializer
+     *
+     * @param serializer    must not be null.
+     * @param deserializer  must not be null.
+     */
+    static public <T> Serde<T> serdeFrom(final Serializer<T> serializer, final Deserializer<T> deserializer) {
+        if (serializer == null) {
+            throw new IllegalArgumentException("serializer must not be null");
+        }
+        if (deserializer == null) {
+            throw new IllegalArgumentException("deserializer must not be null");
+        }
+
+        return new Serde<T>() {
+            @Override
+            public Serializer<T> serializer() {
+                return serializer;
+            }
+
+            @Override
+            public Deserializer<T> deserializer() {
+                return deserializer;
+            }
+        };
+    }
+
+    /*
+     * A serde for nullable long type.
+     */
+    static public Serde<Long> Long() {
+        return new LongSerde();
+    }
+
+    /*
+     * A serde for nullable int type.
+     */
+    static public Serde<Integer> Integer() {
+        return new IntegerSerde();
+    }
+
+    /*
+     * A serde for nullable long type.
+     */
+    static public Serde<Double> Double() {
+        return new DoubleSerde();
+    }
+
+    /*
+     * A serde for nullable string type.
+     */
+    static public Serde<String> String() {
+        return new StringSerde();
+    }
+
+    /*
+     * A serde for nullable byte array type.
+     */
+    static public Serde<ByteBuffer> ByteBuffer() {
+        return new ByteBufferSerde();
+    }
+
+    /*
+     * A serde for nullable byte array type.
+     */
+    static public Serde<byte[]> ByteArray() {
+        return new ByteArraySerde();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 87d9e0a..e4cd678 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -24,34 +24,53 @@ import static org.junit.Assert.assertEquals;
 
 public class SerializationTest {
 
-    private static class SerDeser<T> {
-        final Serializer<T> serializer;
-        final Deserializer<T> deserializer;
+    final private String topic = "testTopic";
 
-        public SerDeser(Serializer<T> serializer, Deserializer<T> deserializer) {
-            this.serializer = serializer;
-            this.deserializer = deserializer;
-        }
+    private class DummyClass {
+
+    }
+
+    @Test
+    public void testSerdeFrom() {
+        Serde<Long> thisSerde = Serdes.serdeFrom(Long.class);
+        Serde<Long> otherSerde = Serdes.Long();
+
+        Long value = 423412424L;
+
+        assertEquals("Should get the original long after serialization and deserialization",
+                value, thisSerde.deserializer().deserialize(topic, otherSerde.serializer().serialize(topic, value)));
+        assertEquals("Should get the original long after serialization and deserialization",
+                value, otherSerde.deserializer().deserialize(topic, thisSerde.serializer().serialize(topic, value)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSerdeFromUnknown() {
+        Serdes.serdeFrom(DummyClass.class);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSerdeFromNotNull() {
+        Serdes.serdeFrom(null, Serdes.Long().deserializer());
     }
 
     @Test
     public void testStringSerializer() {
         String str = "my string";
-        String mytopic = "testTopic";
+
         List<String> encodings = new ArrayList<String>();
         encodings.add("UTF8");
         encodings.add("UTF-16");
 
         for (String encoding : encodings) {
-            SerDeser<String> serDeser = getStringSerDeser(encoding);
-            Serializer<String> serializer = serDeser.serializer;
-            Deserializer<String> deserializer = serDeser.deserializer;
+            Serde<String> serDeser = getStringSerde(encoding);
+            Serializer<String> serializer = serDeser.serializer();
+            Deserializer<String> deserializer = serDeser.deserializer();
 
             assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding,
-                    str, deserializer.deserialize(mytopic, serializer.serialize(mytopic, str)));
+                    str, deserializer.deserialize(topic, serializer.serialize(topic, str)));
 
             assertEquals("Should support null in serialization and deserialization with encoding " + encoding,
-                    null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null)));
+                    null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
         }
     }
 
@@ -61,18 +80,61 @@ public class SerializationTest {
             423412424,
             -41243432
         };
-        String mytopic = "testTopic";
 
-        Serializer<Integer> serializer = new IntegerSerializer();
-        Deserializer<Integer> deserializer = new IntegerDeserializer();
+        Serializer<Integer> serializer = Serdes.Integer().serializer();
+        Deserializer<Integer> deserializer = Serdes.Integer().deserializer();
 
         for (Integer integer : integers) {
             assertEquals("Should get the original integer after serialization and deserialization",
-                    integer, deserializer.deserialize(mytopic, serializer.serialize(mytopic, integer)));
+                    integer, deserializer.deserialize(topic, serializer.serialize(topic, integer)));
+        }
+
+        assertEquals("Should support null in serialization and deserialization",
+                null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
+
+        serializer.close();
+        deserializer.close();
+    }
+
+    @Test
+    public void testLongSerializer() {
+        Long[] longs = new Long[]{
+            922337203685477580L,
+            -922337203685477581L
+        };
+
+        Serializer<Long> serializer = Serdes.Long().serializer();
+        Deserializer<Long> deserializer = Serdes.Long().deserializer();
+
+        for (Long value : longs) {
+            assertEquals("Should get the original long after serialization and deserialization",
+                    value, deserializer.deserialize(topic, serializer.serialize(topic, value)));
+        }
+
+        assertEquals("Should support null in serialization and deserialization",
+                null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
+
+        serializer.close();
+        deserializer.close();
+    }
+
+    @Test
+    public void testDoubleSerializer() {
+        Double[] doubles = new Double[]{
+            5678567.12312d,
+            -5678567.12341d
+        };
+
+        Serializer<Double> serializer = Serdes.Double().serializer();
+        Deserializer<Double> deserializer = Serdes.Double().deserializer();
+
+        for (Double value : doubles) {
+            assertEquals("Should get the original double after serialization and deserialization",
+                    value, deserializer.deserialize(topic, serializer.serialize(topic, value)));
         }
 
         assertEquals("Should support null in serialization and deserialization",
-                null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null)));
+                null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
 
         serializer.close();
         deserializer.close();
@@ -80,34 +142,33 @@ public class SerializationTest {
 
     @Test
     public void testByteBufferSerializer() {
-        String mytopic = "testTopic";
         ByteBuffer buf = ByteBuffer.allocate(10);
         buf.put("my string".getBytes());
 
-        Serializer<ByteBuffer> serializer = new ByteBufferSerializer();
-        Deserializer<ByteBuffer> deserializer = new ByteBufferDeserializer();
+        Serializer<ByteBuffer> serializer = Serdes.ByteBuffer().serializer();
+        Deserializer<ByteBuffer> deserializer = Serdes.ByteBuffer().deserializer();
 
         assertEquals("Should get the original ByteBuffer after serialization and deserialization",
-              buf, deserializer.deserialize(mytopic, serializer.serialize(mytopic, buf)));
+              buf, deserializer.deserialize(topic, serializer.serialize(topic, buf)));
 
         assertEquals("Should support null in serialization and deserialization",
-                null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null)));
+                null, deserializer.deserialize(topic, serializer.serialize(topic, null)));
 
         serializer.close();
         deserializer.close();
     }
 
-    private SerDeser<String> getStringSerDeser(String encoder) {
+    private Serde<String> getStringSerde(String encoder) {
         Map<String, Object> serializerConfigs = new HashMap<String, Object>();
         serializerConfigs.put("key.serializer.encoding", encoder);
-        Serializer<String> serializer = new StringSerializer();
+        Serializer<String> serializer = Serdes.String().serializer();
         serializer.configure(serializerConfigs, true);
 
         Map<String, Object> deserializerConfigs = new HashMap<String, Object>();
         deserializerConfigs.put("key.deserializer.encoding", encoder);
-        Deserializer<String> deserializer = new StringDeserializer();
+        Deserializer<String> deserializer = Serdes.String().deserializer();
         deserializer.configure(deserializerConfigs, true);
 
-        return new SerDeser<String>(serializer, deserializer);
+        return Serdes.serdeFrom(serializer, deserializer);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 4f9de29..15083b2 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -18,9 +18,8 @@
 package org.apache.kafka.streams.examples.pageview;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.HoppingWindows;
@@ -83,10 +82,6 @@ public class PageViewTypedDemo {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
@@ -94,35 +89,44 @@ public class PageViewTypedDemo {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-
         // TODO: the following can be removed with a serialization factory
         Map<String, Object> serdeProps = new HashMap<>();
 
+        final Serializer<PageView> pageViewSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", PageView.class);
+        pageViewSerializer.configure(serdeProps, false);
+
         final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>();
         serdeProps.put("JsonPOJOClass", PageView.class);
         pageViewDeserializer.configure(serdeProps, false);
 
-        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", UserProfile.class);
-        userProfileDeserializer.configure(serdeProps, false);
-
         final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
         serdeProps.put("JsonPOJOClass", UserProfile.class);
         userProfileSerializer.configure(serdeProps, false);
 
+        final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", UserProfile.class);
+        userProfileDeserializer.configure(serdeProps, false);
+
         final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
         serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
         wPageViewByRegionSerializer.configure(serdeProps, false);
 
+        final Deserializer<WindowedPageViewByRegion> wPageViewByRegionDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
+        wPageViewByRegionDeserializer.configure(serdeProps, false);
+
         final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
         serdeProps.put("JsonPOJOClass", RegionCount.class);
         regionCountSerializer.configure(serdeProps, false);
 
-        KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input");
+        final Deserializer<RegionCount> regionCountDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", RegionCount.class);
+        regionCountDeserializer.configure(serdeProps, false);
+
+        KStream<String, PageView> views = builder.stream(Serdes.String(), Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer), "streams-pageview-input");
 
-        KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input");
+        KTable<String, UserProfile> users = builder.table(Serdes.String(), Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer), "streams-userprofile-input");
 
         KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
                 .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
@@ -146,8 +150,7 @@ public class PageViewTypedDemo {
                         return new KeyValue<>(viewRegion.region, viewRegion);
                     }
                 })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, stringDeserializer)
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
@@ -166,7 +169,7 @@ public class PageViewTypedDemo {
                 });
 
         // write to the result topic
-        regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer);
+        regionCount.to("streams-pageviewstats-typed-output", Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer), Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer));
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 9377095..5b80f64 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -21,9 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.json.JsonSerializer;
 import org.apache.kafka.connect.json.JsonDeserializer;
 import org.apache.kafka.streams.KafkaStreams;
@@ -59,10 +59,6 @@ public class PageViewUntypedDemo {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
@@ -70,14 +66,13 @@ public class PageViewUntypedDemo {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
+        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
 
-        KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input");
+        KStream<String, JsonNode> views = builder.stream(Serdes.String(), jsonSerde, "streams-pageview-input");
 
-        KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input");
+        KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde, "streams-userprofile-input");
 
         KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
             @Override
@@ -103,8 +98,7 @@ public class PageViewUntypedDemo {
                         return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
                     }
                 })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
-                        stringSerializer, stringDeserializer)
+                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
@@ -122,7 +116,7 @@ public class PageViewUntypedDemo {
                 });
 
         // write to the result topic
-        regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer);
+        regionCount.to("streams-pageviewstats-untyped-output", jsonSerde, jsonSerde);
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index c37c68a..619f33d 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.examples.pipe;
 
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
@@ -41,10 +40,8 @@ public class PipeDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 03d5142..ebd6050 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -17,11 +17,7 @@
 
 package org.apache.kafka.streams.examples.wordcount;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -52,21 +48,13 @@ public class WordCountDemo {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Long> longSerializer = new LongSerializer();
-
-        KStream<String, String> source = builder.stream("streams-file-input");
+        KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "streams-file-input");
 
         KTable<String, Long> counts = source
                 .flatMapValues(new ValueMapper<String, Iterable<String>>() {
@@ -80,9 +68,9 @@ public class WordCountDemo {
                         return new KeyValue<String, String>(value, value);
                     }
                 })
-                .countByKey(stringSerializer, stringDeserializer, "Counts");
+                .countByKey(Serdes.String(), "Counts");
 
-        counts.to("streams-wordcount-output", stringSerializer, longSerializer);
+        counts.to("streams-wordcount-output", Serdes.String(), Serdes.Long());
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index b651b3a..8457415 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.examples.wordcount;
 
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KafkaStreams;
@@ -108,10 +107,8 @@ public class WordCountProcessorDemo {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 52fdbd4..4e989be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -24,11 +24,12 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor;
 
 import java.util.Map;
 
@@ -91,17 +92,13 @@ public class StreamsConfig extends AbstractConfig {
     public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
     public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
 
-    /** <code>key.serializer</code> */
-    public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-
-    /** <code>value.serializer</code> */
-    public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-
-    /** <code>key.deserializer</code> */
-    public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+    /** <code>replication.factor</code> */
+    public static final String KEY_SERDE_CLASS_CONFIG = "key.serde";
+    public static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface.";
 
-    /** <code>value.deserializer</code> */
-    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+    /** <code>replication.factor</code> */
+    public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
+    public static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface.";
 
     /** <code>metrics.sample.window.ms</code> */
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
@@ -121,8 +118,6 @@ public class StreamsConfig extends AbstractConfig {
     /** <code>auto.offset.reset</code> */
     public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
 
-    private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor";
-
     static {
         CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG,      // required with no default value
                                         Type.STRING,
@@ -152,32 +147,26 @@ public class StreamsConfig extends AbstractConfig {
                                         1,
                                         Importance.MEDIUM,
                                         REPLICATION_FACTOR_DOC)
-                                .define(KEY_SERIALIZER_CLASS_CONFIG,        // required with no default value
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG,      // required with no default value
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
-                                .define(KEY_DESERIALIZER_CLASS_CONFIG,      // required with no default value
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
-                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,    // required with no default value
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
                                 .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                                         Type.CLASS,
-                                        WALLCLOCK_TIMESTAMP_EXTRACTOR,
+                                        WallclockTimestampExtractor.class.getName(),
                                         Importance.MEDIUM,
                                         TIMESTAMP_EXTRACTOR_CLASS_DOC)
                                 .define(PARTITION_GROUPER_CLASS_CONFIG,
                                         Type.CLASS,
-                                        DefaultPartitionGrouper.class,
+                                        DefaultPartitionGrouper.class.getName(),
                                         Importance.MEDIUM,
                                         PARTITION_GROUPER_CLASS_DOC)
+                                .define(KEY_SERDE_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Serdes.ByteArraySerde.class.getName(),
+                                        Importance.MEDIUM,
+                                        KEY_SERDE_CLASS_DOC)
+                                .define(VALUE_SERDE_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Serdes.ByteArraySerde.class.getName(),
+                                        Importance.MEDIUM,
+                                        VALUE_SERDE_CLASS_DOC)
                                 .define(COMMIT_INTERVAL_MS_CONFIG,
                                         Type.LONG,
                                         30000,
@@ -273,8 +262,6 @@ public class StreamsConfig extends AbstractConfig {
 
         // remove properties that are not required for consumers
         removeStreamsSpecificConfigs(props);
-        props.remove(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG);
-        props.remove(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG);
 
         return props;
     }
@@ -287,8 +274,6 @@ public class StreamsConfig extends AbstractConfig {
 
         // remove properties that are not required for producers
         removeStreamsSpecificConfigs(props);
-        props.remove(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-        props.remove(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
         props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
 
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
@@ -302,23 +287,17 @@ public class StreamsConfig extends AbstractConfig {
         props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
         props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+        props.remove(StreamsConfig.KEY_SERDE_CLASS_CONFIG);
+        props.remove(StreamsConfig.VALUE_SERDE_CLASS_CONFIG);
         props.remove(InternalConfig.STREAM_THREAD_INSTANCE);
     }
 
-    public Serializer keySerializer() {
-        return getConfiguredInstance(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-    }
-
-    public Serializer valueSerializer() {
-        return getConfiguredInstance(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-    }
-
-    public Deserializer keyDeserializer() {
-        return getConfiguredInstance(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    public Serde keySerde() {
+        return getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
     }
 
-    public Deserializer valueDeserializer() {
-        return getConfiguredInstance(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+    public Serde valueSerde() {
+        return getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class);
     }
 
     public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 1640bde..1c78652 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
@@ -108,18 +107,14 @@ public interface KStream<K, V> {
      * Sends key-value to a topic, also creates a new instance of KStream from the topic.
      * This is equivalent to calling to(topic) and from(topic).
      *
-     * @param topic           the topic name
-     * @param keySerializer   key serializer used to send key-value pairs,
-     *                        if not specified the default key serializer defined in the configuration will be used
-     * @param valSerializer   value serializer used to send key-value pairs,
-     *                        if not specified the default value serializer defined in the configuration will be used
-     * @param keyDeserializer key deserializer used to create the new KStream,
-     *                        if not specified the default key deserializer defined in the configuration will be used
-     * @param valDeserializer value deserializer used to create the new KStream,
-     *                        if not specified the default value deserializer defined in the configuration will be used
+     * @param topic      the topic name
+     * @param keySerde   key serde used to send key-value pairs,
+     *                   if not specified the default key serde defined in the configuration will be used
+     * @param valSerde   value serde used to send key-value pairs,
+     *                   if not specified the default value serde defined in the configuration will be used
      * @return the instance of KStream that consumes the given topic
      */
-    KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
+    KStream<K, V> through(String topic, Serde<K> keySerde, Serde<V> valSerde);
 
     /**
      * Sends key-value to a topic using default serializers specified in the config.
@@ -131,13 +126,13 @@ public interface KStream<K, V> {
     /**
      * Sends key-value to a topic.
      *
-     * @param topic         the topic name
-     * @param keySerializer key serializer used to send key-value pairs,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param valSerializer value serializer used to send key-value pairs,
-     *                      if not specified the default serializer defined in the configs will be used
+     * @param topic    the topic name
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param keySerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
      */
-    void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
+    void to(String topic, Serde<K> keySerde, Serde<V> valSerde);
 
     /**
      * Applies a stateful transformation to all elements in this stream.
@@ -171,18 +166,12 @@ public interface KStream<K, V> {
      * @param otherStream the instance of KStream joined with this stream
      * @param joiner ValueJoiner
      * @param windows the specification of the join window
-     * @param keySerializer key serializer,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param thisValueSerializer value serializer for this stream,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param otherValueSerializer value serializer for other stream,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param keyDeserializer key deserializer,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param thisValueDeserializer value deserializer for this stream,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param otherValueDeserializer value deserializer for other stream,
-     *                      if not specified the default serializer defined in the configs will be used
+     * @param keySerde key serdes,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param thisValueSerde value serdes for this stream,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param otherValueSerde value serdes for other stream,
+     *                      if not specified the default serdes defined in the configs will be used
      * @param <V1>   the value type of the other stream
      * @param <R>   the value type of the new stream
      */
@@ -190,12 +179,9 @@ public interface KStream<K, V> {
             KStream<K, V1> otherStream,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerializer,
-            Serializer<V> thisValueSerializer,
-            Serializer<V1> otherValueSerializer,
-            Deserializer<K> keyDeserializer,
-            Deserializer<V> thisValueDeserializer,
-            Deserializer<V1> otherValueDeserializer);
+            Serde<K> keySerde,
+            Serde<V> thisValueSerde,
+            Serde<V1> otherValueSerde);
 
     /**
      * Combines values of this stream with another KStream using Windowed Outer Join.
@@ -203,18 +189,12 @@ public interface KStream<K, V> {
      * @param otherStream the instance of KStream joined with this stream
      * @param joiner ValueJoiner
      * @param windows the specification of the join window
-     * @param keySerializer key serializer,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param thisValueSerializer value serializer for this stream,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param otherValueSerializer value serializer for other stream,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param keyDeserializer key deserializer,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param thisValueDeserializer value deserializer for this stream,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param otherValueDeserializer value deserializer for other stream,
-     *                      if not specified the default serializer defined in the configs will be used
+     * @param keySerde key serdes,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param thisValueSerde value serdes for this stream,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param otherValueSerde value serdes for other stream,
+     *                      if not specified the default serdes defined in the configs will be used
      * @param <V1>   the value type of the other stream
      * @param <R>   the value type of the new stream
      */
@@ -222,12 +202,9 @@ public interface KStream<K, V> {
             KStream<K, V1> otherStream,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerializer,
-            Serializer<V> thisValueSerializer,
-            Serializer<V1> otherValueSerializer,
-            Deserializer<K> keyDeserializer,
-            Deserializer<V> thisValueDeserializer,
-            Deserializer<V1> otherValueDeserializer);
+            Serde<K> keySerde,
+            Serde<V> thisValueSerde,
+            Serde<V1> otherValueSerde);
 
     /**
      * Combines values of this stream with another KStream using Windowed Left Join.
@@ -235,14 +212,10 @@ public interface KStream<K, V> {
      * @param otherStream the instance of KStream joined with this stream
      * @param joiner ValueJoiner
      * @param windows the specification of the join window
-     * @param keySerializer key serializer,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param otherValueSerializer value serializer for other stream,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param keyDeserializer key deserializer,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param otherValueDeserializer value deserializer for other stream,
-     *                      if not specified the default serializer defined in the configs will be used
+     * @param keySerde key serdes,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param otherValueSerde value serdes for other stream,
+     *                      if not specified the default serdes defined in the configs will be used
      * @param <V1>   the value type of the other stream
      * @param <R>   the value type of the new stream
      */
@@ -250,10 +223,8 @@ public interface KStream<K, V> {
             KStream<K, V1> otherStream,
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
-            Serializer<K> keySerializer,
-            Serializer<V1> otherValueSerializer,
-            Deserializer<K> keyDeserializer,
-            Deserializer<V1> otherValueDeserializer);
+            Serde<K> keySerde,
+            Serde<V1> otherValueSerde);
 
     /**
      * Combines values of this stream with KTable using Left Join.
@@ -273,10 +244,8 @@ public interface KStream<K, V> {
      */
     <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
                                                           Windows<W> windows,
-                                                          Serializer<K> keySerializer,
-                                                          Serializer<V> aggValueSerializer,
-                                                          Deserializer<K> keyDeserializer,
-                                                          Deserializer<V> aggValueDeserializer);
+                                                          Serde<K> keySerde,
+                                                          Serde<V> aggValueSerde);
 
     /**
      * Aggregate values of this stream by key on a window basis.
@@ -284,10 +253,8 @@ public interface KStream<K, V> {
      * @param reducer the class of Reducer
      */
     KTable<K, V> reduceByKey(Reducer<V> reducer,
-                             Serializer<K> keySerializer,
-                             Serializer<V> aggValueSerializer,
-                             Deserializer<K> keyDeserializer,
-                             Deserializer<V> aggValueDeserializer,
+                             Serde<K> keySerde,
+                             Serde<V> aggValueSerde,
                              String name);
 
     /**
@@ -301,10 +268,8 @@ public interface KStream<K, V> {
     <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
                                                                 Aggregator<K, V, T> aggregator,
                                                                 Windows<W> windows,
-                                                                Serializer<K> keySerializer,
-                                                                Serializer<T> aggValueSerializer,
-                                                                Deserializer<K> keyDeserializer,
-                                                                Deserializer<T> aggValueDeserializer);
+                                                                Serde<K> keySerde,
+                                                                Serde<T> aggValueSerde);
 
     /**
      * Aggregate values of this stream by key without a window basis, and hence
@@ -316,10 +281,8 @@ public interface KStream<K, V> {
      */
     <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                     Aggregator<K, V, T> aggregator,
-                                    Serializer<K> keySerializer,
-                                    Serializer<T> aggValueSerializer,
-                                    Deserializer<K> keyDeserializer,
-                                    Deserializer<T> aggValueDeserializer,
+                                    Serde<K> keySerde,
+                                    Serde<T> aggValueSerde,
                                     String name);
 
     /**
@@ -328,14 +291,12 @@ public interface KStream<K, V> {
      * @param windows the specification of the aggregation window
      */
     <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
-                                                            Serializer<K> keySerializer,
-                                                            Deserializer<K> keyDeserializer);
+                                                            Serde<K> keySerde);
 
     /**
      * Count number of messages of this stream by key without a window basis, and hence
      * return a ever updating counting table.
      */
-    KTable<K, Long> countByKey(Serializer<K> keySerializer,
-                               Deserializer<K> keyDeserializer,
+    KTable<K, Long> countByKey(Serde<K> keySerde,
                                String name);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 3cf198c..dfd9281 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
@@ -40,7 +39,6 @@ public class KStreamBuilder extends TopologyBuilder {
         super();
     }
 
-    // TODO: needs updated
     /**
      * Creates a KStream instance for the specified topic.
      * The default deserializers specified in the config are used.
@@ -55,17 +53,17 @@ public class KStreamBuilder extends TopologyBuilder {
     /**
      * Creates a KStream instance for the specified topic.
      *
-     * @param keyDeserializer key deserializer used to read this source KStream,
-     *                        if not specified the default deserializer defined in the configs will be used
-     * @param valDeserializer value deserializer used to read this source KStream,
-     *                        if not specified the default deserializer defined in the configs will be used
-     * @param topics          the topic names, if empty default to all the topics in the config
+     * @param keySerde key serde used to read this source KStream,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to read this source KStream,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param topics   the topic names, if empty default to all the topics in the config
      * @return KStream
      */
-    public <K, V> KStream<K, V> stream(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) {
+    public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, String... topics) {
         String name = newName(KStreamImpl.SOURCE_NAME);
 
-        addSource(name, keyDeserializer, valDeserializer, topics);
+        addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
 
         return new KStreamImpl<>(this, name, Collections.singleton(name));
     }
@@ -78,33 +76,29 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return KTable
      */
     public <K, V> KTable<K, V> table(String topic) {
-        return table(null, null, null, null, topic);
+        return table(null, null, topic);
     }
 
     /**
      * Creates a KTable instance for the specified topic.
      *
-     * @param keySerializer   key serializer used to send key-value pairs,
-     *                        if not specified the default key serializer defined in the configuration will be used
-     * @param valSerializer   value serializer used to send key-value pairs,
-     *                        if not specified the default value serializer defined in the configuration will be used
-     * @param keyDeserializer key deserializer used to read this source KStream,
-     *                        if not specified the default deserializer defined in the configs will be used
-     * @param valDeserializer value deserializer used to read this source KStream,
-     *                        if not specified the default deserializer defined in the configs will be used
+     * @param keySerde   key serde used to send key-value pairs,
+     *                        if not specified the default key serde defined in the configuration will be used
+     * @param valSerde   value serde used to send key-value pairs,
+     *                        if not specified the default value serde defined in the configuration will be used
      * @param topic          the topic name
      * @return KStream
      */
-    public <K, V> KTable<K, V> table(Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String topic) {
+    public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         String source = newName(KStreamImpl.SOURCE_NAME);
         String name = newName(KTableImpl.SOURCE_NAME);
 
-        addSource(source, keyDeserializer, valDeserializer, topic);
+        addSource(source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic);
 
         ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(topic);
         addProcessor(name, processorSupplier, source);
 
-        return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerializer, valSerializer, keyDeserializer, valDeserializer);
+        return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index b44ed21..0ae5150 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 
 /**
@@ -68,17 +67,13 @@ public interface KTable<K, V> {
      * This is equivalent to calling to(topic) and table(topic).
      *
      * @param topic           the topic name
-     * @param keySerializer   key serializer used to send key-value pairs,
-     *                        if not specified the default key serializer defined in the configuration will be used
-     * @param valSerializer   value serializer used to send key-value pairs,
-     *                        if not specified the default value serializer defined in the configuration will be used
-     * @param keyDeserializer key deserializer used to create the new KStream,
-     *                        if not specified the default key deserializer defined in the configuration will be used
-     * @param valDeserializer value deserializer used to create the new KStream,
-     *                        if not specified the default value deserializer defined in the configuration will be used
+     * @param keySerde   key serde used to send key-value pairs,
+     *                        if not specified the default key serde defined in the configuration will be used
+     * @param valSerde   value serde used to send key-value pairs,
+     *                        if not specified the default value serde defined in the configuration will be used
      * @return the new stream that consumes the given topic
      */
-    KTable<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer);
+    KTable<K, V> through(String topic, Serde<K> keySerde, Serde<V> valSerde);
 
     /**
      * Sends key-value to a topic using default serializers specified in the config.
@@ -90,13 +85,13 @@ public interface KTable<K, V> {
     /**
      * Sends key-value to a topic.
      *
-     * @param topic         the topic name
-     * @param keySerializer key serializer used to send key-value pairs,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param valSerializer value serializer used to send key-value pairs,
-     *                      if not specified the default serializer defined in the configs will be used
+     * @param topic    the topic name
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
      */
-    void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
+    void to(String topic, Serde<K> keySerde, Serde<V> valSerde);
 
     /**
      * Creates a new instance of KStream from this KTable
@@ -152,10 +147,8 @@ public interface KTable<K, V> {
     <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
                                    Reducer<V1> removeReducer,
                                    KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                   Serializer<K1> keySerializer,
-                                   Serializer<V1> valueSerializer,
-                                   Deserializer<K1> keyDeserializer,
-                                   Deserializer<V1> valueDeserializer,
+                                   Serde<K1> keySerde,
+                                   Serde<V1> valueSerde,
                                    String name);
 
     /**
@@ -174,12 +167,9 @@ public interface KTable<K, V> {
                                         Aggregator<K1, V1, T> add,
                                         Aggregator<K1, V1, T> remove,
                                         KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
-                                        Serializer<K1> keySerializer,
-                                        Serializer<V1> valueSerializer,
-                                        Serializer<T> aggValueSerializer,
-                                        Deserializer<K1> keyDeserializer,
-                                        Deserializer<V1> valueDeserializer,
-                                        Deserializer<T> aggValueDeserializer,
+                                        Serde<K1> keySerde,
+                                        Serde<V1> valueSerde,
+                                        Serde<T> aggValueSerde,
                                         String name);
 
     /**
@@ -191,10 +181,7 @@ public interface KTable<K, V> {
      * @return the instance of KTable
      */
     <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
-                                Serializer<K1> keySerializer,
-                                Serializer<V> valueSerializer,
-                                Deserializer<K1> keyDeserializer,
-                                Deserializer<V> valueDeserializer,
+                                Serde<K1> keySerde,
+                                Serde<V> valueSerde,
                                 String name);
-
 }


Mime
View raw message