kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5655; materialized count, aggregate, reduce to KGroupedTable
Date Tue, 12 Sep 2017 16:20:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 08063f50a -> 8bd2a68b5


KAFKA-5655; materialized count, aggregate, reduce to KGroupedTable

Add overloads of `count`, `aggregate`, `reduce` using `Materialized` to `KGroupedTable`
deprecate other overloads

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3829 from dguy/kafka-5655


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8bd2a68b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8bd2a68b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8bd2a68b

Branch: refs/heads/trunk
Commit: 8bd2a68b5020f0bf8f79cbe59676d649eebf170f
Parents: 08063f5
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Sep 12 17:20:43 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Tue Sep 12 17:20:43 2017 +0100

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedTable.java    | 204 +++++++++++++++++++
 .../kafka/streams/kstream/Materialized.java     |  12 +-
 .../kstream/internals/KGroupedTableImpl.java    | 134 +++++++++---
 .../kafka/streams/kstream/MaterializedTest.java |  54 +++++
 .../internals/KGroupedTableImplTest.java        | 137 ++++++++++++-
 .../kstream/internals/KTableAggregateTest.java  |   1 +
 6 files changed, 509 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index bf0df55..f854320 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -80,7 +81,9 @@ public interface KGroupedTable<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#count()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
+     * @deprecated use {@link #count(Materialized)}
      */
+    @Deprecated
     KTable<K, Long> count(final String queryableStoreName);
 
     /**
@@ -98,6 +101,47 @@ public interface KGroupedTable<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
+     * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
+     * '.', '_' and '-'.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param materialized the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null}
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Count number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to
+     * the same key into a new instance of {@link KTable}.
+     * Records with {@code null} key are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
@@ -148,7 +192,9 @@ public interface KGroupedTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
+     * @deprecated use {@link #count(Materialized)}
      */
+    @Deprecated
     KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
@@ -218,7 +264,9 @@ public interface KGroupedTable<K, V> {
      * '.', '_' and '-'. If {@code null} this is the equivalent of {@link KGroupedTable#reduce(Reducer, Reducer)} ()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
      */
+    @Deprecated
     KTable<K, V> reduce(final Reducer<V> adder,
                         final Reducer<V> subtractor,
                         final String queryableStoreName);
@@ -228,6 +276,76 @@ public interface KGroupedTable<K, V> {
      * mapped} to the same key into a new instance of {@link KTable}.
      * Records with {@code null} key are ignored.
      * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}).
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
+     * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate (first argument) and the record's value (second argument) by adding the new record to the
+     * aggregate.
+     * The specified {@link Reducer subtractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate (first argument) and the record's value (second
+     * argument) by "removing" the "replaced" record from the aggregate.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Reducer, Materialized)} can be used to compute aggregate functions like sum.
+     * For sum, the adder and subtractor would work as follows:
+     * <pre>{@code
+     * public class SumAdder implements Reducer<Integer> {
+     *   public Integer apply(Integer currentAgg, Integer newValue) {
+     *     return currentAgg + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubtractor implements Reducer<Integer> {
+     *   public Integer apply(Integer currentAgg, Integer oldValue) {
+     *     return currentAgg - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
+     * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics,
+     * '.', '_' and '-'.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param adder         a {@link Reducer} that adds a new value to the aggregate result
+     * @param subtractor    a {@link Reducer} that removes an old value from the aggregate result
+     * @param materialized  the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<K, V> reduce(final Reducer<V> adder,
+                        final Reducer<V> subtractor,
+                        final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+    /**
+     * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable}.
+     * Records with {@code null} key are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
      * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
      * that can be queried using the provided {@code queryableStoreName}.
@@ -344,7 +462,9 @@ public interface KGroupedTable<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #reduce(Reducer, Reducer, Materialized)}
      */
+    @Deprecated
     KTable<K, V> reduce(final Reducer<V> adder,
                         final Reducer<V> subtractor,
                         final StateStoreSupplier<KeyValueStore> storeSupplier);
@@ -427,7 +547,9 @@ public interface KGroupedTable<K, V> {
      * @param <VR>        the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
      */
+    @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,
@@ -439,6 +561,86 @@ public interface KGroupedTable<K, V> {
      * Records with {@code null} key are ignored.
      * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that is materialized using the given {@code materialized} instance.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
+     * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value by adding the new record to the aggregate.
+     * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+     * record from the aggregate.
+     * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, Materialized)} can be used to compute aggregate
+     * functions like sum.
+     * For sum, the initializer, adder, and subtractor would work as follows:
+     * <pre>{@code
+     * // in this example, LongSerde.class must be set as default value serde in StreamsConfig
+     * public class SumInitializer implements Initializer<Long> {
+     *   public Long apply() {
+     *     return 0L;
+     *   }
+     * }
+     *
+     * public class SumAdder implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer newValue, Long aggregate) {
+     *     return aggregate + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubtractor implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer oldValue, Long aggregate) {
+     *     return aggregate - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer   an {@link Initializer} that provides an initial aggregate result value
+     * @param adder         an {@link Aggregator} that adds a new record to the aggregate result
+     * @param subtractor    an {@link Aggregator} that removes an old record from the aggregate result
+     * @param materialized  the instance of {@link Materialized} used to materialize the state store. Cannot be {@code null}
+     * @param <VR>          the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> adder,
+                                 final Aggregator<? super K, ? super V, VR> subtractor,
+                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
+     * Records with {@code null} key are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * for example, allows the result to have a different type than the input values.
      * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
      * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String)
      * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}.
@@ -582,7 +784,9 @@ public interface KGroupedTable<K, V> {
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}
      */
+    @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> adder,
                                  final Aggregator<? super K, ? super V, VR> subtractor,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index fb2e7a6..1f142c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStore;
@@ -29,6 +30,7 @@ import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Used to describe how a {@link StateStore} should be materialized.
@@ -70,13 +72,15 @@ public class Materialized<K, V, S extends StateStore> {
     /**
      * Materialize a {@link StateStore} with the given name.
      *
-     * @param storeName name of the store to materialize
+     * @param storeName  the name of the underlying {@link KTable} state store; valid characters are ASCII
+     * alphanumerics, '.', '_' and '-'.
      * @param <K>       key type of the store
      * @param <V>       value type of the store
      * @param <S>       type of the {@link StateStore}
      * @return a new {@link Materialized} instance with the given storeName
      */
     public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) {
+        Topic.validate(storeName);
         return new Materialized<>(storeName);
     }
 
@@ -89,6 +93,7 @@ public class Materialized<K, V, S extends StateStore> {
      * @return a new {@link Materialized} instance with the given supplier
      */
     public static <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> as(final WindowBytesStoreSupplier supplier) {
+        Objects.requireNonNull(supplier, "supplier can't be null");
         return new Materialized<>(supplier);
     }
 
@@ -98,9 +103,11 @@ public class Materialized<K, V, S extends StateStore> {
      * @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store
      * @param <K>      key type of the store
      * @param <V>      value type of the store
-     * @return a new {@link Materialized} instance with the given supplier
+     * @return a new {@link Materialized} instance with the given
+     * supplier
      */
     public static <K, V> Materialized<K, V, SessionStore<Bytes, byte[]>> as(final SessionBytesStoreSupplier supplier) {
+        Objects.requireNonNull(supplier, "supplier can't be null");
         return new Materialized<>(supplier);
     }
 
@@ -113,6 +120,7 @@ public class Materialized<K, V, S extends StateStore> {
      * @return a new {@link Materialized} instance with the given supplier
      */
     public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(final KeyValueBytesStoreSupplier supplier) {
+        Objects.requireNonNull(supplier, "supplier can't be null");
         return new Materialized<>(supplier);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index aefaad8..e69d4f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -48,6 +50,26 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     protected final Serde<? extends K> keySerde;
     protected final Serde<? extends V> valSerde;
     private boolean isQueryable = true;
+    private final Initializer<Long> countInitializer = new Initializer<Long>() {
+        @Override
+        public Long apply() {
+            return 0L;
+        }
+    };
+
+    private final Aggregator<K, V, Long> countAdder = new Aggregator<K, V, Long>() {
+        @Override
+        public Long apply(K aggKey, V value, Long aggregate) {
+            return aggregate + 1L;
+        }
+    };
+
+    private Aggregator<K, V, Long> countSubtractor = new Aggregator<K, V, Long>() {
+        @Override
+        public Long apply(K aggKey, V value, Long aggregate) {
+            return aggregate - 1L;
+        }
+    };
 
     public KGroupedTableImpl(final InternalStreamsBuilder builder,
                              final String name,
@@ -116,19 +138,33 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                          final String functionName,
                                          final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        String sinkName = builder.newName(KStreamImpl.SINK_NAME);
-        String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
-        String funcName = builder.newName(functionName);
+        final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
+        final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
+        final String funcName = builder.newName(functionName);
+
+        buildAggregate(aggregateSupplier,
+                       storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX,
+                       funcName,
+                       sourceName,
+                       sinkName);
+        builder.internalTopologyBuilder.addStateStore(storeSupplier, funcName);
 
-        String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
+    }
 
-        Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
-        Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
-        Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
-        Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+    private void buildAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
+                                final String topic,
+                                final String funcName,
+                                final String sourceName,
+                                final String sinkName) {
+        final Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
+        final Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+        final Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
+        final Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
 
-        ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
-        ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
+        final ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+        final ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
 
         // send the aggregate key-value pairs to the intermediate topic for partitioning
         builder.internalTopologyBuilder.addInternalTopic(topic);
@@ -139,10 +175,23 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
         // aggregate the values with the aggregator and local store
         builder.internalTopologyBuilder.addProcessor(funcName, aggregateSupplier, sourceName);
-        builder.internalTopologyBuilder.addStateStore(storeSupplier, funcName);
+    }
+
+    private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
+                                         final String functionName,
+                                         final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
+        final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
+        final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
+        final String funcName = builder.newName(functionName);
+
+        buildAggregate(aggregateSupplier,
+                       materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX,
+                       funcName,
+                       sourceName, sinkName);
+        builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), funcName);
 
         // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
+        return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable);
     }
 
     @Override
@@ -155,6 +204,21 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     @Override
     public KTable<K, V> reduce(final Reducer<V> adder,
+                               final Reducer<V> subtractor,
+                               final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(adder, "adder can't be null");
+        Objects.requireNonNull(subtractor, "subtractor can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized);
+        final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(materializedInternal.storeName(),
+                                                                                     adder,
+                                                                                     subtractor);
+        return doAggregate(aggregateSupplier, REDUCE_NAME, materializedInternal);
+    }
+
+    @Override
+    public KTable<K, V> reduce(final Reducer<V> adder,
                                final Reducer<V> subtractor) {
         return reduce(adder, subtractor, (String) null);
     }
@@ -177,6 +241,32 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     }
 
     @Override
+    public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
+        return aggregate(countInitializer,
+                         countAdder,
+                         countSubtractor,
+                         materialized);
+    }
+
+    @Override
+    public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                        final Aggregator<? super K, ? super V, VR> adder,
+                                        final Aggregator<? super K, ? super V, VR> subtractor,
+                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(adder, "adder can't be null");
+        Objects.requireNonNull(subtractor, "subtractor can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(materialized);
+        final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(),
+                                                                                        initializer,
+                                                                                        adder,
+                                                                                        subtractor);
+        return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal);
+    }
+
+    @Override
     public KTable<K, Long> count() {
         return count((String) null);
     }
@@ -184,23 +274,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     @Override
     public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
         return this.aggregate(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1L;
-                    }
-                }, new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate - 1L;
-                    }
-                },
+                countInitializer,
+                countAdder,
+                countSubtractor,
                 storeSupplier);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java
