kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/4] kafka git commit: KAFKA-3336: Unify Serializer and Deserializer into Serialization
Date Thu, 17 Mar 2016 22:42:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f57dabbe5 -> dea0719e9


http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index a5990bd..be851bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -89,14 +90,10 @@ public class StreamPartitionAssignorTest {
     private Properties configProps() {
         return new Properties() {
             {
-                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f2ade6b..33fa5c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.MockSourceNode;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
 import org.junit.Before;
 
@@ -73,15 +74,11 @@ public class StreamTaskTest {
     private StreamsConfig createConfig(final File baseDir) throws Exception {
         return new StreamsConfig(new Properties() {
             {
-                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index b201c07..e387a59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
 
 import java.io.File;
@@ -113,14 +114,10 @@ public class StreamThreadTest {
     private Properties configProps() {
         return new Properties() {
             {
-                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 063eafe..ce4956c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -18,8 +18,6 @@
 package org.apache.kafka.streams.smoketest;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -85,11 +83,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class);
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
@@ -98,9 +92,9 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, Integer> source = builder.stream(stringDeserializer, integerDeserializer, "data");
+        KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
 
-        source.to("echo", stringSerializer, integerSerializer);
+        source.to("echo", stringSerde, intSerde);
 
         KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
             @Override
@@ -125,15 +119,13 @@ public class SmokeTestClient extends SmokeTestUtil {
                     }
                 },
                 UnlimitedWindows.of("uwin-min"),
-                stringSerializer,
-                integerSerializer,
-                stringDeserializer,
-                integerDeserializer
+                stringSerde,
+                intSerde
         ).toStream().map(
                 new Unwindow<String, Integer>()
-        ).to("min", stringSerializer, integerSerializer);
+        ).to("min", stringSerde, intSerde);
 
-        KTable<String, Integer> minTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "min");
+        KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min");
         minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
 
         // max
@@ -150,15 +142,13 @@ public class SmokeTestClient extends SmokeTestUtil {
                     }
                 },
                 UnlimitedWindows.of("uwin-max"),
-                stringSerializer,
-                integerSerializer,
-                stringDeserializer,
-                integerDeserializer
+                stringSerde,
+                intSerde
         ).toStream().map(
                 new Unwindow<String, Integer>()
-        ).to("max", stringSerializer, integerSerializer);
+        ).to("max", stringSerde, intSerde);
 
-        KTable<String, Integer> maxTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "max");
+        KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max");
         maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
 
         // sum
@@ -175,28 +165,25 @@ public class SmokeTestClient extends SmokeTestUtil {
                     }
                 },
                 UnlimitedWindows.of("win-sum"),
-                stringSerializer,
-                longSerializer,
-                stringDeserializer,
-                longDeserializer
+                stringSerde,
+                longSerde
         ).toStream().map(
                 new Unwindow<String, Long>()
-        ).to("sum", stringSerializer, longSerializer);
+        ).to("sum", stringSerde, longSerde);
 
 
-        KTable<String, Long> sumTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "sum");
+        KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum");
         sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
 
         // cnt
         data.countByKey(
                 UnlimitedWindows.of("uwin-cnt"),
-                stringSerializer,
-                stringDeserializer
+                stringSerde
         ).toStream().map(
                 new Unwindow<String, Long>()
-        ).to("cnt", stringSerializer, longSerializer);
+        ).to("cnt", stringSerde, longSerde);
 
-        KTable<String, Long> cntTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "cnt");
+        KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt");
         cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
 
         // dif
@@ -206,7 +193,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return value1 - value2;
                     }
                 }
-        ).to("dif", stringSerializer, integerSerializer);
+        ).to("dif", stringSerde, intSerde);
 
         // avg
         sumTable.join(
@@ -216,13 +203,12 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return (double) value1 / (double) value2;
                     }
                 }
-        ).to("avg", stringSerializer, doubleSerializer);
+        ).to("avg", stringSerde, doubleSerde);
 
         // windowed count
         data.countByKey(
                 TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
-                stringSerializer,
-                stringDeserializer
+                stringSerde
         ).toStream().map(
                 new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
                     @Override
@@ -230,7 +216,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return new KeyValue<>(key.value() + "@" + key.window().start(), value);
                     }
                 }
