kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [4/5] kafka git commit: KAFKA-5045: Clarify on KTable APIs for queryable stores
Date Wed, 03 May 2017 23:16:01 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 8685e8b..d14e600 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -46,7 +46,7 @@ public interface KGroupedTable<K, V> {
      * the same key into a new instance of {@link KTable}.
      * Records with {@code null} key are ignored.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code storeName}.
+     * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -60,7 +60,7 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -68,20 +68,48 @@ public interface KGroupedTable<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
      * '.', '_' and '-'.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param storeName     the name of the underlying {@link KTable} state store; valid characters are ASCII
-     *                      alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName     the name of the underlying {@link KTable} state store; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#count()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */
-    KTable<K, Long> count(final String storeName);
+    KTable<K, Long> count(final String queryableStoreName);
+
+    /**
+     * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to
+     * the same key into a new instance of {@link KTable}.
+     * Records with {@code null} key are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queriable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<K, Long> count();
 
     /**
      * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to
@@ -102,8 +130,8 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * String storeName = storeSupplier.name();
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -111,13 +139,13 @@ public interface KGroupedTable<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param storeSupplier user defined state store supplier
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */
@@ -130,7 +158,7 @@ public interface KGroupedTable<K, V> {
      * Combining implies that the type of the aggregate result is the same as the type of the input value
      * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code storeName}.
+     * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
@@ -167,7 +195,7 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -175,24 +203,80 @@ public interface KGroupedTable<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
      * '.', '_' and '-'.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param adder      a {@link Reducer} that adds a new value to the aggregate result
      * @param subtractor a {@link Reducer} that removed an old value from the aggregate result
-     * @param storeName     the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics,
-     *                      '.', '_' and '-'
+     * @param queryableStoreName     the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics,
+     * '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#reduce(Reducer, Reducer)} ()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
      */
     KTable<K, V> reduce(final Reducer<V> adder,
                         final Reducer<V> subtractor,
-                        final String storeName);
+                        final String queryableStoreName);
+
+    /**
+     * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable}.
+     * Records with {@code null} key are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
+     * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate and the record's value by adding the new record to the aggregate.
+     * The specified {@link Reducer substractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+     * record from the aggregate.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
+     * For sum, the adder and substractor would work as follows:
+     * <pre>{@code
+     * public class SumAdder implements Reducer<Integer> {
+     *   public Integer apply(Integer currentAgg, Integer newValue) {
+     *     return currentAgg + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubtractor implements Reducer<Integer> {
+     *   public Integer apply(Integer currentAgg, Integer oldValue) {
+     *     return currentAgg - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queriable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param adder      a {@link Reducer} that adds a new value to the aggregate result
+     * @param subtractor a {@link Reducer} that removed an old value from the aggregate result
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<K, V> reduce(final Reducer<V> adder,
+                        final Reducer<V> subtractor);
 
     /**
      * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -238,8 +322,8 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * String storeName = storeSupplier.name();
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -247,15 +331,15 @@ public interface KGroupedTable<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param adder         a {@link Reducer} that adds a new value to the aggregate result
      * @param subtractor    a {@link Reducer} that removed an old value from the aggregate result
-     * @param storeSupplier user defined state store supplier
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
      */
@@ -319,7 +403,7 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -327,16 +411,17 @@ public interface KGroupedTable<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param initializer a {@link Initializer} that provides an initial aggregate result value
      * @param adder       a {@link Aggregator} that adds a new record to the aggregate result
      * @param subtractor  a {@link Aggregator} that removed an old record from the aggregate result
-     * @param storeName   the name of the underlying {@link KTable} state store
+     * @param queryableStoreName   the name of the underlying {@link KTable} state store.
+     *                             If {@code null} this is the equivalent of {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator)} ()}.
      * @param <VR>        the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
