kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [5/5] kafka git commit: KAFKA-3452: Support session windows
Date Fri, 06 Jan 2017 18:12:38 GMT
KAFKA-3452: Support session windows

Add support for SessionWindows based on design detailed in https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows.
This includes refactoring of the RocksDBWindowStore such that functionality common with the RocksDBSessionStore isn't duplicated.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska <eno.thereska@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #2166 from dguy/kafka-3452-session-merge


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e0de3a42
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e0de3a42
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e0de3a42

Branch: refs/heads/trunk
Commit: e0de3a4211a3701c98230b115fadfb67b655e3cf
Parents: 00964ec
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Jan 6 10:12:30 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jan 6 10:12:30 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   | 267 +++++++++++++
 .../apache/kafka/streams/kstream/Merger.java    |  40 ++
 .../kafka/streams/kstream/SessionWindows.java   | 105 ++++++
 .../apache/kafka/streams/kstream/Window.java    |   7 +
 .../apache/kafka/streams/kstream/Windows.java   |   2 +-
 .../kafka/streams/kstream/internals/Change.java |  16 +
 .../kstream/internals/KGroupedStreamImpl.java   | 153 +++++++-
 .../kstream/internals/KStreamAggregate.java     |   4 +-
 .../kstream/internals/KStreamReduce.java        |   4 +-
 .../KStreamSessionWindowAggregate.java          | 165 ++++++++
 .../internals/KStreamWindowAggregate.java       |   6 +-
 .../kstream/internals/KStreamWindowReduce.java  |   6 +-
 .../kstream/internals/KTableAggregate.java      |   4 +-
 .../streams/kstream/internals/KTableReduce.java |   4 +-
 .../streams/kstream/internals/KTableSource.java |   4 +-
 .../kstream/internals/SessionKeySerde.java      | 149 ++++++++
 .../kstream/internals/TupleForwarder.java       |  21 +-
 .../streams/processor/TopologyBuilder.java      |   6 +-
 .../kafka/streams/state/KeyValueIterator.java   |   6 +
 .../streams/state/QueryableStoreTypes.java      |  20 +
 .../streams/state/ReadOnlySessionStore.java     |  40 ++
 .../kafka/streams/state/SessionStore.java       |  47 +++
 .../org/apache/kafka/streams/state/Stores.java  |  23 +-
 .../state/internals/CachingKeyValueStore.java   |   7 +-
 .../state/internals/CachingSessionStore.java    | 242 ++++++++++++
 .../state/internals/CachingWindowStore.java     |  28 +-
 .../ChangeLoggingSegmentedBytesStore.java       |  95 +++++
 .../CompositeReadOnlyKeyValueStore.java         |   9 +-
 .../CompositeReadOnlySessionStore.java          |  89 +++++
 .../DelegatingPeekingKeyValueIterator.java      |  20 +-
 .../DelegatingPeekingWindowIterator.java        |  73 ----
 .../state/internals/HasNextCondition.java       |  24 ++
 .../InMemoryKeyValueStoreSupplier.java          |   9 +-
 .../streams/state/internals/LRUCacheEntry.java  |   2 +-
 .../internals/MemoryNavigableLRUCache.java      |   7 +-
 .../MergedSortedCacheKeyValueStoreIterator.java |  93 +++--
 .../MergedSortedCachedWindowStoreIterator.java  |  22 +-
 .../state/internals/MeteredKeyValueStore.java   |   5 +
 .../internals/MeteredSegmentedBytesStore.java   | 177 +++++++++
 .../streams/state/internals/NamedCache.java     |   8 +-
 .../internals/PeekingKeyValueIterator.java      |   3 +-
 .../state/internals/PeekingWindowIterator.java  |  25 --
 .../internals/RocksDBSegmentedBytesStore.java   | 134 +++++++
 .../state/internals/RocksDBSessionStore.java    | 143 +++++++
 .../internals/RocksDBSessionStoreSupplier.java  |  68 ++++
 .../streams/state/internals/RocksDBStore.java   |  17 +-
 .../state/internals/RocksDBWindowStore.java     | 374 +++----------------
 .../internals/RocksDBWindowStoreSupplier.java   |  19 +-
 .../kafka/streams/state/internals/Segment.java  |  42 +++
 .../state/internals/SegmentIterator.java        |  94 +++++
 .../state/internals/SegmentedBytesStore.java    | 122 ++++++
 .../kafka/streams/state/internals/Segments.java | 179 +++++++++
 .../state/internals/SessionKeySchema.java       |  73 ++++
 .../streams/state/internals/ThreadCache.java    |  15 +-
 .../state/internals/WindowStoreKeySchema.java   |  59 +++
 .../state/internals/WindowStoreSupplier.java    |  24 ++
 .../integration/JoinIntegrationTest.java        |   2 +-
 .../KStreamAggregationIntegrationTest.java      | 203 ++++++++++
 .../internals/KGroupedStreamImplTest.java       | 229 +++++++++++-
 ...reamSessionWindowAggregateProcessorTest.java | 286 ++++++++++++++
 .../kstream/internals/SessionKeySerdeTest.java  |  87 +++++
 .../internals/CachingSessionStoreTest.java      | 204 ++++++++++
 .../state/internals/CachingWindowStoreTest.java |  18 +-
 .../ChangeLoggingSegmentedBytesStoreTest.java   | 114 ++++++
 .../CompositeReadOnlySessionStoreTest.java      | 121 ++++++
 .../DelegatingPeekingKeyValueIteratorTest.java  |  11 +-
 .../DelegatingPeekingWindowIteratorTest.java    |  92 -----
 .../state/internals/InMemoryKeyValueStore.java  |   9 +-
 ...gedSortedCacheKeyValueStoreIteratorTest.java |  36 +-
 ...ergedSortedCacheWindowStoreIteratorTest.java |  44 +--
 .../MeteredSegmentedBytesStoreTest.java         | 122 ++++++
 .../streams/state/internals/NamedCacheTest.java |  17 +
 .../RocksDBSegmentedBytesStoreTest.java         | 171 +++++++++
 .../internals/RocksDBSessionStoreTest.java      | 156 ++++++++
 .../state/internals/RocksDBWindowStoreTest.java |  91 ++---
 .../state/internals/SegmentIteratorTest.java    | 148 ++++++++
 .../streams/state/internals/SegmentsTest.java   | 193 ++++++++++
 .../state/internals/ThreadCacheTest.java        |  17 +-
 .../apache/kafka/test/KeyValueIteratorStub.java |  56 +++
 .../apache/kafka/test/MockProcessorContext.java |   4 +
 .../kafka/test/ReadOnlySessionStoreStub.java    |  89 +++++
 .../kafka/test/SegmentedBytesStoreStub.java     |  95 +++++
 82 files changed, 5257 insertions(+), 758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 33a2791..fc2881a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.SessionStore;
 
 /**
  * {@link KGroupedStream} is an abstraction of a <i>grouped</i> record stream of key-value pairs.
@@ -179,6 +180,69 @@ public interface KGroupedStream<K, V> {
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                        final StateStoreSupplier<WindowStore> storeSupplier);
 
+
+    /**
+     * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} value are ignored.
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the provided {@code storeName}.
+     * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String storeName = storeSupplier.name();
+     * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-word";
+     * KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     *
+     * @param sessionWindows the specification of the aggregation {@link SessionWindows}
+     * @param storeName      the name of the state store created from this operation.
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
+     */
+    KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName);
+
+    /**
+     * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} value are ignored.
+     * The result is written into a local {@link SessionStore} provided by the given {@code storeSupplier}.
+     * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String storeName = storeSupplier.name();
+     * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-word";
+     * KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     *
+     * @param sessionWindows the specification of the aggregation {@link SessionWindows}
+     * @param storeSupplier  user defined state store supplier {@link StateStoreSupplier}
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
+     */
+    KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
+                                    final StateStoreSupplier<SessionStore> storeSupplier);
+
     /**
      * Combine the values of records in this stream by the grouped key.
      * Records with {@code null} value are ignored.
@@ -360,6 +424,107 @@ public interface KGroupedStream<K, V> {
                                                      final Windows<W> windows,
                                                      final StateStoreSupplier<WindowStore> storeSupplier);
 
+    /**
+     * Combine values of this stream by the grouped key into {@link SessionWindows}.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)}).
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the provided {@code storeName}.
+     * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the cache size.
+     * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> sumForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * @param reducer           the instance of {@link Reducer}
+     * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
+     * @param storeName         the name of the state store created from this operation
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                  final SessionWindows sessionWindows,
+                                  final String storeName);
+
+    /**
+     * Combine values of this stream by the grouped key into {@link SessionWindows}.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)}).
+     * The result is written into a local {@link SessionStore} provided by the given {@code storeSupplier}.
+     * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the cache size.
+     * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> sumForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * @param reducer           the instance of {@link Reducer}
+     * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
+     * @param storeSupplier     user defined state store supplier {@link StateStoreSupplier}
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                  final SessionWindows sessionWindows,
+                                  final StateStoreSupplier<SessionStore> storeSupplier);
+
 
     /**
      * Aggregate the values of records in this stream by the grouped key.
@@ -569,4 +734,106 @@ public interface KGroupedStream<K, V> {
                                                              final Windows<W> windows,
                                                              final StateStoreSupplier<WindowStore> storeSupplier);
 
+    /**
+     * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+     * Records with {@code null} value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, SessionWindows, String) combining via
+     * reduce(...)} as it allows the result to have a different type than the input values.
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the provided {@code storeName}.
+     * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * The specified {@link Initializer} is applied once per session directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute aggregate
+     * functions like count (c.f. {@link #count(String)})
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some windowed aggregation on value type double
+     * String storeName = storeSupplier.name();
+     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param initializer    the instance of {@link Initializer}
+     * @param aggregator     the instance of {@link Aggregator}
+     * @param sessionMerger  the instance of {@link Merger}
+     * @param sessionWindows the specification of the aggregation {@link SessionWindows}
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <T>           the value type of the resulting {@link KTable}
+     * @param storeName     the name of the state store created from this operation
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values with type {@code T}
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                         final Aggregator<? super K, ? super V, T> aggregator,
+                                         final Merger<? super K, T> sessionMerger,
+                                         final SessionWindows sessionWindows,
+                                         final Serde<T> aggValueSerde,
+                                         final String storeName);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+     * Records with {@code null} value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, SessionWindows, String) combining via
+     * reduce(...)} as it allows the result to have a different type than the input values.
+     * The result is written into a local {@link SessionStore} provided by the given {@code storeSupplier}.
+     * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * The specified {@link Initializer} is applied once per session directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute aggregate
+     * functions like count (c.f. {@link #count(String)})
+     * <p>
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some windowed aggregation on value type double
+     * String storeName = storeSupplier.name();
+     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     *
+     * @param initializer    the instance of {@link Initializer}
+     * @param aggregator     the instance of {@link Aggregator}
+     * @param sessionMerger  the instance of {@link Merger}
+     * @param sessionWindows the specification of the aggregation {@link SessionWindows}
+     * @param aggValueSerde  aggregate value serdes for materializing the aggregated table,
+     *                       if not specified the default serdes defined in the configs will be used
+     * @param storeSupplier  user defined state store supplier {@link StateStoreSupplier}
+     * @param <T>           the value type of the resulting {@link KTable}
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values with type {@code T}
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                         final Aggregator<? super K, ? super V, T> aggregator,
+                                         final Merger<? super K, T> sessionMerger,
+                                         final SessionWindows sessionWindows,
+                                         final Serde<T> aggValueSerde,
+                                         final StateStoreSupplier<SessionStore> storeSupplier);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
new file mode 100644
index 0000000..fcdb20f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Merger.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The interface for merging aggregate values for {@link SessionWindows} with the given key.
+ *
+ * @param <K>   key type
+ * @param <V>   aggregate value type
+ */
+@InterfaceStability.Unstable
+public interface Merger<K, V> {
+
+    /**
+     * Compute a new aggregate from the key and two aggregates
+     *
+     * @param aggKey    the key of the record
+     * @param aggOne    the first aggregate
+     * @param aggTwo    the second aggregate
+     * @return          the new aggregate value
+     */
+    V apply(K aggKey, V aggOne, V aggTwo);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
new file mode 100644
index 0000000..f9a399a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * A session based window specification used for aggregating events into sessions.
+ * <p>
+ * Sessions represent a period of activity separated by a defined gap of inactivity.
+ * Any events processed that fall within the inactivity gap of any existing sessions
+ * are merged into the existing sessions. If the event falls outside of the session gap
+ * then a new session will be created.
+ * <p>
+ * For example, If we have a session gap of 5 and the following data arrives:
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     1       |     10     |
+ * +-----------+-------------+------------+
+ * |    A      |     2       |     12     |
+ * +-----------+-------------+------------+
+ * |    A      |     3       |     20     |
+ * +-----------+-------------+------------+
+ * </pre>
+ * <p>
+ * We'd have 2 sessions for key A. 1 starting from time 10 and ending at time 12 and another
+ * starting and ending at time 20. The length of the session is driven by the timestamps of
+ * the data within the session
+ * <p>
+ * If we then received another record:
+ * <p>
+ * <pre>
+ * +--------------------------------------+
+ * |    key    |    value    |    time    |
+ * +-----------+-------------+------------+
+ * |    A      |     4       |     16     |
+ * +-----------+-------------+------------+
+ * </pre>
+ * <p>
+ * The previous 2 sessions would be merged into a single session with start time 10 and end time 20.
+ * The aggregate value for this session would be the result of aggregating all 4 values.
+ */
+@InterfaceStability.Unstable
+public class SessionWindows {
+
+    private final long gapMs;
+    private long maintainDurationMs;
+
+    private SessionWindows(final long gapMs, final long maintainDurationMs) {
+        this.gapMs = gapMs;
+        this.maintainDurationMs = maintainDurationMs;
+    }
+
+    /**
+     * Create a new SessionWindows with the specified inactivity gap
+     * @param inactivityGapMs  the gap of inactivity between sessions
+     * @return a new SessionWindows with the provided inactivity gap
+     * and default maintain duration
+     */
+    public static SessionWindows with(final long inactivityGapMs) {
+        return new SessionWindows(inactivityGapMs, Windows.DEFAULT_MAINTAIN_DURATION);
+    }
+
+    /**
+     * Set the window maintain duration in milliseconds of streams time.
+     * This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
+     *
+     * @return  itself
+     */
+    public SessionWindows until(final long durationMs) {
+        this.maintainDurationMs = durationMs;
+        return this;
+    }
+
+    /**
+     * @return the inactivityGap
+     */
+    public long inactivityGap() {
+        return gapMs;
+    }
+
+    /**
+     * @return the minimum amount of time a window will be maintained for.
+     */
+    public long maintainMs() {
+        return maintainDurationMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index e1ea9a0..7d78d74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -80,4 +80,11 @@ public abstract class Window {
         return (int) (n % 0xFFFFFFFFL);
     }
 
+    @Override
+    public String toString() {
+        return "Window{" +
+                "start=" + start +
+                ", end=" + end +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index dbc52a8..ebd92fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -28,7 +28,7 @@ public abstract class Windows<W extends Window> {
 
     private static final int DEFAULT_NUM_SEGMENTS = 3;
 
-    private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // one day
+    static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // one day
 
     private long maintainDurationMs;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
index d7b868e..0946b6b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Objects;
+
 public class Change<T> {
 
     public final T newValue;
@@ -31,4 +33,18 @@ public class Change<T> {
     public String toString() {
         return "(" + newValue + "<-" + oldValue + ")";
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final Change<?> change = (Change<?>) o;
+        return Objects.equals(newValue, change.newValue) &&
+                Objects.equals(oldValue, change.oldValue);
+    }
+
    @Override
    public int hashCode() {
        // Consistent with equals(): hash over both the new and the old value.
        return Objects.hash(newValue, oldValue);
    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index e50b6dc..a4fc793 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.  You may obtain a
  * copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -16,24 +16,27 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.kstream.Initializer;
-import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.SessionStore;
 
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 
-public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
+class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
 
     private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
     private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
@@ -42,12 +45,12 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
     private final Serde<V> valSerde;
     private final boolean repartitionRequired;
 
-    public KGroupedStreamImpl(final KStreamBuilder topology,
-                              final String name,
-                              final Set<String> sourceNodes,
-                              final Serde<K> keySerde,
-                              final Serde<V> valSerde,
-                              final boolean repartitionRequired) {
+    KGroupedStreamImpl(final KStreamBuilder topology,
+                       final String name,
+                       final Set<String> sourceNodes,
+                       final Serde<K> keySerde,
+                       final Serde<V> valSerde,
+                       final boolean repartitionRequired) {
         super(topology, name, sourceNodes);
         this.keySerde = keySerde;
         this.valSerde = valSerde;
@@ -188,6 +191,130 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
                 storeSupplier);
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                                final Aggregator<? super K, ? super V, T> aggregator,
+                                                final Merger<? super K, T> sessionMerger,
+                                                final SessionWindows sessionWindows,
+                                                final Serde<T> aggValueSerde,
+                                                final String storeName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        return aggregate(initializer,
+                         aggregator,
+                         sessionMerger,
+                         sessionWindows,
+                         aggValueSerde,
+                         storeFactory(keySerde, aggValueSerde, storeName)
+                          .sessionWindowed(sessionWindows.maintainMs()).build());
+
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                                final Aggregator<? super K, ? super V, T> aggregator,
+                                                final Merger<? super K, T> sessionMerger,
+                                                final SessionWindows sessionWindows,
+                                                final Serde<T> aggValueSerde,
+                                                final StateStoreSupplier<SessionStore> storeSupplier) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
+        Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+
+        return (KTable<Windowed<K>, T>) doAggregate(
+                new KStreamSessionWindowAggregate<>(sessionWindows, storeSupplier.name(), initializer, aggregator, sessionMerger),
+                AGGREGATE_NAME,
+                storeSupplier);
+
+    }
+
+    @SuppressWarnings("unchecked")
+    public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        return count(sessionWindows,
+                     storeFactory(keySerde, Serdes.Long(), storeName)
+                             .sessionWindowed(sessionWindows.maintainMs()).build());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
+                                           final StateStoreSupplier<SessionStore> storeSupplier) {
+        Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        final Initializer<Long> initializer = new Initializer<Long>() {
+            @Override
+            public Long apply() {
+                return 0L;
+            }
+        };
+        final Aggregator<K, V, Long> aggregator = new Aggregator<K, V, Long>() {
+            @Override
+            public Long apply(final K aggKey, final V value, final Long aggregate) {
+                return aggregate + 1;
+            }
+        };
+        final Merger<K, Long> sessionMerger = new Merger<K, Long>() {
+            @Override
+            public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) {
+                return aggOne + aggTwo;
+            }
+        };
+
+        return aggregate(initializer, aggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier);
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                         final SessionWindows sessionWindows,
+                                         final String storeName) {
+
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        return reduce(reducer, sessionWindows,
+                      storeFactory(keySerde, valSerde, storeName)
+                              .sessionWindowed(sessionWindows.maintainMs()).build());
+    }
+
    @Override
    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                         final SessionWindows sessionWindows,
                                         final StateStoreSupplier<SessionStore> storeSupplier) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");

        // A reduce is an aggregate whose accumulator type equals the value type.
        // The initializer returns null so the very first value of a session is taken
        // as-is (see the null check in the aggregator below) rather than reduced.
        final Initializer<V> initializer = new Initializer<V>() {
            @Override
            public V apply() {
                return null;
            }
        };

        final Aggregator<K, V, V> aggregator = new Aggregator<K, V, V>() {
            @Override
            public V apply(final K aggKey, final V value, final V aggregate) {
                // First value for this session: nothing to reduce against yet.
                if (aggregate == null) {
                    return value;
                }
                return reducer.apply(aggregate, value);
            }
        };

        // Merging two sessions reuses the aggregator: aggTwo plays the role of the
        // incoming "value" and aggOne the existing aggregate, so a null aggOne (the
        // initializer's value) correctly yields aggTwo unchanged.
        final Merger<K, V> sessionMerger = new Merger<K, V>() {
            @Override
            public V apply(final K aggKey, final V aggOne, final V aggTwo) {
                return aggregator.apply(aggKey, aggTwo, aggOne);
            }
        };

        return aggregate(initializer, aggregator, sessionMerger, sessionWindows, valSerde, storeSupplier);
    }
 
     private <T> KTable<K, T> doAggregate(
             final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 5bbda1f..fdc14df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -59,7 +59,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
         public void init(ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
 
 
@@ -82,7 +82,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
+            tupleForwarder.maybeForward(key, newAgg, oldAgg);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 9af4368..fa26a7e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -56,7 +56,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
             super.init(context);
 
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
 
 
@@ -79,7 +79,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
             }
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
+            tupleForwarder.maybeForward(key, newAgg, oldAgg);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
new file mode 100644
index 0000000..1af1f48
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * Processor supplier implementing session-window aggregation. For each incoming record
 * it finds the existing sessions for the key whose windows fall within the record's
 * timestamp +/- the inactivity gap, merges them (both window bounds and aggregate
 * values) with the new record into a single session, removes the merged-away sessions
 * from the store, and writes back the combined session.
 */
class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {

    private final String storeName;
    private final SessionWindows windows;
    private final Initializer<T> initializer;
    private final Aggregator<? super K, ? super V, T> aggregator;
    private final Merger<? super K, T> sessionMerger;

    // When true, the old aggregate is forwarded downstream alongside the new one.
    private boolean sendOldValues = false;

    KStreamSessionWindowAggregate(final SessionWindows windows,
                                  final String storeName,
                                  final Initializer<T> initializer,
                                  final Aggregator<? super K, ? super V, T> aggregator,
                                  final Merger<? super K, T> sessionMerger) {
        this.windows = windows;
        this.storeName = storeName;
        this.initializer = initializer;
        this.aggregator = aggregator;
        this.sessionMerger = sessionMerger;
    }

    @Override
    public Processor<K, V> get() {
        return new KStreamSessionWindowAggregateProcessor();
    }

    @Override
    public void enableSendingOldValues() {
        sendOldValues = true;
    }

    private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor<K, V> {

        private SessionStore<K, T> store;
        private TupleForwarder<Windowed<K>, T> tupleForwarder;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            store = (SessionStore<K,  T>) context.getStateStore(storeName);
            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
        }

        @Override
        public void process(final K key, final V value) {
            // if the key is null, we do not need to proceed aggregating
            // the record with the table
            if (key == null) {
                return;
            }

            final long timestamp = context().timestamp();
            // Sessions overlapping [timestamp - gap, timestamp + gap] must be merged
            // with this record into a single session; start from a point window at the
            // record timestamp and an initializer-provided empty aggregate.
            final List<KeyValue<Windowed<K>, T>> merged = new ArrayList<>();
            final TimeWindow newTimeWindow = new TimeWindow(timestamp, timestamp);
            TimeWindow mergedWindow = newTimeWindow;
            T agg = initializer.apply();

            try (final KeyValueIterator<Windowed<K>, T> iterator = store.findSessionsToMerge(key, timestamp - windows.inactivityGap(),
                                                                                               timestamp + windows.inactivityGap())) {
                while (iterator.hasNext()) {
                    final KeyValue<Windowed<K>, T> next = iterator.next();
                    merged.add(next);
                    agg = sessionMerger.apply(key, agg, next.value);
                    mergedWindow = mergeTimeWindow(mergedWindow, (TimeWindow) next.key.window());
                }
            }

            agg = aggregator.apply(key, value, agg);
            final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
            // If any existing sessions were folded in (window grew beyond the point
            // window), retract them: remove from the store and forward a null new-value
            // so downstream sees the deletion before the merged session is emitted.
            if (!mergedWindow.equals(newTimeWindow)) {
                for (final KeyValue<Windowed<K>, T> session : merged) {
                    store.remove(session.key);
                    tupleForwarder.maybeForward(session.key, null, session.value);
                }
            }
            store.put(sessionKey, agg);
            tupleForwarder.maybeForward(sessionKey, agg, null);
        }

    }


    // Union of two time windows: earliest start, latest end.
    private TimeWindow mergeTimeWindow(final TimeWindow one, final TimeWindow two) {
        final long start = one.start() < two.start() ? one.start() : two.start();
        final long end = one.end() > two.end() ? one.end() : two.end();
        return new TimeWindow(start, end);
    }

    @Override
    public KTableValueGetterSupplier<Windowed<K>, T> view() {
        return new KTableValueGetterSupplier<Windowed<K>, T>() {
            @Override
            public KTableValueGetter<Windowed<K>, T> get() {
                return new KTableSessionWindowValueGetter();
            }

            @Override
            public String[] storeNames() {
                return new String[] {storeName};
            }
        };
    }

    private class KTableSessionWindowValueGetter implements KTableValueGetter<Windowed<K>, T> {
        private SessionStore<K, T> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            store = (SessionStore<K, T>) context.getStateStore(storeName);
        }

        @Override
        public T get(final Windowed<K> key) {
            // Query with start == end == key.window().end(); presumably this pins down
            // exactly the session for this window key — verify against the
            // SessionStore.findSessionsToMerge contract.
            try (KeyValueIterator<Windowed<K>, T> iter = store.findSessionsToMerge(key.key(), key.window().end(), key.window().end())) {
                if (!iter.hasNext()) {
                    return null;
                }
                final T value = iter.next().value;
                // A second match means the store holds overlapping sessions for this
                // key — an invariant violation, so fail loudly.
                if (iter.hasNext()) {
                    throw new ProcessorStateException(String.format("Iterator for key [%s] on session store has more than one value", key));
                }
                return value;
            }
        }
    }

}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index d74a399..25fdda9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -68,7 +68,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
             super.init(context);
 
             windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
         }
 
         @Override
@@ -110,7 +110,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
                         // update the store with the new value
                         windowStore.put(key, newAgg, window.start());
-                        tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg, sendOldValues);
+                        tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg);
                         matchedWindows.remove(entry.key);
                     }
                 }
