kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: MINOR: Update JavaDoc of KStream interface
Date Thu, 08 Dec 2016 19:08:03 GMT
MINOR: Update JavaDoc of KStream interface

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #2153 from mjsax/javaDocKStreams


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1949a76b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1949a76b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1949a76b

Branch: refs/heads/trunk
Commit: 1949a76bc4189534b853e21c476bb11172fa3fc9
Parents: 600859e
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Dec 8 11:07:59 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Dec 8 11:07:59 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   |  634 ++++--
 .../apache/kafka/streams/kstream/KStream.java   | 2020 +++++++++++++-----
 2 files changed, 2028 insertions(+), 626 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1949a76b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index f47c904..33a2791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -4,231 +4,569 @@
  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.  You may obtain a
  * copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.WindowStore;
 
 /**
- * {@link KGroupedStream} is an abstraction of a <i>grouped record stream</i> of key-value pairs
- * usually grouped on a different key than the original stream key
- *
+ * {@link KGroupedStream} is an abstraction of a <i>grouped</i> record stream of key-value pairs.
+ * It is an intermediate representation of a {@link KStream} in order to apply an aggregation operation on the original
+ * {@link KStream} records.
  * <p>
- * It is an intermediate representation of a {@link KStream} before an
- * aggregation is applied to the new partitions resulting in a new {@link KTable}.
+ * A {@link KGroupedStream} must be obtained from a {@link KStream} via {@link KStream#groupByKey() #groupByKey()} or
+ * {@link KStream#groupBy(KeyValueMapper) #groupBy(...)}.
+ *
  * @param <K> Type of keys
  * @param <V> Type of values
- *
  * @see KStream
  */
 @InterfaceStability.Unstable
 public interface KGroupedStream<K, V> {
 
-
     /**
-     * Combine values of this stream by the grouped key into a new instance of ever-updating
-     * {@link KTable}. The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Count the number of records in this stream by the grouped key.
+     * Records with {@code null} value are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code storeName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the cache size.
+     * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param reducer           the instance of {@link Reducer}
-     * @param storeName         the name of the underlying {@link KTable} state store
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+     * @param storeName the name of the underlying {@link KTable} state store
+     * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+     * (rolling) count (i.e., number of records) for each key
      */
-    KTable<K, V> reduce(Reducer<V> reducer,
-                        final String storeName);
+    KTable<K, Long> count(final String storeName);
 
     /**
-     * Combine values of this stream by the grouped key into a new instance of ever-updating
-     * {@link KTable}. The resulting {@link KTable} will be materialized in a state
-     * store provided by the {@link StateStoreSupplier}.
+     * Count the number of records in this stream by the grouped key.
+     * Records with {@code null} value are ignored.
+     * The result is written into a local {@link KeyValueStore} provided by the given {@code storeSupplier}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String storeName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param reducer       the instance of {@link Reducer}
      * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+     * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+     * (rolling) count (i.e., number of records) for each key
      */
-    KTable<K, V> reduce(final Reducer<V> reducer,
-                        final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
-     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Count the number of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} value are ignored.
+     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the provided {@code storeName}.
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the cache size.
+     * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * String key = "some-word";
+     * long timeFrom = ...;
+     * long timeTo = ...;
+     * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param reducer           the instance of {@link Reducer}
-     * @param windows           the specification of the aggregation {@link Windows}
-     * @param storeName         the name of the state store created from this operation
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) aggregate for each key within that window
+     * @param windows   the specification of the aggregation {@link Windows}
+     * @param storeName the name of the underlying {@link KTable} state store
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+     * the latest (rolling) count (i.e., number of records) for each key within a window
      */
-    <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
-                                                     Windows<W> windows,
-                                                     final String storeName);
+    <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+                                                       final String storeName);
 
     /**
-     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a state
-     * store provided by the {@link StateStoreSupplier}.
+     * Count the number of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} value are ignored.
+     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} provided by the given {@code storeSupplier}.
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String storeName = storeSupplier.name();
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * String key = "some-word";
+     * long timeFrom = ...;
+     * long timeTo = ...;
+     * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param reducer       the instance of {@link Reducer}
      * @param windows       the specification of the aggregation {@link Windows}
      * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) aggregate for each key within that window
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+     * the latest (rolling) count (i.e., number of records) for each key within a window
      */
-    <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
-                                                     Windows<W> windows,
-                                                     final StateStoreSupplier<WindowStore> storeSupplier);
-
+    <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
+                                                       final StateStoreSupplier<WindowStore> storeSupplier);
 
     /**
-     * Aggregate values of this stream by key into a new instance of a {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Serde, String)}).
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code storeName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the cache size.
+     * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
-     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param storeName     the name of the state store created from this operation
-     * @param <T>           the value type of the resulting {@link KTable}
-     *
-     * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @param storeName the name of the underlying {@link KTable} state store
+     * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+     * (rolling) aggregate for each key
      */
-    <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                               Aggregator<K, V, T> aggregator,
-                               Serde<T> aggValueSerde,
-                               final String storeName);
+    KTable<K, V> reduce(final Reducer<V> reducer,
+                        final String storeName);
 
     /**
-     * Aggregate values of this stream by key into a new instance of a {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a state
-     * store provided by the {@link StateStoreSupplier}.
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)}).
+     * The result is written into a local {@link KeyValueStore} provided by the given {@code storeSupplier}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
      * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
-     * @param <T>           the value type of the resulting {@link KTable}
-     * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+     * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+     * (rolling) aggregate for each key
      */