-        ).to("wcnt", stringSerializer, longSerializer);
+        ).to("wcnt", stringSerde, longSerde);
 
         // test repartition
         Agg agg = new Agg();
@@ -239,14 +225,11 @@ public class SmokeTestClient extends SmokeTestUtil {
                 agg.adder(),
                 agg.remover(),
                 agg.selector(),
-                stringSerializer,
-                longSerializer,
-                longSerializer,
-                stringDeserializer,
-                longDeserializer,
-                longDeserializer,
+                stringSerde,
+                longSerde,
+                longSerde,
                 "cntByCnt"
-        ).to("tagg", stringSerializer, longSerializer);
+        ).to("tagg", stringSerde, longSerde);
 
         return new KafkaStreams(builder, props);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index c0a6f46..1abf88d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -157,7 +157,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
 
             ProducerRecord<byte[], byte[]> record =
-                    new ProducerRecord<>("data", stringSerializer.serialize("", key), integerSerializer.serialize("", value));
+                    new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
 
             producer.send(record);
 
@@ -233,10 +233,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 retryCount = 0;
 
                 for (ConsumerRecord<byte[], byte[]> record : records) {
-                    String key = stringDeserializer.deserialize("", record.key());
+                    String key = stringSerde.deserializer().deserialize("", record.key());
                     switch (record.topic()) {
                         case "echo":
-                            Integer value = integerDeserializer.deserialize("", record.value());
+                            Integer value = intSerde.deserializer().deserialize("", record.value());
                             if (value != null && value == END) {
                                 keys.remove(key);
                                 if (keys.isEmpty()) {
@@ -249,28 +249,28 @@ public class SmokeTestDriver extends SmokeTestUtil {
                             }
                             break;
                         case "min":
-                            min.put(key, integerDeserializer.deserialize("", record.value()));
+                            min.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "max":
-                            max.put(key, integerDeserializer.deserialize("", record.value()));
+                            max.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "dif":
-                            dif.put(key, integerDeserializer.deserialize("", record.value()));
+                            dif.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "sum":
-                            sum.put(key, longDeserializer.deserialize("", record.value()));
+                            sum.put(key, longSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "cnt":
-                            cnt.put(key, longDeserializer.deserialize("", record.value()));
+                            cnt.put(key, longSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "avg":
-                            avg.put(key, doubleDeserializer.deserialize("", record.value()));
+                            avg.put(key, doubleSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "wcnt":
-                            wcnt.put(key, longDeserializer.deserialize("", record.value()));
+                            wcnt.put(key, longSerde.deserializer().deserialize("", record.value()));
                             break;
                         case "tagg":
-                            tagg.put(key, longDeserializer.deserialize("", record.value()));
+                            tagg.put(key, longSerde.deserializer().deserialize("", record.value()));
                             break;
                         default:
                             System.out.println("unknown topic: " + record.topic());

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index 3f5503f..c5ded5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -17,15 +17,8 @@
 
 package org.apache.kafka.streams.smoketest;
 
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -36,7 +29,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.io.File;
-import java.util.Map;
 
 public class SmokeTestUtil {
 
@@ -128,74 +120,13 @@ public class SmokeTestUtil {
         }
     }
 
-    public static Serializer<String> stringSerializer = new StringSerializer();
+    public static Serde<String> stringSerde = Serdes.String();
 
-    public static Deserializer<String> stringDeserializer = new StringDeserializer();
+    public static Serde<Integer> intSerde = Serdes.Integer();
 
-    public static Serializer<Integer> integerSerializer = new IntegerSerializer();
+    public static Serde<Long> longSerde = Serdes.Long();
 
-    public static Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
-
-    public static Serializer<Long> longSerializer = new LongSerializer();
-
-    public static Deserializer<Long> longDeserializer = new LongDeserializer();
-
-    public static Serializer<Double> doubleSerializer = new Serializer<Double>() {
-
-        @Override
-        public void configure(Map<String, ?> configs, boolean isKey) {
-        }
-
-        @Override
-        public byte[] serialize(String topic, Double data) {
-            if (data == null)
-                return null;
-
-            long bits = Double.doubleToLongBits(data);
-            return new byte[] {
-                (byte) (bits >>> 56),
-                (byte) (bits >>> 48),
-                (byte) (bits >>> 40),
-                (byte) (bits >>> 32),
-                (byte) (bits >>> 24),
-                (byte) (bits >>> 16),
-                (byte) (bits >>> 8),
-                (byte) bits
-            };
-        }
-
-        @Override
-        public void close() {
-        }
-    };
-
-    public static Deserializer<Double> doubleDeserializer = new Deserializer<Double>() {
-
-        @Override
-        public void configure(Map<String, ?> configs, boolean isKey) {
-        }
-
-        @Override
-        public Double deserialize(String topic, byte[] data) {
-            if (data == null)
-                return null;
-            if (data.length != 8) {
-                throw new SerializationException("Size of data received by Deserializer is " +
-                        "not 8");
-            }
-
-            long value = 0;
-            for (byte b : data) {
-                value <<= 8;
-                value |= b & 0xFF;
-            }
-            return Double.longBitsToDouble(value);
-        }
-
-        @Override
-        public void close() {
-        }
-    };
+    public static Serde<Double> doubleSerde = Serdes.Double();
 
     public static File createDir(String path) throws Exception {
         File dir = new File(path);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index d8b034f..0468f49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -129,58 +130,6 @@ import java.util.Set;
  */
 public class KeyValueStoreTestDriver<K, V> {
 
-    private static <T> Serializer<T> unusableSerializer() {
-        return new Serializer<T>() {
-            @Override
-            public void configure(Map<String, ?> configs, boolean isKey) {
-            }
-
-            @Override
-            public byte[] serialize(String topic, T data) {
-                throw new UnsupportedOperationException("This serializer should not be used");
-            }
-
-            @Override
-            public void close() {
-            }
-        };
-    };
-
-    private static <T> Deserializer<T> unusableDeserializer() {
-        return new Deserializer<T>() {
-            @Override
-            public void configure(Map<String, ?> configs, boolean isKey) {
-            }
-
-            @Override
-            public T deserialize(String topic, byte[] data) {
-                throw new UnsupportedOperationException("This deserializer should not be used");
-            }
-
-            @Override
-            public void close() {
-            }
-        };
-    };
-
-    /**
-     * Create a driver object that will have a {@link #context()} that records messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides <em>unusable</em> default key and
-     * value serializers and deserializers. This can be used when the actual serializers and deserializers are supplied to the
-     * store during creation, which should eliminate the need for a store to depend on the ProcessorContext's default key and
-     * value serializers and deserializers.
-     *
-     * @return the test driver; never null
-     */
-    public static <K, V> KeyValueStoreTestDriver<K, V> create() {
-        Serializer<K> keySerializer = unusableSerializer();
-        Deserializer<K> keyDeserializer = unusableDeserializer();
-        Serializer<V> valueSerializer = unusableSerializer();
-        Deserializer<V> valueDeserializer = unusableDeserializer();
-        Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
-        return new KeyValueStoreTestDriver<K, V>(serdes);
-    }
-
     /**
      * Create a driver object that will have a {@link #context()} that records messages
      * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
@@ -195,7 +144,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * @return the test driver; never null
      */
     public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
-        Serdes<K, V> serdes = Serdes.withBuiltinTypes("unexpected", keyClass, valueClass);
+        StateSerdes<K, V> serdes = StateSerdes.withBuiltinTypes("unexpected", keyClass, valueClass);
         return new KeyValueStoreTestDriver<K, V>(serdes);
     }
 
@@ -215,7 +164,9 @@ public class KeyValueStoreTestDriver<K, V> {
                                                               Deserializer<K> keyDeserializer,
                                                               Serializer<V> valueSerializer,
                                                               Deserializer<V> valueDeserializer) {
-        Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
+        StateSerdes<K, V> serdes = new StateSerdes<K, V>("unexpected",
+                Serdes.serdeFrom(keySerializer, keyDeserializer),
+                Serdes.serdeFrom(valueSerializer, valueDeserializer));
         return new KeyValueStoreTestDriver<K, V>(serdes);
     }
 
@@ -237,7 +188,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final RecordCollector recordCollector;
     private File stateDir = null;
 
-    protected KeyValueStoreTestDriver(final Serdes<K, V> serdes) {
+    protected KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
         ByteArraySerializer rawSerializer = new ByteArraySerializer();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
@@ -276,13 +227,10 @@ public class KeyValueStoreTestDriver<K, V> {
         Properties props = new Properties();
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass());
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
 
-        this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
-                serdes.valueDeserializer(), recordCollector) {
+        this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector) {
             @Override
             public TaskId taskId() {
                 return new TaskId(0, 1);
@@ -328,7 +276,7 @@ public class KeyValueStoreTestDriver<K, V> {
         }
     }
 
-    private void restoreEntries(StateRestoreCallback func, Serdes<K, V> serdes) {
+    private void restoreEntries(StateRestoreCallback func, StateSerdes<K, V> serdes) {
         for (KeyValue<K, V> entry : restorableEntries) {
             if (entry != null) {
                 byte[] rawKey = serdes.rawKey(entry.key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 46948bd..b44583d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -34,11 +32,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
         StateStoreSupplier supplier;
         if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().build();
         } else {
             supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index a2b79e5..c301223 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -40,11 +38,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
 
         StateStoreSupplier supplier;
         if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().maxEntries(10).build();
+            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).inMemory().maxEntries(10).build();
         } else {
             supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().maxEntries(10).build();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 8e8f69c..280255a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,11 +33,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
         StateStoreSupplier supplier;
         if (useContextSerdes) {
-            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
-            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
-            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
-            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).persistent().build();
+            supplier = Stores.create("my-store").withKeys(context.keySerde()).withValues(context.valueSerde()).persistent().build();
         } else {
             supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).persistent().build();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 5a196ec..ffc97c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -20,15 +20,15 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.WindowStoreUtils;
@@ -51,17 +51,16 @@ import static org.junit.Assert.assertNull;
 
 public class RocksDBWindowStoreTest {
 
-    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
-    private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
+    private final Serde<byte[]> byteArraySerde = Serdes.ByteArray();
     private final String windowName = "window";
     private final int numSegments = 3;
     private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
     private final long retentionPeriod = segmentSize * (numSegments - 1);
     private final long windowSize = 3;
-    private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
+    private final StateSerdes<Integer, String> serdes = StateSerdes.withBuiltinTypes("", Integer.class, String.class);
 
     @SuppressWarnings("unchecked")
-    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, StateSerdes<K, V> serdes) {
         StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, serdes, null);
 
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
@@ -74,7 +73,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -87,7 +86,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -170,7 +169,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -183,7 +182,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -266,7 +265,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -279,7 +278,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -362,7 +361,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -375,7 +374,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -421,7 +420,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -434,7 +433,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -538,7 +537,7 @@ public class RocksDBWindowStoreTest {
 
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -551,7 +550,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -587,7 +586,7 @@ public class RocksDBWindowStoreTest {
 
         File baseDir2 = Files.createTempDirectory("test").toFile();
         try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -600,7 +599,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -642,7 +641,7 @@ public class RocksDBWindowStoreTest {
     public void testSegmentMaintenance() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -652,7 +651,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             WindowStore<Integer, String> store = createWindowStore(context, serdes);
@@ -745,7 +744,7 @@ public class RocksDBWindowStoreTest {
     public void testInitialLoading() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
             RecordCollector recordCollector = new RecordCollector(producer) {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
@@ -755,7 +754,7 @@ public class RocksDBWindowStoreTest {
 
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    byteArraySerde, byteArraySerde,
                     recordCollector);
 
             File storeDir = new File(baseDir, windowName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 5f014ef..9a477df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
@@ -44,7 +44,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
     private final Map<Integer, String> written = new HashMap<>();
 
-    private final ProcessorContext context = new MockProcessorContext(Serdes.withBuiltinTypes(topic, Integer.class, String.class),
+    private final ProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollector(null) {
                 @SuppressWarnings("unchecked")
                 @Override
@@ -61,7 +61,7 @@ public class StoreChangeLoggerTest {
             }
     );
 
-    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, Serdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
+    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
 
     private final StoreChangeLogger<byte[], byte[]> rawChangeLogger = new RawStoreChangeLogger(topic, context, 3, 3);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index edbcb4a..05713c1 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -18,7 +18,8 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -42,20 +43,20 @@ public class KStreamTestDriver {
     private ProcessorNode currNode;
 
     public KStreamTestDriver(KStreamBuilder builder) {
-        this(builder, null, null, null, null, null);
+        this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
     public KStreamTestDriver(KStreamBuilder builder, File stateDir) {
-        this(builder, stateDir, null, null, null, null);
+        this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
     public KStreamTestDriver(KStreamBuilder builder,
                              File stateDir,
-                             Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-                             Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
+                             Serde<?> keySerde,
+                             Serde<?> valSerde) {
         this.topology = builder.build("X", null);
         this.stateDir = stateDir;
-        this.context = new MockProcessorContext(this, stateDir, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
+        this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector());
 
         for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
             StateStore store = stateStoreSupplier.get();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index b463669..e57e1c7 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -18,16 +18,15 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.io.File;
 import java.util.Collections;
@@ -38,10 +37,8 @@ import java.util.Map;
 public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
 
     private final KStreamTestDriver driver;
-    private final Serializer keySerializer;
-    private final Serializer valueSerializer;
-    private final Deserializer keyDeserializer;
-    private final Deserializer valueDeserializer;
+    private final Serde<?> keySerde;
+    private final Serde<?> valSerde;
     private final RecordCollector.Supplier recordCollectorSupplier;
     private final File stateDir;
 
@@ -50,21 +47,15 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     long timestamp = -1L;
 
-    public MockProcessorContext(Serdes<?, ?> serdes, RecordCollector collector) {
-        this(null, null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(), serdes.valueDeserializer(), collector);
-    }
-
-    public MockProcessorContext(Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
-                                RecordCollector collector) {
-        this(null, null, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, collector);
+    public MockProcessorContext(StateSerdes<?, ?> serdes, RecordCollector collector) {
+        this(null, null, serdes.keySerde(), serdes.valueSerde(), collector);
     }
 
     public MockProcessorContext(KStreamTestDriver driver, File stateDir,
-                                Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+                                Serde<?> keySerde,
+                                Serde<?> valSerde,
                                 final RecordCollector collector) {
-        this(driver, stateDir, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
+        this(driver, stateDir, keySerde, valSerde,
                 new RecordCollector.Supplier() {
                     @Override
                     public RecordCollector recordCollector() {
@@ -74,15 +65,13 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     public MockProcessorContext(KStreamTestDriver driver, File stateDir,
-                                Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+                                Serde<?> keySerde,
+                                Serde<?> valSerde,
                                 RecordCollector.Supplier collectorSupplier) {
         this.driver = driver;
         this.stateDir = stateDir;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
-        this.keyDeserializer = keyDeserializer;
-        this.valueDeserializer = valueDeserializer;
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
         this.recordCollectorSupplier = collectorSupplier;
     }
 
@@ -111,23 +100,13 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public Serializer<?> keySerializer() {
-        return keySerializer;
-    }
-
-    @Override
-    public Serializer<?> valueSerializer() {
-        return valueSerializer;
-    }
-
-    @Override
-    public Deserializer<?> keyDeserializer() {
-        return keyDeserializer;
+    public Serde<?> keySerde() {
+        return this.keySerde;
     }
 
     @Override
-    public Deserializer<?> valueDeserializer() {
-        return valueDeserializer;
+    public Serde<?> valueSerde() {
+        return this.valSerde;
     }
 
     @Override


Mime
View raw message