@@ -121,7 +121,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
                 T oldAgg = initializer.apply();
                 T newAgg = aggregator.apply(key, value, oldAgg);
                 windowStore.put(key, newAgg, windowStartMs);
-                tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), newAgg, oldAgg, sendOldValues);
+                tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), newAgg, oldAgg);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 5ee02e9..3eca401 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -64,7 +64,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
         public void init(ProcessorContext context) {
             super.init(context);
             windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
         }
 
         @Override
@@ -108,7 +108,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
                         // update the store with the new value
                         windowStore.put(key, newAgg, window.start());
-                        tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg, sendOldValues);
+                        tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg);
                         matchedWindows.remove(entry.key);
                     }
                 }
@@ -117,7 +117,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
             // create the new window for the rest of unmatched window that do not exist yet
             for (long windowStartMs : matchedWindows.keySet()) {
                 windowStore.put(key, value, windowStartMs);
-                tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), value, null, false);
+                tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), value, null);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index fd04fb3..a414200 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -61,7 +61,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
         public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
 
         /**
@@ -92,7 +92,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
+            tupleForwarder.maybeForward(key, newAgg, oldAgg);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 7b29d1c..30b314a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -58,7 +58,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
         public void init(ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<K, V>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<K, V>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
 
         /**
@@ -89,7 +89,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg, sendOldValues);
+            tupleForwarder.maybeForward(key, newAgg, oldAgg);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 7b777d1..d391151 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -53,7 +53,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
         public void init(ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues));
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
 
         @Override
@@ -63,7 +63,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
                 throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null.");
             V oldValue = store.get(key);
             store.put(key, value);
-            tupleForwarder.maybeForward(key, value, oldValue, sendOldValues);
+            tupleForwarder.maybeForward(key, value, oldValue);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
new file mode 100644
index 0000000..165d5c6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Serde for a {@link Windowed} key used with {@link org.apache.kafka.streams.kstream.SessionWindows}.
+ * Binary layout: the serialized key bytes, followed by the window end and start timestamps as 8-byte longs.
+ * @param <K> sessionId type
+ */
+public class SessionKeySerde<K> implements Serde<Windowed<K>> {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final String SESSIONKEY = "sessionkey";
+
+    private final Serde<K> keySerde;
+
+    public SessionKeySerde(final Serde<K> keySerde) {
+        this.keySerde = keySerde;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Serializer<Windowed<K>> serializer() {
+        return new SessionKeySerializer(keySerde.serializer());
+    }
+
+    @Override
+    public Deserializer<Windowed<K>> deserializer() {
+        return new SessionKeyDeserializer(keySerde.deserializer());
+    }
+
+    private class SessionKeySerializer implements Serializer<Windowed<K>> {
+
+        private final Serializer<K> keySerializer;
+
+        SessionKeySerializer(final Serializer<K> keySerializer) {
+            this.keySerializer = keySerializer;
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final Windowed<K> data) {
+            if (data == null) {
+                return null;
+            }
+            return toBinary(data, keySerializer).get();
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+    private class SessionKeyDeserializer implements Deserializer<Windowed<K>> {
+        private final Deserializer<K> deserializer;
+
+        SessionKeyDeserializer(final Deserializer<K> deserializer) {
+            this.deserializer = deserializer;
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+        }
+
+        @Override
+        public Windowed<K> deserialize(final String topic, final byte[] data) {
+            if (data == null || data.length == 0) {
+                return null;
+            }
+            return from(data, deserializer);
+        }
+
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+
+    public static long extractEnd(final byte [] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
+    }
+
+    public static long extractStart(final byte [] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
+    }
+
+    public static byte[] extractKeyBytes(final byte[] binaryKey) {
+        final byte[] bytes = new byte[binaryKey.length - 2 * TIMESTAMP_SIZE];
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+        return bytes;
+    }
+
+    public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer) {
+        final K key = extractKey(binaryKey, keyDeserializer);
+        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
+        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
+        final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
+        return new Windowed<>(key, new TimeWindow(start, end));
+    }
+
+    private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer) {
+        return deserializer.deserialize(SESSIONKEY, extractKeyBytes(binaryKey));
+    }
+
+    public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer) {
+        final byte[] bytes = serializer.serialize(SESSIONKEY, sessionKey.key());
+        ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
+        buf.put(bytes);
+        buf.putLong(sessionKey.window().end());
+        buf.putLong(sessionKey.window().start());
+        return new Bytes(buf.array());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index 02609d7..4834acf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -21,16 +21,26 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
 
-
+/**
+ * This class is used to determine if a processor should forward values to child nodes.
+ * Forwarding only occurs when caching is not enabled.
+ *
+ * @param <K> the type of the forwarded key
+ * @param <V> the type of the forwarded value
+ */
 class TupleForwarder<K, V> {
     private final boolean cached;
     private final ProcessorContext context;
+    private final boolean sendOldValues;
+
     @SuppressWarnings("unchecked")
-    public TupleForwarder(final StateStore store,
-                          final ProcessorContext context,
-                          final ForwardingCacheFlushListener flushListener) {
+    TupleForwarder(final StateStore store,
+                   final ProcessorContext context,
+                   final ForwardingCacheFlushListener flushListener,
+                   final boolean sendOldValues) {
         this.cached = store instanceof CachedStateStore;
         this.context = context;
+        this.sendOldValues = sendOldValues;
         if (this.cached) {
             ((CachedStateStore) store).setFlushListener(flushListener);
         }
@@ -38,8 +48,7 @@ class TupleForwarder<K, V> {
 
     public void maybeForward(final K key,
                              final V newValue,
-                             final V oldValue,
-                             final boolean sendOldValues) {
+                             final V oldValue) {
         if (!cached) {
             if (sendOldValues) {
                 context.forward(key, new Change<>(newValue, oldValue));

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 74fea9c..ca6b7b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.internals.QuickUnion;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -912,11 +912,11 @@ public class TopologyBuilder {
     }
 
     private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier supplier, final String name) {
-        if (!(supplier instanceof RocksDBWindowStoreSupplier)) {
+        if (!(supplier instanceof WindowStoreSupplier)) {
             return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
         }
 
-        final RocksDBWindowStoreSupplier windowStoreSupplier = (RocksDBWindowStoreSupplier) supplier;
+        final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) supplier;
         final InternalTopicConfig config = new InternalTopicConfig(name,
                                                                    Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
                                                                                InternalTopicConfig.CleanupPolicy.delete),

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index ddbc7b3..f9bab2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -37,4 +37,10 @@ public interface KeyValueIterator<K, V> extends Iterator<KeyValue<K, V>>, Closea
 
     @Override
     void close();
+
+    /**
+     * Peek at the next key without advancing the iterator
+     * @return the key of the record that would be returned by the next call to {@code next()}
+     */
+    K peekNextKey();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index d57fe35..a783e07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -16,6 +16,7 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStore;
 import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
 
@@ -46,6 +47,16 @@ public class QueryableStoreTypes {
         return new WindowStoreType<>();
     }
 
+    /**
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}
+     * @param <K>   key type of the store
+     * @param <V>   value type of the store
+     * @return  {@link SessionStoreType}
+     */
+    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore() {
+        return new SessionStoreType<>();
+    }
+
     private static abstract class QueryableStoreTypeMatcher<T> implements QueryableStoreType<T> {
 
         private final Class matchTo;
@@ -85,6 +96,15 @@ public class QueryableStoreTypes {
                                                 final String storeName) {
             return new CompositeReadOnlyWindowStore<>(storeProvider, this, storeName);
         }
+    }
 
+    private static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
+        SessionStoreType() {
+            super(ReadOnlySessionStore.class);
+        }
+        @Override
+        public ReadOnlySessionStore<K, V> create(final StateStoreProvider storeProvider, final String storeName) {
+            return new CompositeReadOnlySessionStore<>(storeProvider, this, storeName);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
new file mode 100644
index 0000000..f09d357
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.streams.kstream.Windowed;
+
+/**
+ * A session store that only supports read operations.
+ * Implementations should be thread-safe as concurrent reads and writes
+ * are expected.
+ *
+ * @param <K> the key type
+ * @param <AGG> the aggregated value type
+ */
+@InterfaceStability.Unstable
+public interface ReadOnlySessionStore<K, AGG> {
+
+    /**
+     * Retrieve all aggregated sessions for the provided key
+     * @param    key record key to find aggregated session values for
+     * @return   KeyValueIterator containing all sessions for the provided key.
+     */
+    KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
new file mode 100644
index 0000000..39658a3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * Interface for storing the aggregated values of sessions
+ * @param <K>   type of the record keys
+ * @param <AGG> type of the aggregated values
+ */
+public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {
+
+    /**
+     * Fetch any sessions with the matching key where the session's end time is &ge; earliestSessionEndTime
+     * and the session's start time is &le; latestSessionStartTime
+     */
+    KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, long earliestSessionEndTime, final long latestSessionStartTime);
+
+    /**
+     * Remove the session aggregated with the provided {@link Windowed} key from the store
+     * @param sessionKey key of the session to remove
+     */
+    void remove(final Windowed<K> sessionKey);
+
+    /**
+     * Write the aggregated value for the provided key to the store
+     * @param sessionKey key of the session to write
+     * @param aggregate  the aggregated value for the session
+     */
+    void put(final Windowed<K> sessionKey, final AGG aggregate);
+}


Mime
View raw message