-    <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                               Aggregator<K, V, T> aggregator,
-                               final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, V> reduce(final Reducer<V> reducer,
+                        final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
-     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Combine the values of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)}).
+     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the provided {@code storeName}.
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the cache size.
+     * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@link #reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * String key = "some-key";
+     * long timeFrom = ...;
+     * long timeTo = ...;
+     * WindowStoreIterator<Long> sumForKeyForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
-     * @param windows       the specification of the aggregation {@link Windows}
-     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param <T>           the value type of the resulting {@link KTable}
-     * @param storeName     the name of the state store created from this operation
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values with type {@code T}
-     *         that represent the latest (rolling) aggregate for each key within that window
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @param windows   the specification of the aggregation {@link Windows}
+     * @param storeName the name of the state store created from this operation
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+     * the latest (rolling) aggregate for each key within a window
      */
-    <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
-                                                           Aggregator<K, V, T> aggregator,
-                                                           Windows<W> windows,
-                                                           Serde<T> aggValueSerde,
-                                                           final String storeName);
+    <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                                     final Windows<W> windows,
+                                                     final String storeName);
 
     /**
-     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a state
-     * store provided by the {@link StateStoreSupplier}.
+     * Combine the values of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)}).
+     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} provided by the given {@code storeSupplier}.
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@link #reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or max.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * String storeName = storeSupplier.name();
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * String key = "some-key";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> sumForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
+     * @param reducer       a {@link Reducer} that computes a new aggregate result
      * @param windows       the specification of the aggregation {@link Windows}
-     * @param <T>           the value type of the resulting {@link KTable}
      * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values with type {@code T}
-     *         that represent the latest (rolling) aggregate for each key within that window
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+     * the latest (rolling) aggregate for each key within a window
      */
-    <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
-                                                           Aggregator<K, V, T> aggregator,
-                                                           Windows<W> windows,
-                                                           final StateStoreSupplier<WindowStore> storeSupplier);
+    <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                                     final Windows<W> windows,
+                                                     final StateStoreSupplier<WindowStore> storeSupplier);
+
 
     /**
-     * Count number of records of this stream by key into a new instance of a {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it allows the
+     * result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code storeName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the cache size.
+     * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@link #aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count(String)}) TODO add more examples.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some aggregation on value type double TODO update example
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param storeName  the name of the underlying {@link KTable} state store
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param storeName     the name of the state store created from this operation
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+     * (rolling) aggregate for each key
      */
-    KTable<K, Long> count(final String storeName);
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<K, V, VR> aggregator,
+                                 final Serde<VR> aggValueSerde,
+                                 final String storeName);
 
     /**
-     * Count number of records of this stream by key into a new instance of a {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a state
-     * store provided by the {@link StateStoreSupplier}.
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, StateStoreSupplier) combining via reduce(...)} as it
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} provided by the given {@code storeSupplier}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@link #aggregate(Initializer, Aggregator, StateStoreSupplier)} can be used to compute aggregate functions
+     * like count (c.f. {@link #count(String)}) TODO add more examples.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some aggregation on value type double TODO update example
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param storeSupplier  user defined state store supplier {@link StateStoreSupplier}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
+     * (rolling) aggregate for each key
      */
-    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<K, V, VR> aggregator,
+                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
-     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Aggregate the values of records in this stream by the grouped key and defined windows.
+     * Records with {@code null} value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, Windows, String) combining via reduce(...)} as it
+     * allows the result to have a different type than the input values.
+     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
+     * materialized view) that can be queried using the provided {@code storeName}.
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the cache size.
+     * You can configure the cache size via {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * The specified {@link Initializer} is applied once per window directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@link #aggregate(Initializer, Aggregator, Windows, Serde, String)} can be used to compute aggregate
+     * functions like count (c.f. {@link #count(String)}) TODO add more examples.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some windowed aggregation on value type double TODO update example
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * String key = "some-key";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> aggForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param windows   the specification of the aggregation {@link Windows}
-     * @param storeName the name of the state store created from this operation
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
+     *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param windows       the specification of the aggregation {@link Windows}
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @param storeName     the name of the state store created from this operation
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+     * the latest (rolling) aggregate for each key within a window
      */
-    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, final String storeName);
+    <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                            final Aggregator<K, V, VR> aggregator,
+                                                            final Windows<W> windows,
+                                                            final Serde<VR> aggValueSerde,
+                                                            final String storeName);
 
     /**
-     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a state
-     * store provided by the {@link StateStoreSupplier}.
+     * Aggregate the values of records in this stream by the grouped key and defined windows.
+     * Records with {@code null} value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, Windows, StateStoreSupplier) combining via
+     * reduce(...)} as it allows the result to have a different type than the input values.
+     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} provided by the given {@code storeSupplier}.
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
+     * <p>
+     * The specified {@link Initializer} is applied once per window directly before the first input record is
+     * processed to provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@link #aggregate(Initializer, Aggregator, Windows, StateStoreSupplier)} can be used to compute aggregate
+     * functions like count (c.f. {@link #count(String)}) TODO add more examples.
+     * <p>
+     * To query the local windowed {@link KeyValueStore} it must be obtained via
+     * {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * Use {@link StateStoreSupplier#name()} to get the store name:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some windowed aggregation on value type double TODO update example
+     * String storeName = storeSupplier.name();
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+     * String key = "some-key";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> aggForKeyForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
      * @param windows       the specification of the aggregation {@link Windows}
+     * @param <VR>          the value type of the resulting {@link KTable}
      * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
+     * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
+     * the latest (rolling) aggregate for each key within a window
      */
-    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows,
-                                                       final StateStoreSupplier<WindowStore> storeSupplier);
+    <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
+                                                             final Aggregator<K, V, VR> aggregator,
+                                                             final Windows<W> windows,
+                                                             final StateStoreSupplier<WindowStore> storeSupplier);
 
 }


Mime
View raw message