kafka-commits mailing list archives

From guozh...@apache.org
Subject [5/5] kafka git commit: KAFKA-5045: Clarify on KTable APIs for queryable stores
Date Wed, 03 May 2017 23:16:02 GMT
KAFKA-5045: Clarify on KTable APIs for queryable stores

This is the implementation of KIP-114: KTable state stores and improved semantics:
- Allow for decoupling between querying and materialisation
- consistent APIs, overloads with queryableName and without
- deprecated several KTable calls
- new unit and integration tests
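
For the overloads that take a queryable store name, the Javadoc in this commit states two rules: the name may contain only ASCII alphanumerics, '.', '_' and '-', and the changelog topic is named "${applicationId}-${queryableStoreName}-changelog". The sketch below models just those two rules in plain Java. The class StoreNames and its methods are hypothetical helpers for illustration, not part of Kafka Streams, and the check covers only the character rule, not every constraint Kafka places on topic names.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Streams: models the naming rules
// described in the Javadoc of this commit.
public final class StoreNames {
    // Allowed characters per the Javadoc: ASCII alphanumerics, '.', '_' and '-'.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    private StoreNames() { }

    /** Returns true if the name is non-null and uses only the allowed characters. */
    public static boolean isValidStoreName(final String name) {
        return name != null && LEGAL.matcher(name).matches();
    }

    /** Builds "${applicationId}-${storeName}-changelog" after validating the store name. */
    public static String changelogTopic(final String applicationId, final String storeName) {
        if (!isValidStoreName(storeName)) {
            throw new IllegalArgumentException("Illegal store name: " + storeName);
        }
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(final String[] args) {
        // prints "wordcount-app-counts-store-changelog"
        System.out.println(changelogTopic("wordcount-app", "counts-store"));
        // prints "false" because a space is not an allowed character
        System.out.println(isValidStoreName("counts store"));
    }
}
```

Validating the name before building the topic name mirrors the reasoning in the Javadoc: the store name becomes part of a Kafka topic name, so an illegal character is rejected up front rather than at topic creation.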

In this implementation, state stores are materialized if the user desires them to be queryable. In subsequent versions we can offer a second option, to have a view-like state store. The tradeoff then would be between storage space (materialize) and re-computation (view). That tradeoff can be exploited by later query optimizers.
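
The storage versus re-computation tradeoff described above can be sketched in plain Java. This is only an illustration under stated assumptions: the classes Materialized and View are hypothetical and do not correspond to Kafka Streams classes, and the view-like option is described in this message as a possible future addition, not as something this commit implements.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration, not Kafka Streams code.
public final class MaterializeVsView {

    /** Materialized: keeps every mapped value, so reads are lookups but space is used. */
    public static final class Materialized<K, V, R> {
        private final Map<K, R> store = new HashMap<>();
        private final Function<V, R> mapper;

        public Materialized(final Function<V, R> mapper) {
            this.mapper = mapper;
        }

        /** Called on each upstream update; the mapped result is stored. */
        public void onUpdate(final K key, final V value) {
            store.put(key, mapper.apply(value));
        }

        public R get(final K key) {
            return store.get(key);
        }

        public int storedEntries() {
            return store.size();
        }
    }

    /** View: stores nothing itself; every read re-applies the mapper to the parent's value. */
    public static final class View<K, V, R> {
        private final Map<K, V> parent;
        private final Function<V, R> mapper;

        public View(final Map<K, V> parent, final Function<V, R> mapper) {
            this.parent = parent;
            this.mapper = mapper;
        }

        public R get(final K key) {
            final V value = parent.get(key);
            return value == null ? null : mapper.apply(value);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Integer> parent = new HashMap<>();
        final Materialized<String, Integer, Integer> materialized =
            new Materialized<String, Integer, Integer>(v -> v * 2);
        final View<String, Integer, Integer> view =
            new View<String, Integer, Integer>(parent, v -> v * 2);

        parent.put("a", 21);
        materialized.onUpdate("a", 21);

        System.out.println(materialized.get("a")); // prints 42, read from the extra store
        System.out.println(view.get("a"));         // prints 42, recomputed on each read
    }
}
```

Both variants return the same answer; they differ in where the cost falls. The materialized variant pays in storage on every update, while the view pays in computation on every read.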

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #2832 from enothereska/KAFKA-5045-ktable


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ec9e4eaf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ec9e4eaf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ec9e4eaf

Branch: refs/heads/trunk
Commit: ec9e4eafa406fec897713310bafdedf6bbb3c0c5
Parents: a3952ae
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Wed May 3 16:15:54 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 3 16:15:54 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   |  552 +++++++--
 .../kafka/streams/kstream/KGroupedTable.java    |  323 +++++-
 .../kafka/streams/kstream/KStreamBuilder.java   |  479 +++++++-
 .../apache/kafka/streams/kstream/KTable.java    | 1094 +++++++++++++++++-
 .../kstream/internals/AbstractStream.java       |    7 +
 .../kstream/internals/KGroupedStreamImpl.java   |  128 +-
 .../kstream/internals/KGroupedTableImpl.java    |  113 +-
 .../streams/kstream/internals/KStreamImpl.java  |    2 +-
 .../streams/kstream/internals/KTableFilter.java |   26 +-
 .../streams/kstream/internals/KTableImpl.java   |  326 +++++-
 .../internals/KTableKTableJoinMerger.java       |   35 +-
 .../kstream/internals/KTableMapValues.java      |   27 +-
 .../streams/processor/TopologyBuilder.java      |   20 +-
 .../KStreamAggregationIntegrationTest.java      |   28 +-
 .../KTableKTableJoinIntegrationTest.java        |  155 ++-
 .../QueryableStateIntegrationTest.java          |  178 +++
 .../streams/kstream/KStreamBuilderTest.java     |   55 +-
 .../internals/KGroupedStreamImplTest.java       |  279 +++--
 .../internals/KGroupedTableImplTest.java        |   72 +-
 .../kstream/internals/KTableAggregateTest.java  |   55 +-
 .../kstream/internals/KTableFilterTest.java     |  245 +++-
 .../kstream/internals/KTableImplTest.java       |   32 +-
 .../kstream/internals/KTableKTableJoinTest.java |  162 +--
 .../kstream/internals/KTableMapValuesTest.java  |  128 +-
 .../streams/processor/TopologyBuilderTest.java  |    2 +-
 .../internals/ProcessorTopologyTest.java        |    4 +-
 26 files changed, 3793 insertions(+), 734 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index c961c7e..2cdf047 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -49,7 +49,7 @@ public interface KGroupedStream<K, V> {
      * Count the number of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code storeName}.
+     * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -63,7 +63,7 @@ public interface KGroupedStream<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -73,18 +73,44 @@ public interface KGroupedStream<K, V> {
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
      * alphanumerics, '.', '_' and '-'.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII
-     *                  alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */
-    KTable<K, Long> count(final String storeName);
+    KTable<K, Long> count(final String queryableStoreName);
+
+    /**
+     * Count the number of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view).
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<K, Long> count();
 
     /**
      * Count the number of records in this stream by the grouped key.
@@ -105,15 +131,15 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * String storeName = storeSupplier.name();
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param storeSupplier user defined state store supplier
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */
@@ -125,7 +151,7 @@ public interface KGroupedStream<K, V> {
      * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
      * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
      * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
-     * materialized view) that can be queried using the provided {@code storeName}.
+     * materialized view) that can be queried using the provided {@code queryableStoreName}.
      * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
      * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -141,7 +167,7 @@ public interface KGroupedStream<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
      * String key = "some-word";
      * long fromTime = ...;
      * long toTime = ...;
@@ -153,20 +179,52 @@ public interface KGroupedStream<K, V> {
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
      * alphanumerics, '.', '_' and '-'.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param windows   the specification of the aggregation {@link Windows}
-     * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII
-     *                  alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(Windows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
      */
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
-                                                       final String storeName);
+                                                       final String queryableStoreName);
+
+    /**
+     * Count the number of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} key or value are ignored.
+     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+     * materialized view).
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param windows   the specification of the aggregation {@link Windows}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+     * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     */
+    <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows);
 
     /**
      * Count the number of records in this stream by the grouped key and the defined windows.
@@ -191,8 +249,8 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * String storeName = storeSupplier.name();
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
      * String key = "some-word";
      * long fromTime = ...;
      * long toTime = ...;
@@ -202,7 +260,7 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
      * @param windows       the specification of the aggregation {@link Windows}
-     * @param storeSupplier user defined state store supplier
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
      */
@@ -214,7 +272,7 @@ public interface KGroupedStream<K, V> {
      * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
      * Records with {@code null} key or value are ignored.
      * The result is written into a local {@link SessionStore} (which is basically an ever-updating
-     * materialized view) that can be queried using the provided {@code storeName}.
+     * materialized view) that can be queried using the provided {@code queryableStoreName}.
      * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
      * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -226,27 +284,48 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link SessionStore} it must be obtained via
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
-     * KafkaStreams streams = ... // counting words
-     * String storeName = storeSupplier.name();
-     * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
-     * String key = "some-word";
-     * KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
+     * @param sessionWindows the specification of the aggregation {@link SessionWindows}
+     * @param queryableStoreName  the name of the state store created from this operation; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#count(SessionWindows)}.
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
+     * that represent the latest (rolling) count (i.e., number of records) for each key within a window
+     */
+    KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName);
+
+    /**
+     * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view).
+     * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      *
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
-     * @param storeName      the name of the state store created from this operation; valid characters are ASCII
-     *                       alphanumerics, '.', '_' and '-
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
      */
-    KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName);
+    KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows);
 
     /**
      * Count the number of records in this stream by the grouped key into {@link SessionWindows}.
@@ -264,22 +343,21 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link SessionStore} it must be obtained via
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
-     * KafkaStreams streams = ... // counting words
-     * String storeName = storeSupplier.name();
-     * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
-     * String key = "some-word";
-     * KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String key = "some-key";
+     * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     *
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
-     * @param storeSupplier  user defined state store supplier
+     * @param storeSupplier  user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within a window
      */
@@ -292,7 +370,42 @@ public interface KGroupedStream<K, V> {
      * Combining implies that the type of the aggregate result is the same as the type of the input value
      * (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code storeName}.
+     * under an internally generated store name.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<K, V> reduce(final Reducer<V> reducer);
+
+    /**
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
@@ -312,7 +425,7 @@ public interface KGroupedStream<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -322,20 +435,21 @@ public interface KGroupedStream<K, V> {
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
      * alphanumerics, '.', '_' and '-'.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param reducer   a {@link Reducer} that computes a new aggregate result
-     * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII
-     *                  alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
      */
     KTable<K, V> reduce(final Reducer<V> reducer,
-                        final String storeName);
+                        final String queryableStoreName);
+
 
     /**
      * Combine the value of records in this stream by the grouped key.
@@ -365,8 +479,8 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * String storeName = storeSupplier.name();
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -374,7 +488,7 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
      * @param reducer   a {@link Reducer} that computes a new aggregate result
-     * @param storeSupplier user defined state store supplier
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
      */
@@ -389,7 +503,7 @@ public interface KGroupedStream<K, V> {
      * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
      * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
      * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
-     * materialized view) that can be queried using the provided {@code storeName}.
+     * materialized view) that can be queried using the provided {@code queryableStoreName}.
      * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
      * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -411,7 +525,7 @@ public interface KGroupedStream<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
      * String key = "some-key";
      * long fromTime = ...;
      * long toTime = ...;
@@ -423,22 +537,64 @@ public interface KGroupedStream<K, V> {
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
      * alphanumerics, '.', '_' and '-'.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param reducer   a {@link Reducer} that computes a new aggregate result
      * @param windows   the specification of the aggregation {@link Windows}
-     * @param storeName the name of the state store created from this operation; valid characters are ASCII
-     *                  alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, Windows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                      final Windows<W> windows,
-                                                     final String storeName);
+                                                     final String queryableStoreName);
+
+    /**
+     * Combine the values of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)}).
+     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+     * materialized view) that is kept under an internal store name.
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Windows)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @param windows   the specification of the aggregation {@link Windows}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                                     final Windows<W> windows);
 
     /**
      * Combine the values of records in this stream by the grouped key and the defined windows.
@@ -472,8 +628,8 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * Sting storeName = storeSupplier.name();
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
      * String key = "some-key";
      * long fromTime = ...;
      * long toTime = ...;
@@ -484,7 +640,7 @@ public interface KGroupedStream<K, V> {
      *
      * @param reducer       a {@link Reducer} that computes a new aggregate result
      * @param windows       the specification of the aggregation {@link Windows}
-     * @param storeSupplier user defined state store supplier
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
@@ -498,7 +654,7 @@ public interface KGroupedStream<K, V> {
      * Combining implies that the type of the aggregate result is the same as the type of the input value
      * (c.f. {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)}).
      * The result is written into a local {@link SessionStore} (which is basically an ever-updating
-     * materialized view) that can be queried using the provided {@code storeName}.
+     * materialized view) that can be queried using the provided {@code queryableStoreName}.
      * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
      * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -517,13 +673,15 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link SessionStore} it must be obtained via
-     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * To query the local {@link SessionStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
      * String key = "some-key";
-     * KeyValueIterator<Windowed<String>, Long> sumForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -531,21 +689,53 @@ public interface KGroupedStream<K, V> {
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
      * alphanumerics, '.', '_' and '-'.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      * @param reducer           the instance of {@link Reducer}
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
-     * @param storeName         the name of the state store created from this operation; valid characters are ASCII
-     *                          alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName     the name of the state store created from this operation; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final SessionWindows sessionWindows,
-                                  final String storeName);
+                                  final String queryableStoreName);
+
+    /**
+     * Combine values of this stream by the grouped key into {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)}).
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view) that is kept under an internal store name.
+     * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, SessionWindows)} can be used to compute aggregate functions like sum, min,
+     * or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * @param reducer           the instance of {@link Reducer}
+     * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                  final SessionWindows sessionWindows);
 
     /**
      * Combine values of this stream by the grouped key into {@link SessionWindows}.
@@ -572,15 +762,15 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
-     * To query the local {@link SessionStore} it must be obtained via
+     * To query the local {@link SessionStore} it must be obtained via
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
-     * Sting storeName = storeSupplier.name();
-     * ReadOnlySessionStore<String,Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlySessionStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
      * String key = "some-key";
-     * KeyValueIterator<Windowed<String>, Long> sumForKeyForSession = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * KeyValueIterator<Windowed<String>, Long> sumForKeyForWindows = localWindowStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
@@ -588,14 +778,14 @@ public interface KGroupedStream<K, V> {
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
      * alphanumerics, '.', '_' and '-'.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      * @param reducer           the instance of {@link Reducer}
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
-     * @param storeSupplier     user defined state store supplier
+     * @param storeSupplier     user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
@@ -610,7 +800,7 @@ public interface KGroupedStream<K, V> {
      * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
      * allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code storeName}.
+     * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Initializer} is applied once directly before the first input record is processed to
@@ -632,7 +822,7 @@ public interface KGroupedStream<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -642,18 +832,18 @@ public interface KGroupedStream<K, V> {
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
      * alphanumerics, '.', '_' and '-'.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
      * @param aggregator    an {@link Aggregator} that computes a new aggregate result
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param storeName     the name of the state store created from this operation; valid characters are ASCII
-     *                      alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Serde)}.
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
@@ -661,7 +851,52 @@ public interface KGroupedStream<K, V> {
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator,
                                  final Serde<VR> aggValueSerde,
-                                 final String storeName);
+                                 final String queryableStoreName);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that is kept under an internal store name.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator, Serde)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count(String)}).
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> aggregator,
+                                 final Serde<VR> aggValueSerde);
+
 
     /**
      * Aggregate the values of records in this stream by the grouped key.
@@ -692,8 +927,8 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
-     * Sting storeName = storeSupplier.name();
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -702,7 +937,7 @@ public interface KGroupedStream<K, V> {
      *
      * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
      * @param aggregator    an {@link Aggregator} that computes a new aggregate result
-     * @param storeSupplier user defined state store supplier
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
@@ -719,7 +954,7 @@ public interface KGroupedStream<K, V> {
      * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
      * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
      * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
-     * materialized view) that can be queried using the provided {@code storeName}.
+     * materialized view) that can be queried using the provided {@code queryableStoreName}.
      * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
      * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -743,7 +978,7 @@ public interface KGroupedStream<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type double
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
      * String key = "some-key";
      * long fromTime = ...;
      * long toTime = ...;
@@ -755,10 +990,10 @@ public interface KGroupedStream<K, V> {
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
      * alphanumerics, '.', '_' and '-'.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      *
@@ -768,8 +1003,8 @@ public interface KGroupedStream<K, V> {
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
      * @param <VR>          the value type of the resulting {@link KTable}
-     * @param storeName     the name of the state store created from this operation; valid characters are ASCII
-     *                      alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
@@ -777,7 +1012,58 @@ public interface KGroupedStream<K, V> {
                                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                                              final Windows<W> windows,
                                                              final Serde<VR> aggValueSerde,
-                                                             final String storeName);
+                                                             final String queryableStoreName);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key and defined windows.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, Windows, String) combining via reduce(...)} as it,
+     * for example, allows the result to have a different type than the input values.
+     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the provided {@code queryableStoreName}.
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * The specified {@link Initializer} is applied once per window directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
+     * functions like count (c.f. {@link #count(String)}).
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param windows       the specification of the aggregation {@link Windows}
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                             final Aggregator<? super K, ? super V, VR> aggregator,
+                                                             final Windows<W> windows,
+                                                             final Serde<VR> aggValueSerde);
+
 
     /**
      * Aggregate the values of records in this stream by the grouped key and defined windows.
@@ -812,8 +1098,8 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type Long
-     * Sting storeName = storeSupplier.name();
-     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
      * String key = "some-key";
      * long fromTime = ...;
      * long toTime = ...;
@@ -826,7 +1112,7 @@ public interface KGroupedStream<K, V> {
      * @param aggregator    an {@link Aggregator} that computes a new aggregate result
      * @param windows       the specification of the aggregation {@link Windows}
      * @param <VR>          the value type of the resulting {@link KTable}
-     * @param storeSupplier user defined state store supplier
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
@@ -841,7 +1127,7 @@ public interface KGroupedStream<K, V> {
      * Aggregating is a generalization of {@link #reduce(Reducer, SessionWindows, String) combining via
      * reduce(...)} as it, for example, allows the result to have a different type than the input values.
      * The result is written into a local {@link SessionStore} (which is basically an ever-updating
-     * materialized view) that can be queried using the provided {@code storeName}.
+     * materialized view) that can be queried using the provided {@code queryableStoreName}.
      * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
      * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
      * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
@@ -866,8 +1152,8 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type double
-     * Sting storeName = storeSupplier.name();
-     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
      * String key = "some-key";
      * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -881,8 +1167,8 @@ public interface KGroupedStream<K, V> {
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
      * @param <T>           the value type of the resulting {@link KTable}
-     * @param storeName     the name of the state store created from this operation; valid characters are ASCII
-     *                      alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null}, this is equivalent to {@link KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
@@ -891,7 +1177,49 @@ public interface KGroupedStream<K, V> {
                                          final Merger<? super K, T> sessionMerger,
                                          final SessionWindows sessionWindows,
                                          final Serde<T> aggValueSerde,
-                                         final String storeName);
+                                         final String queryableStoreName);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, SessionWindows, String) combining via
+     * reduce(...)} as it, for example, allows the result to have a different type than the input values.
+     * The result is written into a local {@link SessionStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the provided {@code queryableStoreName}.
+     * SessionWindows are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * The specified {@link Initializer} is applied once per session directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator, Merger, SessionWindows, Serde, String)} can be used to compute
+     * aggregate functions like count (c.f. {@link #count(String)}).
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * @param initializer    the instance of {@link Initializer}
+     * @param aggregator     the instance of {@link Aggregator}
+     * @param sessionMerger  the instance of {@link Merger}
+     * @param sessionWindows the specification of the aggregation {@link SessionWindows}
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <T>           the value type of the resulting {@link KTable}
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
+     * the latest (rolling) aggregate for each key within a window
+     */
+    <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                         final Aggregator<? super K, ? super V, T> aggregator,
+                                         final Merger<? super K, T> sessionMerger,
+                                         final SessionWindows sessionWindows,
+                                         final Serde<T> aggValueSerde);
 
     /**
      * Aggregate the values of records in this stream by the grouped key and defined {@link SessionWindows}.
@@ -924,8 +1252,8 @@ public interface KGroupedStream<K, V> {
      * Use {@link StateStoreSupplier#name()} to get the store name:
      * <pre>{@code
      * KafkaStreams streams = ... // some windowed aggregation on value type double
-     * Sting storeName = storeSupplier.name();
-     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(storeName, QueryableStoreTypes.<String, Long>sessionStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlySessionStore<String, Long> sessionStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>sessionStore());
      * String key = "some-key";
      * KeyValueIterator<Windowed<String>, Long> aggForKeyForSession = sessionStore.fetch(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -939,7 +1267,7 @@ public interface KGroupedStream<K, V> {
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @param aggValueSerde  aggregate value serdes for materializing the aggregated table,
      *                       if not specified the default serdes defined in the configs will be used
-     * @param storeSupplier  user defined state store supplier
+     * @param storeSupplier  user defined state store supplier. Cannot be {@code null}.
      * @param <T>           the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
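
The Javadoc in this patch states two rules for named stores: the name may contain only ASCII alphanumerics, '.', '_' and '-', and the changelog topic is named "${applicationId}-${queryableStoreName}-changelog". The following is a minimal sketch of those two rules in plain Java, with no Kafka dependency. The class and method names (ChangelogTopicNames, isValidStoreName, changelogTopic) are hypothetical and are not part of the Kafka Streams API; the sketch mirrors only what the Javadoc says, and Kafka itself may apply further checks on topic names.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Streams: illustrates the naming
// rules described in the Javadoc above.
public class ChangelogTopicNames {

    // Legal characters per the Javadoc: ASCII alphanumerics, '.', '_' and '-'.
    private static final Pattern LEGAL_NAME = Pattern.compile("[a-zA-Z0-9._-]+");

    // Returns true if the name is non-null, non-empty and uses only legal characters.
    public static boolean isValidStoreName(final String name) {
        return name != null && LEGAL_NAME.matcher(name).matches();
    }

    // Builds "${applicationId}-${storeName}-changelog" after validating the store name.
    public static String changelogTopic(final String applicationId, final String storeName) {
        if (!isValidStoreName(storeName)) {
            throw new IllegalArgumentException("illegal store name: " + storeName);
        }
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(final String[] args) {
        // prints "my-app-counts-store-changelog"
        System.out.println(changelogTopic("my-app", "counts-store"));
    }
}
```

For the overloads without a queryableStoreName, the same pattern applies, but the middle component is an internally generated store name rather than a user-supplied one.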