new file mode 100644
index 0000000..de3e503
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.junit.Test;
+
+public class MaterializedTest {
+
+    @Test
+    public void shouldAllowValidTopicNamesAsStoreName() {
+        Materialized.as("valid-name");
+        Materialized.as("valid.name");
+        Materialized.as("valid_name");
+    }
+
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotAllowInvalidTopicNames() {
+        Materialized.as("not:valid");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfWindowBytesStoreSupplierIsNull() {
+        Materialized.as((WindowBytesStoreSupplier) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfKeyValueBytesStoreSupplierIsNull() {
+        Materialized.as((KeyValueBytesStoreSupplier) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfSessionBytesStoreSupplierIsNull() {
+        Materialized.as((SessionBytesStoreSupplier) null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 105dd2e..ff9726e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -18,12 +18,15 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -39,10 +42,13 @@ import org.junit.Test;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 
+@SuppressWarnings("deprecation")
 public class KGroupedTableImplTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
@@ -50,6 +56,7 @@ public class KGroupedTableImplTest {
     private KGroupedTable<String, String> groupedTable;
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final String topic = "input";
 
     @Before
     public void before() {
@@ -142,7 +149,6 @@ public class KGroupedTableImplTest {
 
     @Test
     public void shouldReduce() {
-        final String topic = "input";
         final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
             new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
                 @Override
@@ -161,7 +167,6 @@ public class KGroupedTableImplTest {
 
     @Test
     public void shouldReduceWithInternalStoreName() {
-        final String topic = "input";
         final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
             new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
                 @Override
@@ -177,4 +182,132 @@ public class KGroupedTableImplTest {
         doShouldReduce(reduced, topic);
         assertNull(reduced.queryableStoreName());
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReduceAndMaterializeResults() {
+        final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
+            new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
+                @Override
+                public KeyValue<String, Integer> apply(String key, Number value) {
+                    return KeyValue.pair(key, value.intValue());
+                }
+            };
+
+        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+                .groupBy(intProjection)
+                .reduce(MockReducer.INTEGER_ADDER,
+                        MockReducer.INTEGER_SUBTRACTOR,
+                        Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("reduce")
+                                .withKeySerde(Serdes.String())
+                                .withValueSerde(Serdes.Integer()));
+
+        doShouldReduce(reduced, topic);
+        final KeyValueStore<String, Integer> reduce = (KeyValueStore<String, Integer>) driver.allStateStores().get("reduce");
+        assertThat(reduce.get("A"), equalTo(5));
+        assertThat(reduce.get("B"), equalTo(6));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCountAndMaterializeResults() {
+        final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
+        table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
+                         Serialized.with(Serdes.String(),
+                                         Serdes.String()))
+                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
+                               .withKeySerde(Serdes.String())
+                               .withValueSerde(Serdes.Long()));
+
+        processData(topic);
+        final KeyValueStore<String, Long> counts = (KeyValueStore<String, Long>) driver.allStateStores().get("count");
+        assertThat(counts.get("1"), equalTo(3L));
+        assertThat(counts.get("2"), equalTo(2L));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAggregateAndMaterializeResults() {
+        final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
+        table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
+                      Serialized.with(Serdes.String(),
+                                      Serdes.String()))
+                .aggregate(MockInitializer.STRING_INIT,
+                           MockAggregator.TOSTRING_ADDER,
+                           MockAggregator.TOSTRING_REMOVER,
+                           Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
+                                   .withValueSerde(Serdes.String())
+                                   .withKeySerde(Serdes.String()));
+
+        processData(topic);
+        final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate");
+        assertThat(aggregate.get("1"), equalTo("0+1+1+1"));
+        assertThat(aggregate.get("2"), equalTo("0+2+2"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointOnCountWhenMaterializedIsNull() {
+        groupedTable.count((Materialized) null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
+        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (Materialized) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnReduceWhenAdderIsNull() {
+        groupedTable.reduce(null, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnReduceWhenSubtractorIsNull() {
+        groupedTable.reduce(MockReducer.STRING_ADDER, null, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateWhenInitializerIsNull() {
+        groupedTable.aggregate(null,
+                               MockAggregator.TOSTRING_ADDER,
+                               MockAggregator.TOSTRING_REMOVER,
+                               Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateWhenAdderIsNull() {
+        groupedTable.aggregate(MockInitializer.STRING_INIT,
+                               null,
+                               MockAggregator.TOSTRING_REMOVER,
+                               Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateWhenSubtractorIsNull() {
+        groupedTable.aggregate(MockInitializer.STRING_INIT,
+                               MockAggregator.TOSTRING_ADDER,
+                               null,
+                               Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
+        groupedTable.aggregate(MockInitializer.STRING_INIT,
+                               MockAggregator.TOSTRING_ADDER,
+                               MockAggregator.TOSTRING_REMOVER,
+                               (Materialized) null);
+    }
+
+    private void processData(final String topic) {
+        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.Integer());
+        driver.setTime(0L);
+        driver.process(topic, "A", "1");
+        driver.process(topic, "B", "1");
+        driver.process(topic, "C", "1");
+        driver.process(topic, "D", "2");
+        driver.process(topic, "E", "2");
+        driver.flushState();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd2a68b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 9347cc8..accbb9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -405,4 +405,5 @@ public class KTableAggregateTest {
         driver.process("tableOne", "1", "5");
         assertEquals(Long.valueOf(4L), reduceResults.get("2"));
     }
+
 }


Mime
View raw message