@@ -344,7 +429,7 @@ public interface KGroupedTable<K, V> {
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,
-                                 final String storeName);
+                                 final String queryableStoreName);
 
     /**
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -352,8 +437,79 @@ public interface KGroupedTable<K, V> {
      * Records with {@code null} key are ignored.
      * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
+     * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
+     * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String)
+     * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code storeName}.
+     * provided by the given {@code storeSupplier}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
+     * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value by adding the new record to the aggregate.
+     * The specified {@link Aggregator substractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+     * record from the aggregate.
+     * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions
+     * like sum.
+     * For sum, the initializer, adder, and substractor would work as follows:
+     * <pre>{@code
+     * // in this example, LongSerde.class must be set as default value serde in StreamsConfig
+     * public class SumInitializer implements Initializer<Long> {
+     *   public Long apply() {
+     *     return 0L;
+     *   }
+     * }
+     *
+     * public class SumAdder implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer newValue, Long aggregate) {
+     *     return aggregate + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubstractor implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer oldValue, Long aggregate) {
+     *     return aggregate - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queriable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer a {@link Initializer} that provides an initial aggregate result value
+     * @param adder       a {@link Aggregator} that adds a new record to the aggregate result
+     * @param subtractor  a {@link Aggregator} that removed an old record from the aggregate result
+     * @param <VR>        the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> adder,
+                                 final Aggregator<? super K, ? super V, VR> subtractor);
+
+
+    /**
+     * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
+     * Records with {@code null} key are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * for example, allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Initializer} is applied once directly before the first input record is processed to
@@ -398,7 +554,7 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -406,10 +562,10 @@ public interface KGroupedTable<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
      * '.', '_' and '-'.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
@@ -419,8 +575,8 @@ public interface KGroupedTable<K, V> {
      * @param subtractor    a {@link Aggregator} that removed an old record from the aggregate result
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param storeName     the name of the underlying {@link KTable} state store; valid characters are ASCII
-     *                      alphanumerics, '.', '_' and '-'
+     * @param queryableStoreName     the name of the underlying {@link KTable} state store; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde)} ()}.
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
@@ -429,7 +585,78 @@ public interface KGroupedTable<K, V> {
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,
                                  final Serde<VR> aggValueSerde,
-                                 final String storeName);
+                                 final String queryableStoreName);
+
+    /**
+     * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
+     * Records with {@code null} key are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * for example, allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
+     * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value by adding the new record to the aggregate.
+     * The specified {@link Aggregator substractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+     * record from the aggregate.
+     * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions
+     * like sum.
+     * For sum, the initializer, adder, and substractor would work as follows:
+     * <pre>{@code
+     * public class SumInitializer implements Initializer<Long> {
+     *   public Long apply() {
+     *     return 0L;
+     *   }
+     * }
+     *
+     * public class SumAdder implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer newValue, Long aggregate) {
+     *     return aggregate + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubstractor implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer oldValue, Long aggregate) {
+     *     return aggregate - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queriable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer   a {@link Initializer} that provides an initial aggregate result value
+     * @param adder         a {@link Aggregator} that adds a new record to the aggregate result
+     * @param subtractor    a {@link Aggregator} that removed an old record from the aggregate result
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <VR>          the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> adder,
+                                 final Aggregator<? super K, ? super V, VR> subtractor,
+                                 final Serde<VR> aggValueSerde);
+
 
     /**
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
@@ -483,8 +710,8 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * String storeName = storeSupplier.name();
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String queryableStoreName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -492,16 +719,16 @@ public interface KGroupedTable<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
-     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
-     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
-     * provide {@code storeName}, and "-changelog" is a fixed suffix.
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param initializer   a {@link Initializer} that provides an initial aggregate result value
      * @param adder         a {@link Aggregator} that adds a new record to the aggregate result
      * @param subtractor    a {@link Aggregator} that removed an old record from the aggregate result
-     * @param storeSupplier user defined state store supplier
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec9e4eaf/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index c361cad..0e02c8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,6 +34,7 @@ import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
 import java.util.Collections;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
@@ -243,7 +243,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
+     * {@code queryableStoreName}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -251,7 +251,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -259,12 +259,103 @@ public class KStreamBuilder extends TopologyBuilder {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
      * @param topic     the topic name; cannot be {@code null}
-     * @param storeName the state store name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(String)} ()}.
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final String topic,
-                                     final String storeName) {
-        return table(null, null, null, topic, storeName);
+                                     final String queryableStoreName) {
+        return table(null, null, null, topic, queryableStoreName);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param topic     the topic name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final String topic,
+                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return table(null, null, null, topic, storeSupplier);
+    }
+
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * @param topic     the topic name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final String topic) {
+        return table(null, null, null, topic, (String) null);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                    offsets are available
+     * @param topic       the topic name; cannot be {@code null}
+     * @param queryableStoreName   the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, String)} ()}.
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final String topic,
+                                     final String queryableStoreName) {
+        return table(offsetReset, null, null, topic, queryableStoreName);
     }
 
     /**
@@ -276,7 +367,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
+     * {@code queryableStoreName}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -284,7 +375,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -294,13 +385,36 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
      *                    offsets are available
      * @param topic       the topic name; cannot be {@code null}
-     * @param storeName   the state store name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                      final String topic,
-                                     final String storeName) {
-        return table(offsetReset, null, null, topic, storeName);
+                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return table(offsetReset, null, null, topic, storeSupplier);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                    offsets are available
+     * @param topic       the topic name; if {@code null} an internal store name will be automatically given.
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final String topic) {
+        return table(offsetReset, null, null, topic, (String) null);
     }
 
     /**
@@ -312,7 +426,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
+     * {@code queryableStoreName}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -320,7 +434,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -332,25 +446,26 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param valSerde  value serde used to send key-value pairs,
      *                  if not specified the default value serde defined in the configuration will be used
      * @param topic     the topic name; cannot be {@code null}
-     * @param storeName the state store name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(Serde, Serde, String)} ()}.
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final String topic,
-                                     final String storeName) {
-        return table(null, keySerde, valSerde, topic, storeName);
+                                     final String queryableStoreName) {
+        return table(null, keySerde, valSerde, topic, queryableStoreName);
     }
 
     /**
      * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
+     * {@code queryableStoreName}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -358,7 +473,94 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
+     * @param topic     the topic name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic,
+                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return table(null, keySerde, valSerde, topic, storeSupplier);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
+     * @param topic     the topic name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic) {
+        return table(null, keySerde, valSerde, topic, (String) null);
+    }
+
+    private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
+                                        final Serde<K> keySerde,
+                                        final Serde<V> valSerde,
+                                        final String topic,
+                                        final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                        final boolean isQueryable) {
+        final String source = newName(KStreamImpl.SOURCE_NAME);
+        final String name = newName(KTableImpl.SOURCE_NAME);
+        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
+
+        addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(),
+                valSerde == null ? null : valSerde.deserializer(),
+                topic);
+        addProcessor(name, processorSupplier, source);
+
+        final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
+                keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
+
+        addStateStore(storeSupplier, name);
+        connectSourceStoreAndTopic(storeSupplier.name(), topic);
+
+        return kTable;
+    }
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -372,37 +574,142 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param valSerde    value serde used to send key-value pairs,
      *                    if not specified the default value serde defined in the configuration will be used
      * @param topic       the topic name; cannot be {@code null}
-     * @param storeName   the state store name; cannot be {@code null}
+     * @param queryableStoreName   the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                      final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final String topic,
-                                     final String storeName) {
-        final String source = newName(KStreamImpl.SOURCE_NAME);
-        final String name = newName(KTableImpl.SOURCE_NAME);
-        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName);
-
-        addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic);
-        addProcessor(name, processorSupplier, source);
-
-        final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName);
+                                     final String queryableStoreName) {
+        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
+        final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+                keySerde,
+                valSerde,
+                false,
+                Collections.<String, String>emptyMap(),
+                true);
+        return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, queryableStoreName != null);
+    }
 
-        // only materialize the KTable into a state store if the storeName is not null
-        if (storeName != null) {
-            final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName,
-                    keySerde,
-                    valSerde,
-                    false,
-                    Collections.<String, String>emptyMap(),
-                    true);
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                    offsets are available
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
+     * @param topic     the topic name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic) {
+        return table(offsetReset, keySerde, valSerde, topic, (String) null);
+    }
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                    offsets are available
+     * @param keySerde    key serde used to send key-value pairs,
+     *                    if not specified the default key serde defined in the configuration will be used
+     * @param valSerde    value serde used to send key-value pairs,
+     *                    if not specified the default value serde defined in the configuration will be used
+     * @param topic       the topic name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic,
+                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, true);
+    }
 
-            addStateStore(storeSupplier, name);
-            connectSourceStoreAndTopic(storeName, topic);
-        }
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code queryableStoreName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param topic     the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(String)}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                 final String queryableStoreName) {
+        return globalTable(null, null, topic, queryableStoreName);
+    }
 
-        return kTable;
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param topic     the topic name; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public <K, V> GlobalKTable<K, V> globalTable(final String topic) {
+        return globalTable(null, null, topic, (String) null);
     }
 
     /**
@@ -411,7 +718,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
+     * {@code queryableStoreName}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -419,20 +726,33 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key);
      * }</pre>
      * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
      * regardless of the specified value in {@link StreamsConfig}.
      *
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
      * @param topic     the topic name; cannot be {@code null}
-     * @param storeName the state store name; cannot be {@code null}
+     * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
-                                                 final String storeName) {
-        return globalTable(null, null, topic, storeName);
+    @SuppressWarnings("unchecked")
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde,
+                                                 final String topic,
+                                                 final String queryableStoreName) {
+        final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
+        return doGlobalTable(keySerde, valSerde, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+                            keySerde,
+                            valSerde,
+                    false,
+                            Collections.<String, String>emptyMap(),
+                    true));
     }
 
     /**
@@ -441,7 +761,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
-     * {@code storeName}.
+     * {@code queryableStoreName}.
      * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -449,7 +769,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ...
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long valueForKey = localStore.get(key);
      * }</pre>
@@ -461,31 +781,60 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param valSerde  value serde used to send key-value pairs,
      *                  if not specified the default value serde defined in the configuration will be used
      * @param topic     the topic name; cannot be {@code null}
-     * @param storeName the state store name; cannot be {@code null}
+     * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link GlobalKTable} for the specified topic
      */
     @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final String topic,
