kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-5922: Add SessionWindowedKStream
Date Thu, 21 Sep 2017 08:10:39 GMT
KAFKA-5922: Add SessionWindowedKStream

Add `SessionWindowedKStream` and its implementation. Deprecate the existing `SessionWindows`-based `count`, `reduce`, and `aggregate` methods on `KGroupedStream`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3902 from dguy/kafka-5922
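
For context, a minimal before/after sketch of the API change (the stream `clicks`, the store name, and the five-minute inactivity gap are illustrative assumptions, imports elided):

    // Before this commit: session-windowed count directly on KGroupedStream (now deprecated)
    KTable<Windowed<String>, Long> countsBefore =
        clicks.groupByKey()
              .count(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), "clicks-per-session");

    // After this commit: window first via windowedBy(...), then aggregate
    KTable<Windowed<String>, Long> countsAfter =
        clicks.groupByKey()
              .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)))
              .count();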


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a2da064c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a2da064c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a2da064c

Branch: refs/heads/trunk
Commit: a2da064cbf01558d0af64adc9d6fc9444cd744ec
Parents: b12ba24
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Sep 21 16:10:17 2017 +0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Sep 21 16:10:17 2017 +0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   |  41 ++-
 .../streams/kstream/SessionWindowedKStream.java | 268 ++++++++++++++++++
 .../streams/kstream/TimeWindowedKStream.java    | 276 +++++++++++++++++++
 .../kafka/streams/kstream/WindowedKStream.java  | 276 -------------------
 .../kstream/internals/KGroupedStreamImpl.java   |  49 ++--
 .../internals/SessionWindowedKStreamImpl.java   | 199 +++++++++++++
 .../internals/TimeWindowedKStreamImpl.java      | 179 ++++++++++++
 .../kstream/internals/WindowedKStreamImpl.java  | 179 ------------
 .../KStreamAggregationIntegrationTest.java      |   2 +-
 .../internals/KGroupedStreamImplTest.java       |   1 -
 .../SessionWindowedKStreamImplTest.java         | 264 ++++++++++++++++++
 .../internals/TimeWindowedKStreamImplTest.java  | 241 ++++++++++++++++
 .../internals/WindowedKStreamImplTest.java      | 241 ----------------
 13 files changed, 1498 insertions(+), 718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 5621ab4..1ff1759 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -84,7 +84,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
+     * @deprecated use {@link #count(Materialized)}
      */
+    @Deprecated
     KTable<K, Long> count(final String queryableStoreName);
 
     /**
@@ -143,7 +145,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
+     * @deprecated use {@link #count(Materialized)}
      */
+    @Deprecated
     KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
@@ -222,8 +226,10 @@ public interface KGroupedStream<K, V> {
      * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(Windows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
-     * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * that represent the latest (rolling) count (i.e., number of records) for each key within a window.
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                        final String queryableStoreName);
 
@@ -298,7 +304,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                        final StateStoreSupplier<WindowStore> storeSupplier);
 
@@ -337,7 +345,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName);
 
     /**
@@ -359,7 +369,9 @@ public interface KGroupedStream<K, V> {
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows);
 
     /**
@@ -395,7 +407,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier  user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
                                     final StateStoreSupplier<SessionStore> storeSupplier);
 
@@ -842,7 +856,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final SessionWindows sessionWindows,
                                   final String queryableStoreName);
@@ -875,7 +891,9 @@ public interface KGroupedStream<K, V> {
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final SessionWindows sessionWindows);
 
@@ -939,7 +957,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier     user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final SessionWindows sessionWindows,
                                   final StateStoreSupplier<SessionStore> storeSupplier);
@@ -1433,7 +1453,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                          final Aggregator<? super K, ? super V, T> aggregator,
                                          final Merger<? super K, T> sessionMerger,
@@ -1476,7 +1498,9 @@ public interface KGroupedStream<K, V> {
      * @param <T>           the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                          final Aggregator<? super K, ? super V, T> aggregator,
                                          final Merger<? super K, T> sessionMerger,
@@ -1533,7 +1557,9 @@ public interface KGroupedStream<K, V> {
      * @param <T>           the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(SessionWindows)}
      */
+    @Deprecated
     <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                          final Aggregator<? super K, ? super V, T> aggregator,
                                          final Merger<? super K, T> sessionMerger,
@@ -1542,11 +1568,18 @@ public interface KGroupedStream<K, V> {
                                          final StateStoreSupplier<SessionStore> storeSupplier);
 
     /**
-     * Create a new {@link WindowedKStream} instance that can be used to perform windowed aggregations.
+     * Create a new {@link TimeWindowedKStream} instance that can be used to perform windowed aggregations.
      * @param windows the specification of the aggregation {@link Windows}
      * @param <W>     the window type
-     * @return an instance of {@link WindowedKStream}
+     * @return an instance of {@link TimeWindowedKStream}
+     */
+    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
+
+    /**
+     * Create a new {@link SessionWindowedKStream} instance that can be used to perform session windowed aggregations.
+     * @param windows the specification of the aggregation {@link SessionWindows}
+     * @return an instance of {@link SessionWindowedKStream}
      */
-    <W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W> windows);
+    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows);
 
 }
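
As a rough sketch of the two new overloads above (the KStream `stream`, window sizes, and serdes are illustrative assumptions, imports elided):

    KGroupedStream<String, String> grouped = stream.groupByKey();

    // Fixed-size time windows (hopping/tumbling): returns a TimeWindowedKStream
    TimeWindowedKStream<String, String> timeWindowed =
        grouped.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)));

    // Data-driven session windows: returns a SessionWindowedKStream
    SessionWindowedKStream<String, String> sessionWindowed =
        grouped.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)));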

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
new file mode 100644
index 0000000..d8044ac
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.SessionStore;
+
+/**
+ * {@code SessionWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
+ * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
+ * new (partitioned) windows resulting in a windowed {@link KTable}
+ * (a <i>windowed</i> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
+ * <p>
+ * {@link SessionWindows} are dynamic data driven windows.
+ * They have no fixed time boundaries, rather the size of the window is determined by the records.
+ * Please see {@link SessionWindows} for more details.
+ * <p>
+ * {@link SessionWindows} are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+ *
+ * Furthermore, updates are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * A {@code SessionWindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(SessionWindows)}.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ * @see KStream
+ * @see KGroupedStream
+ * @see SessionWindows
+ */
+public interface SessionWindowedKStream<K, V> {
+
+    /**
+     * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+     * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     */
+    KTable<Windowed<K>, Long> count();
+
+    /**
+     * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the name provided with {@link Materialized}.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting records per session
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> countForKeyForSessions = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+     * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     */
+    KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via
+     * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+     * <p>
+     * The specified {@link Initializer} is applied once per session directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * The specified {@link Merger} is used to merge 2 existing sessions into one, i.e., when the windows overlap,
+     * they are merged into a single session and the old sessions are discarded.
+     * Thus, {@code aggregate(Initializer, Aggregator, Merger)} can be used to compute
+     * aggregate functions like count (c.f. {@link #count()}).
+     * <p>
+     * The default value serde from config will be used for serializing the result.
+     * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Merger, Materialized)}.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * @param initializer    the instance of {@link Initializer}
+     * @param aggregator     the instance of {@link Aggregator}
+     * @param sessionMerger  the instance of {@link Merger}
+     * @param <T>           the value type of the resulting {@link KTable}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                         final Aggregator<? super K, ? super V, T> aggregator,
+                                         final Merger<? super K, T> sessionMerger);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the name provided with {@link Materialized}.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via
+     * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+     * <p>
+     * The specified {@link Initializer} is applied once per session directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * The specified {@link Merger} is used to merge 2 existing sessions into one, i.e., when the windows overlap,
+     * they are merged into a single session and the old sessions are discarded.
+     * Thus, {@code aggregate(Initializer, Aggregator, Merger)} can be used to compute
+     * aggregate functions like count (c.f. {@link #count()}).
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some session-windowed aggregation on value type Long
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * @param initializer    the instance of {@link Initializer}
+     * @param aggregator     the instance of {@link Aggregator}
+     * @param sessionMerger  the instance of {@link Merger}
+     * @param materialized   an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+     * @param <VR>           the value type of the resulting {@link KTable}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                           final Aggregator<? super K, ? super V, VR> aggregator,
+                                           final Merger<? super K, VR> sessionMerger,
+                                           final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Combine values of this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view).
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min,
+     * or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
+
+    /**
+     * Combine values of this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Merger)}).
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating materialized view)
+     * provided by the given {@link Materialized} instance.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // For example, a Reducer<Long> that sums two values:
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like
+     * sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlySessionStore<String, Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
+     * alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * store name as provided with {@link Materialized}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+     * @param materialized     an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                  final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
+}
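
A sketch of a session-windowed aggregation against the new interface (the stream `events`, the running KafkaStreams instance `streams`, and the store name "sessions" are illustrative assumptions, imports elided):

    KTable<Windowed<String>, Long> counts = events
        .groupByKey()
        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)))
        .aggregate(
            () -> 0L,                         // Initializer: start each session at zero
            (key, value, agg) -> agg + 1,     // Aggregator: count records within the session
            (key, agg1, agg2) -> agg1 + agg2, // Merger: combine two sessions when they overlap
            Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessions")
                        .withValueSerde(Serdes.Long()));

    // Interactive query against the materialized session store
    ReadOnlySessionStore<String, Long> store =
        streams.store("sessions", QueryableStoreTypes.<String, Long>sessionStore());
    KeyValueIterator<Windowed<String>, Long> sessionsForKey = store.fetch("some-key");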

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
new file mode 100644
index 0000000..433f4e7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.WindowStore;
+
+/**
+ * {@code TimeWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
+ * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
+ * new (partitioned) windows resulting in a windowed {@link KTable}
+ * (a <i>windowed</i> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
+ * <p>
+ * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+ * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+ * <p>
+ * A {@code TimeWindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)}.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ * @see KStream
+ * @see KGroupedStream
+ */
+public interface TimeWindowedKStream<K, V> {
+
+    /**
+     * Count the number of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} key or value are ignored.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<Windowed<K>, Long> count();
+
+    /**
+     * Count the number of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} key or value are ignored.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view).
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count()}).
+     * <p>
+     * The default value serde from config will be used for serializing the result.
+     * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                           final Aggregator<? super K, ? super V, VR> aggregator);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the store name as provided with {@link Materialized}.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count()}).
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                           final Aggregator<? super K, ? super V, VR> aggregator,
+                                           final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view).
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
+
+    /**
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the store name as provided with {@link Materialized}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     *
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                  final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
+}
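
And a corresponding sketch for the time-windowed case (the stream `words`, the store name "word-counts", and the query range are illustrative assumptions, imports elided):

    KTable<Windowed<String>, Long> wordCounts = words
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("word-counts"));

    // Query one key over a time range on the local window store
    ReadOnlyWindowStore<String, Long> store =
        streams.store("word-counts", QueryableStoreTypes.<String, Long>windowStore());
    long toTime = System.currentTimeMillis();
    long fromTime = toTime - TimeUnit.HOURS.toMillis(1);
    WindowStoreIterator<Long> countsForWord = store.fetch("some-word", fromTime, toTime);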

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
deleted file mode 100644
index 35e0eeb..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreType;
-import org.apache.kafka.streams.state.WindowStore;
-
-/**
- * {@code WindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
- * It is an intermediate representation of a {@link KStream} in order to apply a windowed aggregation operation on the original
- * {@link KStream} records.
- * <p>
- * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
- * new (partitioned) windows resulting in a windowed {@link KTable}
- * (a <emph>windowed</emph> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}.
- * <p>
- * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
- * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
- * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
- * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
- * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
- * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
- * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
-
- * A {@code WindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)} .
- *
- * @param <K> Type of keys
- * @param <V> Type of values
- * @see KStream
- * @see KGroupedStream
- */
-public interface WindowedKStream<K, V> {
-
-    /**
-     * Count the number of records in this stream by the grouped key and the defined windows.
-     * Records with {@code null} key or value are ignored.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
-     * the same window and key.
-     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
-     * <p>
-     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
-     * user-specified in {@link StreamsConfig StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
-     * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queriable through Interactive Queries.
-     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
-     * represent the latest (rolling) count (i.e., number of records) for each key
-     */
-    KTable<Windowed<K>, Long> count();
-
-    /**
-     * Count the number of records in this stream by the grouped key and the defined windows.
-     * Records with {@code null} key or value are ignored.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
-     * the same window and key if caching is enabled on the {@link Materialized} instance.
-     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}
-     *
-     * <p>
-     * To query the local windowed {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ... // counting words
-     * Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
-     *
-     * String key = "some-word";
-     * long fromTime = ...;
-     * long toTime = ...;
-     * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
-     * query the value of the key on a parallel running instance of your Kafka Streams application.
-     *
-     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
-     * represent the latest (rolling) count (i.e., number of records) for each key
-     */
-    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized);
-
-    /**
-     * Aggregate the values of records in this stream by the grouped key and the defined windows.
-     * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
-     * allows the result to have a different type than the input values.
-     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that uses an internally generated store name.
-     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
-     * <p>
-     * The specified {@link Initializer} is applied once directly before the first input record is processed to
-     * provide an initial intermediate aggregation result that is used to process the first record.
-     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
-     * aggregate (or for the very first record using the intermediate aggregation result provided via the
-     * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count()}).
-     * <p>
-     * The default value serde from config will be used for serializing the result.
-     * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
-     * the same key.
-     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
-     * <p>
-     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
-     * and "-changelog" is a fixed suffix.
-     * Note that the internal store name may not be queryable through Interactive Queries.
-     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     *
-     * @param <VR>          the value type of the resulting {@link KTable}
-     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
-     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
-     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
-     * latest (rolling) aggregate for each key
-     */
-    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                           final Aggregator<? super K, ? super V, VR> aggregator);
-
-    /**
-     * Aggregate the values of records in this stream by the grouped key and the defined windows.
-     * Records with {@code null} key or value are ignored.
-     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
-     * allows the result to have a different type than the input values.
-     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the store name as provided with {@link Materialized}.
-     * <p>
-     * The specified {@link Initializer} is applied once directly before the first input record is processed to
-     * provide an initial intermediate aggregation result that is used to process the first record.
-     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
-     * aggregate (or for the very first record using the intermediate aggregation result provided via the
-     * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
-     * count (c.f. {@link #count()}).
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
-     * the same window and key if caching is enabled on the {@link Materialized} instance.
-     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
-     * <p>
-     * To query the local windowed {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ... // counting words
-     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
-     *
-     * String key = "some-word";
-     * long fromTime = ...;
-     * long toTime = ...;
-     * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     *
-     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
-     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
-     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
-     * @param <VR>          the value type of the resulting {@link KTable}
-     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
-     * latest (rolling) aggregate for each key
-     */
-    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
-                                           final Aggregator<? super K, ? super V, VR> aggregator,
-                                           final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
-
-    /**
-     * Combine the values of records in this stream by the grouped key and the defined windows.
-     * Records with {@code null} key or value are ignored.
-     * Combining implies that the type of the aggregate result is the same as the type of the input value.
-     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that uses an internally generated store name.
-     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
-     * <p>
-     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
-     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
-     * value as-is.
-     * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min, or max.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
-     * the same key.
-     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
-     * <p>
-     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
-     * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
-     * and "-changelog" is a fixed suffix.
-     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     *
-     * @param reducer   a {@link Reducer} that computes a new aggregate result
-     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
-     * latest (rolling) aggregate for each key
-     */
-    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
-
-    /**
-     * Combine the values of records in this stream by the grouped key and the defined windows.
-     * Records with {@code null} key or value are ignored.
-     * Combining implies that the type of the aggregate result is the same as the type of the input value.
-     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the store name as provided with {@link Materialized}.
-     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
-     * <p>
-     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
-     * aggregate and the record's value.
-     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
-     * value as-is.
-     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or max.
-     * <p>
-     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate consecutive updates to
-     * the same window and key if caching is enabled on the {@link Materialized} instance.
-     * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
-     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
-     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
-     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
-     * <p>
-     * To query the local windowed {@link KeyValueStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
-     * <pre>{@code
-     * KafkaStreams streams = ... // counting words
-     * String queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
-     *
-     * String key = "some-word";
-     * long fromTime = ...;
-     * long toTime = ...;
-     * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
-     * }</pre>
-     *
-     * @param reducer   a {@link Reducer} that computes a new aggregate result
-     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
-     * latest (rolling) aggregate for each key
-     */
-    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
-                                  final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
-}
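For reference, the Interactive Queries pattern described in the Javadoc above looks as follows end-to-end against the renamed TimeWindowedKStream API. This is a minimal sketch, not part of the commit: the "words" topic, the "windowed-word-counts" store name, and the localhost broker address are assumptions made for illustration.

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowedWordCountSketch {

    public static void main(final String[] args) {
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-word-count");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // the two parameters the Javadoc names as governing how often cached updates propagate downstream
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("words")
               .groupByKey()
               .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
               .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-word-counts")
                       .withKeySerde(Serdes.String())
                       .withValueSerde(Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        // Interactive Queries lookup as sketched in the Javadoc; the store only becomes
        // queryable once the instance has transitioned to RUNNING.
        final ReadOnlyWindowStore<String, Long> localWindowStore =
                streams.store("windowed-word-counts", QueryableStoreTypes.<String, Long>windowStore());
        final long toTime = System.currentTimeMillis();
        final long fromTime = toTime - TimeUnit.HOURS.toMillis(1);
        try (final WindowStoreIterator<Long> counts = localWindowStore.fetch("some-word", fromTime, toTime)) {
            while (counts.hasNext()) {
                System.out.println(counts.next()); // KeyValue<windowStartTimestamp, count>
            }
        }
    }
}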

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index ba037f5..4943314 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -26,10 +26,11 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedKStream;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -315,7 +316,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                                 final Merger<? super K, T> sessionMerger,
                                                 final SessionWindows sessionWindows,
                                                 final Serde<T> aggValueSerde) {
-        return aggregate(initializer, aggregator, sessionMerger, sessionWindows, aggValueSerde, (String) null);
+        return windowedBy(sessionWindows).aggregate(initializer,
+                                                    aggregator,
+                                                    sessionMerger,
+                                                    Materialized.<K, T, SessionStore<Bytes, byte[]>>as(builder.newStoreName(AGGREGATE_NAME))
+                                                            .withKeySerde(keySerde)
+                                                            .withValueSerde(aggValueSerde));
     }
 
     @SuppressWarnings("unchecked")
@@ -340,26 +346,37 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
     @Override
-    public <W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W> windows) {
-        return new WindowedKStreamImpl<>(windows,
-                                         builder,
-                                         sourceNodes,
-                                         name,
-                                         keySerde,
-                                         valSerde,
-                                         repartitionRequired);
+    public <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows) {
+        return new TimeWindowedKStreamImpl<>(windows,
+                                             builder,
+                                             sourceNodes,
+                                             name,
+                                             keySerde,
+                                             valSerde,
+                                             repartitionRequired);
+    }
+
+    @Override
+    public SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
+        return new SessionWindowedKStreamImpl<>(windows,
+                                                builder,
+                                                sourceNodes,
+                                                name,
+                                                keySerde,
+                                                valSerde,
+                                                aggregateBuilder);
     }
 
     @SuppressWarnings("unchecked")
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
-        determineIsQueryable(queryableStoreName);
-        return count(sessionWindows,
-                     storeFactory(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME))
-                             .sessionWindowed(sessionWindows.maintainMs()).build());
+        Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, Long, SessionStore<Bytes, byte[]>>as(getOrCreateName(queryableStoreName, AGGREGATE_NAME))
+                .withKeySerde(keySerde)
+                .withValueSerde(Serdes.Long());
+        return windowedBy(sessionWindows).count(materialized);
     }
 
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows) {
-        return count(sessionWindows, (String) null);
+        return windowedBy(sessionWindows).count();
     }
 
     @Override
@@ -399,7 +416,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows) {
 
-        return reduce(reducer, sessionWindows, (String) null);
+        return windowedBy(sessionWindows).reduce(reducer);
     }
 
     @Override
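The KGroupedStreamImpl changes above reroute the existing session-window overloads through the new windowedBy(SessionWindows) path, so the old and new spellings build the same topology. A minimal before/after sketch, assuming a hypothetical grouped word stream and a "session-counts" store name:

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;

public final class SessionCountMigration {

    // old style: session-window overload directly on KGroupedStream (now delegates internally)
    static KTable<Windowed<String>, Long> before(final KGroupedStream<String, String> grouped) {
        return grouped.count(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), "session-counts");
    }

    // new style: via the SessionWindowedKStream added in this commit
    static KTable<Windowed<String>, Long> after(final KGroupedStream<String, String> grouped) {
        return grouped.windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)))
                      .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-counts")
                              .withKeySerde(Serdes.String())
                              .withValueSerde(Serdes.Long()));
    }

    private SessionCountMigration() { }
}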

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2da064c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
new file mode 100644
index 0000000..0603853
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.SessionWindowedKStream;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+
+public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implements SessionWindowedKStream<K, V> {
+    private final SessionWindows windows;
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+    private final Merger<K, Long> countMerger = new Merger<K, Long>() {
+        @Override
+        public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) {
+            return aggOne + aggTwo;
+        }
+    };
+    private final Initializer<V> reduceInitializer = new Initializer<V>() {
+        @Override
+        public V apply() {
+            return null;
+        }
+    };
+
+
+    SessionWindowedKStreamImpl(final SessionWindows windows,
+                               final InternalStreamsBuilder builder,
+                               final Set<String> sourceNodes,
+                               final String name,
+                               final Serde<K> keySerde,
+                               final Serde<V> valSerde,
+                               final GroupedStreamAggregateBuilder<K, V> aggregateBuilder) {
+        super(builder, name, sourceNodes);
+        this.windows = windows;
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
+        this.aggregateBuilder = aggregateBuilder;
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return doAggregate(aggregateBuilder.countInitializer,
+                           aggregateBuilder.countAggregator,
+                           countMerger,
+                           Serdes.Long());
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return aggregate(aggregateBuilder.countInitializer,
+                         aggregateBuilder.countAggregator,
+                         countMerger,
+                         materialized);
+    }
+
+    @Override
+    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                                final Aggregator<? super K, ? super V, T> aggregator,
+                                                final Merger<? super K, T> sessionMerger) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
+        return doAggregate(initializer, aggregator, sessionMerger, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                  final Aggregator<? super K, ? super V, VR> aggregator,
+                                                  final Merger<? super K, VR> sessionMerger,
+                                                  final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(
+                new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator, sessionMerger),
+                AGGREGATE_NAME,
+                materialize(materializedInternal),
+                true);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        return doAggregate(reduceInitializer, aggregatorForReducer(reducer), mergerForAggregator(aggregatorForReducer(reducer)), valSerde);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer);
+        return aggregate(reduceInitializer, reduceAggregator, mergerForAggregator(reduceAggregator), materialized);
+    }
+
+
+    private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materialized) {
+        SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            supplier = Stores.persistentSessionStore(materialized.storeName(),
+                                                     windows.maintainMs());
+        }
+        final StoreBuilder<SessionStore<K, VR>> builder = Stores.sessionStoreBuilder(supplier,
+                                                                                     materialized.keySerde(),
+                                                                                     materialized.valueSerde());
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        if (materialized.cachingEnabled()) {
+            builder.withCachingEnabled();
+        }
+        return builder;
+    }
+
+    private Merger<K, V> mergerForAggregator(final Aggregator<K, V, V> aggregator) {
+        return new Merger<K, V>() {
+            @Override
+            public V apply(final K aggKey, final V aggOne, final V aggTwo) {
+                return aggregator.apply(aggKey, aggTwo, aggOne);
+            }
+        };
+    }
+
+    private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) {
+        return new Aggregator<K, V, V>() {
+            @Override
+            public V apply(final K aggKey, final V value, final V aggregate) {
+                if (aggregate == null) {
+                    return value;
+                }
+                return reducer.apply(aggregate, value);
+            }
+        };
+    }
+
+    private <VR> StoreBuilder<SessionStore<K, VR>> storeBuilder(final String storeName, final Serde<VR> aggValueSerde) {
+        return Stores.sessionStoreBuilder(
+                Stores.persistentSessionStore(
+                        storeName,
+                        windows.maintainMs()),
+                keySerde,
+                aggValueSerde).withCachingEnabled();
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
+                                                     final Aggregator<? super K, ? super V, VR> aggregator,
+                                                     final Merger<? super K, VR> merger,
+                                                     final Serde<VR> serde) {
+        final String storeName = builder.newStoreName(AGGREGATE_NAME);
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamSessionWindowAggregate<>(windows, storeName, initializer, aggregator, merger),
+                                                                AGGREGATE_NAME,
+                                                                storeBuilder(storeName, serde),
+                                                                false);
+    }
+}
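Note how SessionWindowedKStreamImpl implements reduce(): the Reducer is adapted into an Aggregator whose Initializer yields null (so the first value passes through unchanged), and session merging reuses that same Aggregator with the argument order flipped. The standalone sketch below, using hypothetical names and plain Long values, traces that adapter logic outside of Streams:

import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;

public final class ReducerAdapterSketch {

    public static void main(final String[] args) {
        final Reducer<Long> sum = new Reducer<Long>() {
            @Override
            public Long apply(final Long v1, final Long v2) {
                return v1 + v2;
            }
        };

        // mirrors aggregatorForReducer: the first value passes through, later values are reduced in
        final Aggregator<String, Long, Long> asAggregator = new Aggregator<String, Long, Long>() {
            @Override
            public Long apply(final String key, final Long value, final Long aggregate) {
                return aggregate == null ? value : sum.apply(aggregate, value);
            }
        };

        // mirrors mergerForAggregator: merging two sessions is just one more aggregation step
        final Merger<String, Long> asMerger = new Merger<String, Long>() {
            @Override
            public Long apply(final String key, final Long aggOne, final Long aggTwo) {
                return asAggregator.apply(key, aggTwo, aggOne);
            }
        };

        // reduceInitializer returns null, so the first record's value becomes the aggregate
        Long aggregate = asAggregator.apply("k", 1L, null);     // -> 1
        aggregate = asAggregator.apply("k", 2L, aggregate);     // -> 3
        System.out.println(asMerger.apply("k", aggregate, 4L)); // -> 7
    }
}

Treating one session aggregate as the "value" and the other as the running "aggregate" is what lets a single reduce step serve both the per-record and the session-merge roles without a separate Merger from the caller.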

