kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-3422: Add overloading functions without serdes in Streams DSL
Date Fri, 18 Mar 2016 19:39:47 GMT
KAFKA-3422: Add overloading functions without serdes in Streams DSL

Also include:

1) remove Streams-specific configs before passing configs to the producer and consumer, to avoid warning messages about unrecognized configs;
2) add a `ConsumerRecord` timestamp extractor and set it as the default extractor.
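
For illustration only (not part of this patch): a minimal sketch of how the serde-less overloads pick up their defaults from the config keys touched below. The class, application, and topic names here are hypothetical; the API calls are the ones exercised in the WordCountDemo changes.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class DefaultSerdeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // default serdes consulted by every DSL overload that omits explicit serdes
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> source = builder.stream("example-input"); // no serdes needed
            source.to("example-output");                                      // default serdes again

            new KafkaStreams(builder, props).start();
        }
    }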

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Michael G. Noll, Ewen Cheslack-Postava

Closes #1093 from guozhangwang/KConfigWarn


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d0cd766
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d0cd766
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d0cd766

Branch: refs/heads/trunk
Commit: 5d0cd7667f7e584f05ab4e76ed139fbafa81e042
Parents: 0d8cbbc
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Mar 18 12:39:41 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Mar 18 12:39:41 2016 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    |  15 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   2 +-
 .../examples/wordcount/WordCountDemo.java       |   9 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  49 ++++--
 .../apache/kafka/streams/kstream/KStream.java   | 165 +++++++++++++++----
 .../apache/kafka/streams/kstream/KTable.java    |  76 +++++++--
 .../streams/kstream/internals/KStreamImpl.java  |  83 ++++++++--
 .../streams/kstream/internals/KTableImpl.java   |  52 ++++--
 .../kstream/internals/KTableStoreSupplier.java  |  11 +-
 .../ConsumerRecordTimestampExtractor.java       |  39 +++++
 .../processor/WallclockTimestampExtractor.java  |  35 ++++
 .../internals/WallclockTimestampExtractor.java  |  28 ----
 .../apache/kafka/streams/state/StateSerdes.java |  22 +--
 .../org/apache/kafka/streams/state/Stores.java  |  18 +-
 .../kafka/streams/state/WindowStoreUtils.java   |  15 +-
 .../internals/InMemoryKeyValueLoggedStore.java  |  26 ++-
 .../InMemoryKeyValueStoreSupplier.java          |  47 +++---
 .../InMemoryLRUCacheStoreSupplier.java          |  16 +-
 .../streams/state/internals/MemoryLRUCache.java |  25 +--
 .../internals/RocksDBKeyValueStoreSupplier.java |  16 +-
 .../streams/state/internals/RocksDBStore.java   |  27 ++-
 .../state/internals/RocksDBWindowStore.java     |  24 ++-
 .../internals/RocksDBWindowStoreSupplier.java   |  16 +-
 .../kstream/internals/KTableImplTest.java       |   4 +-
 .../kstream/internals/KTableMapValuesTest.java  |   2 +-
 .../streams/smoketest/SmokeTestClient.java      |  18 +-
 .../state/internals/RocksDBWindowStoreTest.java |  28 ++--
 27 files changed, 610 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 15083b2..0385bde 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.examples.pageview;
 
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KafkaStreams;
@@ -100,6 +101,8 @@ public class PageViewTypedDemo {
         serdeProps.put("JsonPOJOClass", PageView.class);
         pageViewDeserializer.configure(serdeProps, false);
 
+        final Serde<PageView> pageViewSerde = Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer);
+
         final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>();
         serdeProps.put("JsonPOJOClass", UserProfile.class);
         userProfileSerializer.configure(serdeProps, false);
@@ -108,6 +111,8 @@ public class PageViewTypedDemo {
         serdeProps.put("JsonPOJOClass", UserProfile.class);
         userProfileDeserializer.configure(serdeProps, false);
 
+        final Serde<UserProfile> userProfileSerde = Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer);
+
         final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>();
         serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
         wPageViewByRegionSerializer.configure(serdeProps, false);
@@ -116,6 +121,8 @@ public class PageViewTypedDemo {
         serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class);
         wPageViewByRegionDeserializer.configure(serdeProps, false);
 
+        final Serde<WindowedPageViewByRegion> wPageViewByRegionSerde = Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer);
+
         final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>();
         serdeProps.put("JsonPOJOClass", RegionCount.class);
         regionCountSerializer.configure(serdeProps, false);
@@ -124,9 +131,11 @@ public class PageViewTypedDemo {
         serdeProps.put("JsonPOJOClass", RegionCount.class);
         regionCountDeserializer.configure(serdeProps, false);
 
-        KStream<String, PageView> views = builder.stream(Serdes.String(), Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer), "streams-pageview-input");
+        final Serde<RegionCount> regionCountSerde = Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer);
+
+        KStream<String, PageView> views = builder.stream(Serdes.String(), pageViewSerde, "streams-pageview-input");
 
-        KTable<String, UserProfile> users = builder.table(Serdes.String(), Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer), "streams-userprofile-input");
+        KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde, "streams-userprofile-input");
 
         KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
                 .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
@@ -169,7 +178,7 @@ public class PageViewTypedDemo {
                 });
 
         // write to the result topic
-        regionCount.to("streams-pageviewstats-typed-output", Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer), Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer));
+        regionCount.to(wPageViewByRegionSerde, regionCountSerde, "streams-pageviewstats-typed-output");
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 5b80f64..6f5cdf2 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -116,7 +116,7 @@ public class PageViewUntypedDemo {
                 });
 
         // write to the result topic
-        regionCount.to("streams-pageviewstats-untyped-output", jsonSerde, jsonSerde);
+        regionCount.to(jsonSerde, jsonSerde, "streams-pageviewstats-untyped-output");
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index ebd6050..e892abb 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -48,13 +48,15 @@ public class WordCountDemo {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "streams-file-input");
+        KStream<String, String> source = builder.stream("streams-file-input");
 
         KTable<String, Long> counts = source
                 .flatMapValues(new ValueMapper<String, Iterable<String>>() {
@@ -68,9 +70,10 @@ public class WordCountDemo {
                         return new KeyValue<String, String>(value, value);
                     }
                 })
-                .countByKey(Serdes.String(), "Counts");
+                .countByKey("Counts");
 
-        counts.to("streams-wordcount-output", Serdes.String(), Serdes.Long());
+        // need to override the value serde to the Long type
+        counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
 
         KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 4e989be..d4efbee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -26,10 +26,10 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor;
 
 import java.util.Map;
 
@@ -149,7 +149,7 @@ public class StreamsConfig extends AbstractConfig {
                                         REPLICATION_FACTOR_DOC)
                                 .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                                         Type.CLASS,
-                                        WallclockTimestampExtractor.class.getName(),
+                                        ConsumerRecordTimestampExtractor.class.getName(),
                                         Importance.MEDIUM,
                                         TIMESTAMP_EXTRACTOR_CLASS_DOC)
                                 .define(PARTITION_GROUPER_CLASS_CONFIG,
@@ -233,12 +233,18 @@ public class StreamsConfig extends AbstractConfig {
     public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
         Map<String, Object> props = getBaseConsumerConfigs();
 
+        // add client id with stream client id prefix, and group id
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
-        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
-        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
 
+        // add configs required for stream partition assignor
         props.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
+        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
+
+        if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals(""))
+            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
 
         return props;
     }
@@ -249,6 +255,7 @@ public class StreamsConfig extends AbstractConfig {
         // no need to set group id for a restore consumer
         props.remove(ConsumerConfig.GROUP_ID_CONFIG);
 
+        // add client id with stream client id prefix
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
 
         return props;
@@ -257,39 +264,49 @@ public class StreamsConfig extends AbstractConfig {
     private Map<String, Object> getBaseConsumerConfigs() {
         Map<String, Object> props = this.originals();
 
+        // remove streams properties
+        removeStreamsSpecificConfigs(props);
+
         // set consumer default property values
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
-        // remove properties that are not required for consumers
-        removeStreamsSpecificConfigs(props);
-
         return props;
     }
 
     public Map<String, Object> getProducerConfigs(String clientId) {
         Map<String, Object> props = this.originals();
 
-        // set producer default property values
-        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+        // remove consumer properties that are not required for producers
+        props.remove(StreamsConfig.AUTO_OFFSET_RESET_CONFIG);
 
-        // remove properties that are not required for producers
+        // remove streams properties
         removeStreamsSpecificConfigs(props);
-        props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
 
+        // set producer default property values
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+
+        // add client id with stream client id prefix
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
 
         return props;
     }
 
     private void removeStreamsSpecificConfigs(Map<String, Object> props) {
-        props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
+        props.remove(StreamsConfig.POLL_MS_CONFIG);
         props.remove(StreamsConfig.STATE_DIR_CONFIG);
-        props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-        props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
-        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+        props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
         props.remove(StreamsConfig.KEY_SERDE_CLASS_CONFIG);
         props.remove(StreamsConfig.VALUE_SERDE_CLASS_CONFIG);
-        props.remove(InternalConfig.STREAM_THREAD_INSTANCE);
+        props.remove(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
+        props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG);
+        props.remove(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+        props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        props.remove(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
+        props.remove(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG);
+        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+        props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        props.remove(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
     }
 
     public Serde keySerde() {
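
For illustration only (not part of this patch): the reason for stripping Streams-specific keys is that ProducerConfig/ConsumerConfig do not define them, and AbstractConfig warns about every key it does not recognize (roughly "The configuration 'x' was supplied but isn't a known config"). A hedged sketch of the intended effect, assuming `props` holds a full Streams configuration as in the demos:

    StreamsConfig config = new StreamsConfig(props);
    Map<String, Object> producerProps = config.getProducerConfigs("example-client");
    // Streams-only keys are gone before the map reaches the producer, so no
    // "unknown config" warnings are logged for them.
    assert !producerProps.containsKey(StreamsConfig.STATE_DIR_CONFIG);
    assert !producerProps.containsKey(StreamsConfig.APPLICATION_ID_CONFIG);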

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 1c78652..6f05c3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -98,8 +98,9 @@ public interface KStream<K, V> {
      * Sends key-value to a topic, also creates a new instance of KStream from the topic.
      * This is equivalent to calling to(topic) and from(topic).
      *
-     * @param topic           the topic name
-     * @return the instance of KStream that consumes the given topic
+     * @param topic     the topic name
+     *
+     * @return          the instance of {@link KStream} that consumes the given topic
      */
     KStream<K, V> through(String topic);
 
@@ -107,32 +108,33 @@ public interface KStream<K, V> {
      * Sends key-value to a topic, also creates a new instance of KStream from the topic.
      * This is equivalent to calling to(topic) and from(topic).
      *
-     * @param topic      the topic name
-     * @param keySerde   key serde used to send key-value pairs,
-     *                   if not specified the default key serde defined in the configuration will be used
-     * @param valSerde   value serde used to send key-value pairs,
-     *                   if not specified the default value serde defined in the configuration will be used
-     * @return the instance of KStream that consumes the given topic
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
+     * @param topic     the topic name
+     *
+     * @return          the instance of {@link KStream} that consumes the given topic
      */
-    KStream<K, V> through(String topic, Serde<K> keySerde, Serde<V> valSerde);
+    KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
      * Sends key-value to a topic using default serializers specified in the config.
      *
-     * @param topic         the topic name
+     * @param topic     the topic name
      */
     void to(String topic);
 
     /**
      * Sends key-value to a topic.
      *
-     * @param topic    the topic name
-     * @param keySerde key serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
-     * @param keySerde value serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default serde defined in the configs will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default serde defined in the configs will be used
+     * @param topic     the topic name
      */
-    void to(String topic, Serde<K> keySerde, Serde<V> valSerde);
+    void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
      * Applies a stateful transformation to all elements in this stream.
@@ -184,6 +186,20 @@ public interface KStream<K, V> {
             Serde<V1> otherValueSerde);
 
     /**
+     * Combines values of this stream with another KStream using Windowed Inner Join.
+     *
+     * @param otherStream   the instance of {@link KStream} joined with this stream
+     * @param joiner        the instance of {@link ValueJoiner}
+     * @param windows       the specification of the {@link JoinWindows}
+     * @param <V1>          the value type of the other stream
+     * @param <R>           the value type of the new stream
+     */
+    <V1, R> KStream<K, R> join(
+            KStream<K, V1> otherStream,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindows windows);
+
+    /**
      * Combines values of this stream with another KStream using Windowed Outer Join.
      *
      * @param otherStream the instance of KStream joined with this stream
@@ -207,6 +223,20 @@ public interface KStream<K, V> {
             Serde<V1> otherValueSerde);
 
     /**
+     * Combines values of this stream with another KStream using Windowed Outer Join.
+     *
+     * @param otherStream   the instance of {@link KStream} joined with this stream
+     * @param joiner        the instance of {@link ValueJoiner}
+     * @param windows       the specification of the {@link JoinWindows}
+     * @param <V1>          the value type of the other stream
+     * @param <R>           the value type of the new stream
+     */
+    <V1, R> KStream<K, R> outerJoin(
+            KStream<K, V1> otherStream,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindows windows);
+
+    /**
      * Combines values of this stream with another KStream using Windowed Left Join.
      *
      * @param otherStream the instance of KStream joined with this stream
@@ -227,20 +257,34 @@ public interface KStream<K, V> {
             Serde<V1> otherValueSerde);
 
     /**
+     * Combines values of this stream with another KStream using Windowed Left Join.
+     *
+     * @param otherStream   the instance of {@link KStream} joined with this stream
+     * @param joiner        the instance of {@link ValueJoiner}
+     * @param windows       the specification of the {@link JoinWindows}
+     * @param <V1>          the value type of the other stream
+     * @param <R>           the value type of the new stream
+     */
+    <V1, R> KStream<K, R> leftJoin(
+            KStream<K, V1> otherStream,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindows windows);
+
+    /**
      * Combines values of this stream with KTable using Left Join.
      *
-     * @param ktable the instance of KTable joined with this stream
-     * @param joiner ValueJoiner
-     * @param <V1>   the value type of the table
-     * @param <V2>   the value type of the new stream
+     * @param table     the instance of {@link KTable} joined with this stream
+     * @param joiner    the instance of {@link ValueJoiner}
+     * @param <V1>      the value type of the table
+     * @param <V2>      the value type of the new stream
      */
-    <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> ktable, ValueJoiner<V, V1, V2> joiner);
+    <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner);
 
     /**
      * Aggregate values of this stream by key on a window basis.
      *
-     * @param reducer the class of Reducer
-     * @param windows the specification of the aggregation window
+     * @param reducer the class of {@link Reducer}
+     * @param windows the specification of the aggregation {@link Windows}
      */
     <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
                                                           Windows<W> windows,
@@ -250,6 +294,14 @@ public interface KStream<K, V> {
     /**
      * Aggregate values of this stream by key on a window basis.
      *
+     * @param reducer the class of {@link Reducer}
+     * @param windows the specification of the aggregation {@link Windows}
+     */
+    <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows);
+
+    /**
+     * Aggregate values of this stream by key on a window basis.
+     *
      * @param reducer the class of Reducer
      */
     KTable<K, V> reduceByKey(Reducer<V> reducer,
@@ -260,9 +312,16 @@ public interface KStream<K, V> {
     /**
      * Aggregate values of this stream by key on a window basis.
      *
+     * @param reducer the class of {@link Reducer}
+     */
+    KTable<K, V> reduceByKey(Reducer<V> reducer, String name);
+
+    /**
+     * Aggregate values of this stream by key on a window basis.
+     *
      * @param initializer the class of Initializer
      * @param aggregator the class of Aggregator
-     * @param windows the specification of the aggregation window
+     * @param windows the specification of the aggregation {@link Windows}
      * @param <T>   the value type of the aggregated table
      */
     <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
@@ -272,12 +331,25 @@ public interface KStream<K, V> {
                                                                 Serde<T> aggValueSerde);
 
     /**
+     * Aggregate values of this stream by key on a window basis.
+     *
+     * @param initializer   the class of {@link Initializer}
+     * @param aggregator    the class of {@link Aggregator}
+     * @param windows       the specification of the aggregation {@link Windows}
+     * @param <T>           the value type of the aggregated table
+     */
+    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
+                                                                Aggregator<K, V, T> aggregator,
+                                                                Windows<W> windows);
+
+    /**
      * Aggregate values of this stream by key without a window basis, and hence
      * return an ever updating table
      *
-     * @param initializer the class of Initializer
-     * @param aggregator the class of Aggregator
-     * @param <T>   the value type of the aggregated table
+     * @param initializer   the class of {@link Initializer}
+     * @param aggregator    the class of {@link Aggregator}
+     * @param name          the name of the aggregated table
+     * @param <T>           the value type of the aggregated table
      */
     <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                     Aggregator<K, V, T> aggregator,
@@ -286,17 +358,46 @@ public interface KStream<K, V> {
                                     String name);
 
     /**
+     * Aggregate values of this stream by key without a window basis, and hence
+     * return an ever updating table
+     *
+     * @param initializer   the class of {@link Initializer}
+     * @param aggregator    the class of {@link Aggregator}
+     * @param name          the name of the aggregated table
+     * @param <T>           the value type of the aggregated table
+     */
+    <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
+                                    Aggregator<K, V, T> aggregator,
+                                    String name);
+
+    /**
+     * Count number of messages of this stream by key on a window basis.
+     *
+     * @param windows       the specification of the aggregation {@link Windows}
+     */
+    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);
+
+    /**
      * Count number of messages of this stream by key on a window basis.
      *
-     * @param windows the specification of the aggregation window
+     * @param windows       the specification of the aggregation {@link Windows}
      */
-    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
-                                                            Serde<K> keySerde);
+    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);
 
     /**
      * Count number of messages of this stream by key without a window basis, and hence
     * return an ever-updating counting table.
+     *
+     * @param name          the name of the aggregated table
+     */
+    KTable<K, Long> countByKey(Serde<K> keySerde, String name);
+
+    /**
+     * Count number of messages of this stream by key without a window basis, and hence
+     * return an ever-updating counting table.
+     *
+     * @param name          the name of the aggregated table
      */
-    KTable<K, Long> countByKey(Serde<K> keySerde,
-                               String name);
+    KTable<K, Long> countByKey(String name);
+
 }
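
For illustration only (not part of this patch): with the new overloads a windowed join no longer needs serdes spelled out; keys and values fall back to the configured defaults. A sketch, where `left` and `right` are KStream<String, String> instances and `windows` is a JoinWindows specification built elsewhere (all hypothetical):

    KStream<String, String> joined = left.join(
            right,
            new ValueJoiner<String, String, String>() {
                @Override
                public String apply(String leftValue, String rightValue) {
                    return leftValue + "/" + rightValue; // combine the two joined values
                }
            },
            windows);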

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 0ae5150..997cb4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -66,14 +66,14 @@ public interface KTable<K, V> {
      * Sends key-value to a topic, also creates a new instance of KTable from the topic.
      * This is equivalent to calling to(topic) and table(topic).
      *
-     * @param topic           the topic name
-     * @param keySerde   key serde used to send key-value pairs,
-     *                        if not specified the default key serde defined in the configuration will be used
-     * @param valSerde   value serde used to send key-value pairs,
-     *                        if not specified the default value serde defined in the configuration will be used
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
+     * @param topic     the topic name
      * @return the new stream that consumes the given topic
      */
-    KTable<K, V> through(String topic, Serde<K> keySerde, Serde<V> valSerde);
+    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
      * Sends key-value to a topic using default serializers specified in the config.
@@ -85,13 +85,13 @@ public interface KTable<K, V> {
     /**
      * Sends key-value to a topic.
      *
-     * @param topic    the topic name
-     * @param keySerde key serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
-     * @param valSerde value serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default serde defined in the configs will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default serde defined in the configs will be used
+     * @param topic     the topic name
      */
-    void to(String topic, Serde<K> keySerde, Serde<V> valSerde);
+    void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
      * Creates a new instance of KStream from this KTable
@@ -136,22 +136,38 @@ public interface KTable<K, V> {
     /**
      * Reduce values of this table by the selected key.
      *
-     * @param addReducer the class of Reducer
-     * @param removeReducer the class of Reducer
+     * @param adder the class of Reducer
+     * @param subtractor the class of Reducer
     * @param selector the KeyValue mapper that selects the aggregate key
     * @param name the name of the resulting table
      * @param <K1>   the key type of the aggregated table
      * @param <V1>   the value type of the aggregated table
      * @return the instance of KTable
      */
-    <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
-                                   Reducer<V1> removeReducer,
+    <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
+                                   Reducer<V1> subtractor,
                                    KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                    Serde<K1> keySerde,
                                    Serde<V1> valueSerde,
                                    String name);
 
     /**
+     * Reduce values of this table by the selected key.
+     *
+     * @param adder         the instance of {@link Reducer} for addition
+     * @param subtractor    the instance of {@link Reducer} for subtraction
+     * @param selector      the instance of {@link KeyValueMapper} that selects the aggregate key
+     * @param name          the name of the resulting table
+     * @param <K1>          the key type of the aggregated table
+     * @param <V1>          the value type of the aggregated table
+     * @return              the instance of KTable
+     */
+    <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
+                                   Reducer<V1> subtractor,
+                                   KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                   String name);
+
+    /**
      * Aggregate values of this table by the selected key.
      *
      * @param initializer the class of Initializer
@@ -173,6 +189,24 @@ public interface KTable<K, V> {
                                         String name);
 
     /**
+     * Aggregate values of this table by the selected key.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param adder         the instance of {@link Aggregator} for addition
+     * @param subtractor    the instance of {@link Aggregator} for subtraction
+     * @param selector      the instance of {@link KeyValueMapper} that selects the aggregate key
+     * @param name          the name of the resulting table
+     * @param <K1>          the key type of the aggregated table
+     * @param <V1>          the value type of the aggregated table
+     * @return              the instance of aggregated {@link KTable}
+     */
+    <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
+                                        Aggregator<K1, V1, T> adder,
+                                        Aggregator<K1, V1, T> subtractor,
+                                        KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                        String name);
+
+    /**
      * Count number of records of this table by the selected key.
      *
     * @param selector the KeyValue mapper that selects the aggregate key
@@ -184,4 +218,14 @@ public interface KTable<K, V> {
                                 Serde<K1> keySerde,
                                 Serde<V> valueSerde,
                                 String name);
+
+    /**
+     * Count number of records of this table by the selected key.
+     *
+     * @param selector      the instance of {@link KeyValueMapper} that selects the aggregate key
+     * @param name          the name of the resulting table
+     * @param <K1>          the key type of the aggregated table
+     * @return              the instance of aggregated {@link KTable}
+     */
+    <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name);
 }
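
For illustration only (not part of this patch): a sketch of the new serde-less count overload, where `users` is a KTable<String, UserProfile> and the `region` field is hypothetical:

    KTable<String, Long> regionCounts = users.count(
            new KeyValueMapper<String, UserProfile, String>() {
                @Override
                public String apply(String userId, UserProfile profile) {
                    return profile.region; // derived grouping key
                }
            },
            "RegionCounts");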

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b293496..567b06c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -194,27 +194,25 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KStream<K, V> through(String topic,
-                                 Serde<K> keySerde,
-                                 Serde<V> valSerde) {
-        to(topic, keySerde, valSerde);
+    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+        to(keySerde, valSerde, topic);
 
-        return topology.stream(keySerde, valSerde);
+        return topology.stream(keySerde, valSerde, topic);
     }
 
     @Override
     public KStream<K, V> through(String topic) {
-        return through(topic, null, null);
+        return through(null, null, topic);
     }
 
     @Override
     public void to(String topic) {
-        to(topic, null, null);
+        to(null, null, topic);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void to(String topic, Serde<K> keySerde, Serde<V> valSerde) {
+    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         String name = topology.newName(SINK_NAME);
         StreamPartitioner<K, V> streamPartitioner = null;
 
@@ -270,6 +268,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
+    public <V1, R> KStream<K, R> join(
+            KStream<K, V1> other,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindows windows) {
+
+        return join(other, joiner, windows, null, null, null, false);
+    }
+
+    @Override
     public <V1, R> KStream<K, R> outerJoin(
             KStream<K, V1> other,
             ValueJoiner<V, V1, R> joiner,
@@ -281,6 +288,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true);
     }
 
+    @Override
+    public <V1, R> KStream<K, R> outerJoin(
+            KStream<K, V1> other,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindows windows) {
+
+        return join(other, joiner, windows, null, null, null, true);
+    }
+
     @SuppressWarnings("unchecked")
     private <V1, R> KStream<K, R> join(
             KStream<K, V1> other,
@@ -363,6 +379,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return new KStreamImpl<>(topology, joinThisName, allSourceNodes);
     }
 
+    @Override
+    public <V1, R> KStream<K, R> leftJoin(
+            KStream<K, V1> other,
+            ValueJoiner<V, V1, R> joiner,
+            JoinWindows windows) {
+
+        return leftJoin(other, joiner, windows, null, null);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
@@ -402,6 +427,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
+    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
+                                                                 Windows<W> windows) {
+
+        return reduceByKey(reducer, windows, null, null);
+    }
+
+    @Override
     public KTable<K, V> reduceByKey(Reducer<V> reducer,
                                     Serde<K> keySerde,
                                     Serde<V> aggValueSerde,
@@ -426,6 +458,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
+    public KTable<K, V> reduceByKey(Reducer<V> reducer, String name) {
+
+        return reduceByKey(reducer, null, null, name);
+    }
+
+    @Override
     public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
                                                                        Aggregator<K, V, T> aggregator,
                                                                        Windows<W> windows,
@@ -452,6 +490,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
+    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
+                                                                       Aggregator<K, V, T> aggregator,
+                                                                       Windows<W> windows) {
+
+        return aggregateByKey(initializer, aggregator, windows, null, null);
+    }
+
+    @Override
     public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
                                            Aggregator<K, V, T> aggregator,
                                            Serde<K> keySerde,
@@ -477,6 +523,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
+    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
+                                           Aggregator<K, V, T> aggregator,
+                                           String name) {
+
+        return aggregateByKey(initializer, aggregator, null, null, name);
+    }
+
+    @Override
     public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
                                                                    Serde<K> keySerde) {
         return this.aggregateByKey(
@@ -495,8 +549,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public     KTable<K, Long> countByKey(Serde<K> keySerde,
-                                          String name) {
+    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows) {
+        return countByKey(windows, null);
+    }
+
+    @Override
+    public KTable<K, Long> countByKey(Serde<K> keySerde, String name) {
         return this.aggregateByKey(
                 new Initializer<Long>() {
                     @Override
@@ -511,4 +569,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                     }
                 }, keySerde, Serdes.Long(), name);
     }
+
+    @Override
+    public KTable<K, Long> countByKey(String name) {
+        return countByKey(null, name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 496a476..ca1e659 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -131,27 +131,27 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public KTable<K, V> through(String topic,
-                                Serde<K> keySerde,
-                                Serde<V> valSerde) {
-        to(topic, keySerde, valSerde);
+    public KTable<K, V> through(Serde<K> keySerde,
+                                Serde<V> valSerde,
+                                String topic) {
+        to(keySerde, valSerde, topic);
 
         return topology.table(keySerde, valSerde, topic);
     }
 
     @Override
     public KTable<K, V> through(String topic) {
-        return through(topic, null, null);
+        return through(null, null, topic);
     }
 
     @Override
     public void to(String topic) {
-        to(topic, null, null);
+        to(null, null, topic);
     }
 
     @Override
-    public void to(String topic, Serde<K> keySerde, Serde<V> valSerde) {
-        this.toStream().to(topic, keySerde, valSerde);
+    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+        this.toStream().to(keySerde, valSerde, topic);
     }
 
     @Override
@@ -239,8 +239,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
-                                               Aggregator<K1, V1, T> add,
-                                               Aggregator<K1, V1, T> remove,
+                                               Aggregator<K1, V1, T> adder,
+                                               Aggregator<K1, V1, T> subtractor,
                                                KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                                Serde<K1> keySerde,
                                                Serde<V1> valueSerde,
@@ -259,7 +259,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
 
-        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, add, remove);
+        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
 
         StateStoreSupplier aggregateStore = Stores.create(name)
                 .withKeys(keySerde)
@@ -287,6 +287,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
+    public <K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
+                                               Aggregator<K1, V1, T> adder,
+                                               Aggregator<K1, V1, T> subtractor,
+                                               KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                               String name) {
+
+        return aggregate(initializer, adder, subtractor, selector, null, null, null, name);
+    }
+
+    @Override
     public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector,
                                        Serde<K1> keySerde,
                                        Serde<V> valueSerde,
@@ -318,8 +328,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
-    public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
-                                          Reducer<V1> removeReducer,
+    public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> selector, String name) {
+        return count(selector, null, null, name);
+    }
+
+    @Override
+    public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
+                                          Reducer<V1> subtractor,
                                           KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                           Serde<K1> keySerde,
                                           Serde<V1> valueSerde,
@@ -337,7 +352,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
 
-        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, addReducer, removeReducer);
+        ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
 
         StateStoreSupplier aggregateStore = Stores.create(name)
                 .withKeys(keySerde)
@@ -364,6 +379,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
     }
 
+    @Override
+    public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> adder,
+                                          Reducer<V1> subtractor,
+                                          KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+                                          String name) {
+
+        return reduce(adder, subtractor, selector, null, null, name);
+    }
+
     @SuppressWarnings("unchecked")
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         if (processorSupplier instanceof KTableSource) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
index af3c0d7..ff118da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
 import org.apache.kafka.streams.state.internals.RocksDBStore;
-import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * A KTable storage. It stores all entries in a local RocksDB database.
@@ -34,15 +33,17 @@ import org.apache.kafka.streams.state.StateSerdes;
 public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
     private final Time time;
 
     protected KTableStoreSupplier(String name,
                                   Serde<K> keySerde,
-                                  Serde<V> valSerde,
+                                  Serde<V> valueSerde,
                                   Time time) {
         this.name = name;
-        this.serdes = new StateSerdes<>(name, keySerde, valSerde);
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
         this.time = time;
     }
 
@@ -51,7 +52,7 @@ public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), "rocksdb-state", time);
+        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, keySerde, valueSerde), "rocksdb-state", time);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
new file mode 100644
index 0000000..61b1c98
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
+ *
+ * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into messages they send to Kafka, which can then be retrieved
+ * via this timestamp extractor.
+ *
+ * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provides
+ * <i>event-time</i> semantics.
+ *
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ */
+public class ConsumerRecordTimestampExtractor implements TimestampExtractor {
+    @Override
+    public long extract(ConsumerRecord<Object, Object> record) {
+        return record.timestamp();
+    }
+}
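
For illustration only (not part of this patch): since this extractor is now the default, event-time semantics need no configuration when messages carry CreateTime timestamps; either extractor can still be selected explicitly through the existing config key:

    // processing-time semantics
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              WallclockTimestampExtractor.class.getName());
    // event-time semantics (equivalent to the new default)
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              ConsumerRecordTimestampExtractor.class.getName());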

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
new file mode 100644
index 0000000..81821ce
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Retrieves current wall clock timestamps as {@link System#currentTimeMillis()}.
+ *
+ * Using this extractor effectively provides <i>processing-time</i> semantics.
+ *
+ * If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with
+ * built-in <i>CreateTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
+ */
+public class WallclockTimestampExtractor implements TimestampExtractor {
+    @Override
+    public long extract(ConsumerRecord<Object, Object> record) {
+        return System.currentTimeMillis();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
deleted file mode 100644
index 60b3b96..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-public class WallclockTimestampExtractor implements TimestampExtractor {
-    @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
-        return System.currentTimeMillis();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 1a41a16..9daac98 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -33,7 +33,7 @@ public final class StateSerdes<K, V> {
         return new StateSerdes<>(topic, Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
     }
 
-    private final String topic;
+    private final String stateName;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
 
@@ -43,15 +43,15 @@ public final class StateSerdes<K, V> {
      * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not
      * need to provide the topic name any more.
      *
-     * @param topic the name of the topic
-     * @param keySerde the serde for keys; cannot be null
-     * @param valueSerde the serde for values; cannot be null
+     * @param stateName     the name of the state
+     * @param keySerde      the serde for keys; cannot be null
+     * @param valueSerde    the serde for values; cannot be null
      */
     @SuppressWarnings("unchecked")
-    public StateSerdes(String topic,
+    public StateSerdes(String stateName,
                        Serde<K> keySerde,
                        Serde<V> valueSerde) {
-        this.topic = topic;
+        this.stateName = stateName;
 
         if (keySerde == null)
             throw new IllegalArgumentException("key serde cannot be null");
@@ -87,22 +87,22 @@ public final class StateSerdes<K, V> {
     }
 
     public String topic() {
-        return topic;
+        return stateName;
     }
 
     public K keyFrom(byte[] rawKey) {
-        return keySerde.deserializer().deserialize(topic, rawKey);
+        return keySerde.deserializer().deserialize(stateName, rawKey);
     }
 
     public V valueFrom(byte[] rawValue) {
-        return valueSerde.deserializer().deserialize(topic, rawValue);
+        return valueSerde.deserializer().deserialize(stateName, rawValue);
     }
 
     public byte[] rawKey(K key) {
-        return keySerde.serializer().serialize(topic, key);
+        return keySerde.serializer().serialize(stateName, key);
     }
 
     public byte[] rawValue(V value) {
-        return valueSerde.serializer().serialize(topic, value);
+        return valueSerde.serializer().serialize(stateName, value);
     }
 }
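
A minimal round-trip sketch of the renamed class, using the withBuiltinTypes factory shown above; note that topic() still returns the bound name, which after this change may be a state name rather than a topic:

    import org.apache.kafka.streams.state.StateSerdes;

    StateSerdes<String, Long> serdes =
            StateSerdes.withBuiltinTypes("my-store", String.class, Long.class);
    byte[] rawKey = serdes.rawKey("page-1");   // serialize against the bound state name
    String key = serdes.keyFrom(rawKey);       // deserialize back to "page-1"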

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 33df13f..4e28187 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -44,8 +44,6 @@ public class Stores {
                 return new ValueFactory<K>() {
                     @Override
                     public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) {
-                        final StateSerdes<K, V> serdes =
-                                new StateSerdes<>(name, keySerde, valueSerde);
                         return new KeyValueFactory<K, V>() {
                             @Override
                             public InMemoryKeyValueFactory<K, V> inMemory() {
@@ -62,9 +60,9 @@ public class Stores {
                                     @Override
                                     public StateStoreSupplier build() {
                                         if (capacity < Integer.MAX_VALUE) {
-                                            return new InMemoryLRUCacheStoreSupplier<>(name, capacity, serdes, null);
+                                            return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde);
                                         }
-                                        return new InMemoryKeyValueStoreSupplier<>(name, serdes, null);
+                                        return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde);
                                     }
                                 };
                             }
@@ -88,10 +86,10 @@ public class Stores {
                                     @Override
                                     public StateStoreSupplier build() {
                                         if (numSegments > 0) {
-                                            return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, serdes, null);
+                                            return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
                                         }
 
-                                        return new RocksDBKeyValueStoreSupplier<>(name, serdes, null);
+                                        return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde);
                                     }
                                 };
                             }
@@ -170,8 +168,8 @@ public class Stores {
         /**
          * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
          *
-         * @param keySerde the serialization factory for keys; may not be null
-         * @return the interface used to specify the type of values; never null
+         * @param keySerde  the serialization factory for keys; may be null
+         * @return          the interface used to specify the type of values; never null
          */
         public abstract <K> ValueFactory<K> withKeys(Serde<K> keySerde);
     }
@@ -249,8 +247,8 @@ public class Stores {
         /**
          * Use the specified serializer and deserializer for the values.
          *
-         * @param valueSerde the serialization factory for values; may not be null
-         * @return the interface used to specify the remaining key-value store options; never null
+         * @param valueSerde    the serialization factory for values; may be null
+         * @return              the interface used to specify the remaining key-value store options; never null
          */
         public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde);
     }
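
With the factory now accepting nullable serdes, a hedged sketch of building a store that defers serde resolution to init(); Stores.create(...) and inMemory() follow the factory chain shown above, and the store name "counts" is illustrative:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    StateStoreSupplier supplier = Stores.create("counts")
            .withKeys((Serde<String>) null)    // null: fall back to the configured key serde
            .withValues((Serde<Long>) null)    // null: fall back to the configured value serde
            .inMemory()
            .build();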

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
index 66e1338..fdf3269 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
@@ -19,13 +19,22 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+
 import java.nio.ByteBuffer;
 
 public class WindowStoreUtils {
 
-    public static final int TIMESTAMP_SIZE = 8;
-    public static final int SEQNUM_SIZE = 4;
-    public static final StateSerdes<byte[], byte[]> INNER_SERDES = StateSerdes.withBuiltinTypes("", byte[].class, byte[].class);
+    private static final int SEQNUM_SIZE = 4;
+    private static final int TIMESTAMP_SIZE = 8;
+
+    /** Inner byte array serde used for segments */
+    public static final Serde<byte[]> INNER_SERDE = Serdes.ByteArray();
+
+    /** Inner byte array state serde used for segments */
+    public static final StateSerdes<byte[], byte[]> INNER_SERDES = new StateSerdes<>("", INNER_SERDE, INNER_SERDE);
+
     @SuppressWarnings("unchecked")
     public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 32116dd..efcdac7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -17,8 +17,10 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -29,16 +31,19 @@ import java.util.List;
 public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
 
     private final KeyValueStore<K, V> inner;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
     private final String storeName;
 
+    private StateSerdes<K, V> serdes;
     private StoreChangeLogger<K, V> changeLogger;
     private StoreChangeLogger.ValueGetter<K, V> getter;
 
-    public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, final StateSerdes<K, V> serdes) {
+    public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) {
         this.storeName = storeName;
         this.inner = inner;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
     }
 
     @Override
@@ -47,9 +52,24 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
+        // construct the serde
+        this.serdes = new StateSerdes<>(storeName,
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
         this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
 
+        context.register(root, true, new StateRestoreCallback() {
+            @Override
+            public void restore(byte[] key, byte[] value) {
+
+                // directly call inner functions so that the operation is not logged
+                inner.put(serdes.keyFrom(key), serdes.valueFrom(value));
+            }
+        });
+
         inner.init(context, root);
 
         this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
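
The keySerde == null ? context.keySerde() : keySerde fallback above resolves against the application-wide serdes. A sketch of configuring those defaults, assuming the StreamsConfig.KEY_SERDE_CLASS_CONFIG and VALUE_SERDE_CLASS_CONFIG keys:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());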

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 4054d68..3a5819c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -17,15 +17,14 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.Iterator;
 import java.util.List;
@@ -45,12 +44,18 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
     private final Time time;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
 
-    public InMemoryKeyValueStoreSupplier(String name, StateSerdes<K, V> serdes, Time time) {
+    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(name, keySerde, valueSerde, null);
+    }
+
+    public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
         this.name = name;
         this.time = time;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
     }
 
     public String name() {
@@ -58,28 +63,24 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name).enableLogging(serdes), "in-memory-state", time);
+        return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
     }
 
     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
         private final String name;
+        private final Serde<K> keySerde;
+        private final Serde<V> valueSerde;
         private final NavigableMap<K, V> map;
 
-        private boolean loggingEnabled = false;
-        private StateSerdes<K, V> serdes = null;
-
-        public MemoryStore(String name) {
-            super();
+        public MemoryStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
             this.name = name;
+            this.keySerde = keySerde;
+            this.valueSerde = valueSerde;
             this.map = new TreeMap<>();
         }
 
-        public KeyValueStore<K, V> enableLogging(StateSerdes<K, V> serdes) {
-            this.loggingEnabled = true;
-            this.serdes = serdes;
-
-            return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
+        public KeyValueStore<K, V> enableLogging() {
+            return new InMemoryKeyValueLoggedStore<>(this.name, this, keySerde, valueSerde);
         }
 
         @Override
@@ -88,17 +89,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
+        @SuppressWarnings("unchecked")
         public void init(ProcessorContext context, StateStore root) {
-            if (loggingEnabled) {
-                context.register(root, true, new StateRestoreCallback() {
-
-                    @Override
-                    public void restore(byte[] key, byte[] value) {
-                        put(serdes.keyFrom(key), serdes.valueFrom(value));
-                    }
-                });
-
-            }
+            // do nothing
         }
 
         @Override
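
The two constructors above make the Time parameter optional. A brief sketch of both call sites, with Serdes.String()/Serdes.Long() as stand-in serdes and SystemTime as one possible clock:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.SystemTime;

    // common case: no clock injected (time is null internally)
    new InMemoryKeyValueStoreSupplier<>("counts", Serdes.String(), Serdes.Long());

    // tests can inject a clock through the four-argument overload
    new InMemoryKeyValueStoreSupplier<>("counts", Serdes.String(), Serdes.Long(), new SystemTime());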

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 1c2241f..4a4fa5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
@@ -32,13 +32,19 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
     private final int capacity;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
     private final Time time;
 
-    public InMemoryLRUCacheStoreSupplier(String name, int capacity, StateSerdes<K, V> serdes, Time time) {
+    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(name, capacity, keySerde, valueSerde, null);
+    }
+
+    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
         this.name = name;
         this.capacity = capacity;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
         this.time = time;
     }
 
@@ -49,7 +55,7 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
     @SuppressWarnings("unchecked")
     public StateStore get() {
         final MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<K, V>(name, capacity);
-        final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore) cache.enableLogging(serdes);
+        final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore) cache.enableLogging(keySerde, valueSerde);
         final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
         cache.whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index a5aaa06..a859bd2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -16,13 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -42,9 +41,6 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     protected EldestEntryRemovalListener<K, V> listener;
 
-    private boolean loggingEnabled = false;
-    private StateSerdes<K, V> serdes = null;
-
     // this is used for extended MemoryNavigableLRUCache only
     public MemoryLRUCache() {}
 
@@ -69,11 +65,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
         };
     }
 
-    public KeyValueStore<K, V> enableLogging(StateSerdes<K, V> serdes) {
-        this.loggingEnabled = true;
-        this.serdes = serdes;
-
-        return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
+    public KeyValueStore<K, V> enableLogging(Serde<K> keySerde, Serde<V> valueSerde) {
+        return new InMemoryKeyValueLoggedStore<>(this.name, this, keySerde, valueSerde);
     }
 
     public MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
@@ -88,17 +81,9 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
-        if (loggingEnabled) {
-            context.register(root, true, new StateRestoreCallback() {
-
-                @Override
-                public void restore(byte[] key, byte[] value) {
-                    put(serdes.keyFrom(key), serdes.valueFrom(value));
-                }
-            });
-
-        }
+        // do nothing
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index ec10c3f..af98733 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -17,10 +17,10 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
@@ -33,12 +33,18 @@ import org.apache.kafka.streams.state.StateSerdes;
 public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
     private final Time time;
 
-    public RocksDBKeyValueStoreSupplier(String name, StateSerdes<K, V> serdes, Time time) {
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(name, keySerde, valueSerde, null);
+    }
+
+    public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
         this.name = name;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
         this.time = time;
     }
 
@@ -47,6 +53,6 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes).enableLogging(), "rocksdb-state", time);
+        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, keySerde, valueSerde).enableLogging(), "rocksdb-state", time);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3045856..b206f37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -67,7 +68,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private final WriteOptions wOptions;
     private final FlushOptions fOptions;
 
-    private ProcessorContext context;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+
     private StateSerdes<K, V> serdes;
     protected File dbDir;
     private RocksDB db;
@@ -92,14 +95,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         return this;
     }
 
-    public RocksDBStore(String name, StateSerdes<K, V> serdes) {
-        this(name, DB_FILE_DIR, serdes);
+    public RocksDBStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(name, DB_FILE_DIR, keySerde, valueSerde);
     }
 
-    public RocksDBStore(String name, String parentDir, StateSerdes<K, V> serdes) {
+    public RocksDBStore(String name, String parentDir, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;
         this.parentDir = parentDir;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
 
         // initialize the rocksdb options
         BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
@@ -136,15 +140,20 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
+    @SuppressWarnings("unchecked")
     public void openDB(ProcessorContext context) {
-        this.context = context;
-        this.dbDir = new File(new File(this.context.stateDir(), parentDir), this.name);
+        // construct the serdes here rather than in init(), since openDB() is also
+        // called directly for windowed DB segments, which never go through init()
+        this.serdes = new StateSerdes<>(name,
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+        this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
     }
 
-    @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
-        // first open the DB dir
+        // open the DB dir
         openDB(context);
 
         this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
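
Following the constructor change, a minimal sketch of creating a RocksDB-backed store with explicit serdes; passing null for either serde defers to the context serdes resolved in openDB():

    import org.apache.kafka.common.serialization.Serdes;

    RocksDBStore<String, Long> store =
            new RocksDBStore<>("counts", Serdes.String(), Serdes.Long());
    // init(context, store) later opens the DB and resolves the serdes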

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 61c2e5e..4c6a229 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -19,6 +19,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -51,7 +52,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         public final long id;
 
         Segment(String segmentName, String windowName, long id) {
-            super(segmentName, windowName, WindowStoreUtils.INNER_SERDES);
+            super(segmentName, windowName, WindowStoreUtils.INNER_SERDE, WindowStoreUtils.INNER_SERDE);
             this.id = id;
         }
 
@@ -114,7 +115,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private final long segmentInterval;
     private final boolean retainDuplicates;
     private final Segment[] segments;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
     private final SimpleDateFormat formatter;
     private final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
 
@@ -122,17 +124,20 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private int seqnum = 0;
     private long currentSegmentId = -1L;
 
+    private StateSerdes<K, V> serdes;
+
     private boolean loggingEnabled = false;
     private StoreChangeLogger<byte[], byte[]> changeLogger = null;
 
-    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, StateSerdes<K, V> serdes) {
+    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;
 
         // The segment interval must be greater than MIN_SEGMENT_INTERVAL
         this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
 
         this.segments = new Segment[numSegments];
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
 
         this.retainDuplicates = retainDuplicates;
 
@@ -159,13 +164,18 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
         this.context = context;
 
+        // construct the serde
+        this.serdes = new StateSerdes<>(name,
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
         openExistingSegments();
 
-        this.changeLogger = this.loggingEnabled ?
-                new RawStoreChangeLogger(name, context) : null;
+        this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
 
         // register and possibly restore the state from the logs
         context.register(root, loggingEnabled, new StateRestoreCallback() {
@@ -202,7 +212,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
                 dir.mkdir();
             }
         } catch (Exception ex) {
-
+            // ignore
         }
     }
 