-                                                 final String storeName) {
+                                                 final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        return doGlobalTable(keySerde, valSerde, topic, storeSupplier);
+    }
+
+    private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
+                                                    final Serde<V> valSerde,
+                                                    final String topic,
+                                                    final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         final String sourceName = newName(KStreamImpl.SOURCE_NAME);
         final String processorName = newName(KTableImpl.SOURCE_NAME);
-        final KTableSource<K, V> tableSource = new KTableSource<>(storeName);
+        final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
 
 
         final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
         final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
 
-        final StateStore store = new RocksDBKeyValueStoreSupplier<>(storeName,
-                                                                    keySerde,
-                                                                    valSerde,
-                                                                    false,
-                                                                    Collections.<String, String>emptyMap(),
-                                                                    true).get();
+        addGlobalStore(storeSupplier, sourceName, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+    }
 
-        addGlobalStore(store, sourceName, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
-        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeName));
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
+     * @param topic     the topic name; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+                                                 final Serde<V> valSerde,
+                                                 final String topic) {
+
+        return globalTable(keySerde, valSerde, topic, (String) null);
     }
 
     /**
@@ -512,4 +861,16 @@ public class KStreamBuilder extends TopologyBuilder {
         return prefix + String.format("%010d", index.getAndIncrement());
     }
 
+    /**
+     * <strong>This function is only for internal usage only and should not be called.</strong>
+     * <p>
+     * Create a unique state store name.
+     *
+     * @param prefix processor name prefix
+     * @return a new unique name
+     */
+    public String newStoreName(final String prefix) {
+        return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
+    }
+
 }


Mime
View raw message