kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5921; add Materialized overloads to windowed kstream
Date Tue, 19 Sep 2017 09:56:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 83bdcdbae -> c8f147199


KAFKA-5921; add Materialized overloads to windowed kstream

Add `Materialized` overloads to `WindowedKStream`. Deprecate the existing windowed `reduce` and `aggregate` methods on `KGroupedStream`.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3889 from dguy/kafka-5921


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c8f14719
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c8f14719
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c8f14719

Branch: refs/heads/trunk
Commit: c8f1471992c98e0104e3a7b2e093adc21b2d2a6f
Parents: 83bdcdb
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Sep 19 10:56:42 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Tue Sep 19 10:56:42 2017 +0100

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   |   8 +
 .../kafka/streams/kstream/WindowedKStream.java  | 150 +++++++++++++++++--
 .../GroupedStreamAggregateBuilder.java          |  15 ++
 .../kstream/internals/KGroupedStreamImpl.java   |  32 ++--
 .../kstream/internals/WindowedKStreamImpl.java  |  95 ++++++++++--
 .../KStreamAggregationIntegrationTest.java      |   6 +-
 .../internals/WindowedKStreamImplTest.java      | 109 +++++++++++++-
 7 files changed, 361 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c8f14719/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 08916ef..5621ab4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -667,7 +667,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link
KGroupedStream#reduce(Reducer, Windows)} ()}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V>
reducer,
                                                      final Windows<W> windows,
                                                      final String queryableStoreName);
@@ -772,7 +774,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V>
reducer,
                                                      final Windows<W> windows,
                                                      final StateStoreSupplier<WindowStore>
storeSupplier);
@@ -1259,7 +1263,9 @@ public interface KGroupedStream<K, V> {
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link
KGroupedStream#aggregate(Initializer, Aggregator, Windows, Serde)} ()} ()}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR>
initializer,
                                                              final Aggregator<? super
K, ? super V, VR> aggregator,
                                                              final Windows<W> windows,
@@ -1369,7 +1375,9 @@ public interface KGroupedStream<K, V> {
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR>
initializer,
                                                              final Aggregator<? super
K, ? super V, VR> aggregator,
                                                              final Windows<W> windows,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8f14719/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
index 4f73db7..35e0eeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.WindowStore;
 
 /**
  * {@code WindowedKStream} is an abstraction of a <i>windowed</i> record stream
of {@link KeyValue} pairs.
@@ -31,6 +33,14 @@ import org.apache.kafka.streams.state.KeyValueStore;
  * new (partitioned) windows resulting in a windowed {@link KTable}
  * (a <emph>windowed</emph> {@code KTable} is a {@link KTable} with key type
{@link Windowed Windowed<K>}.
  * <p>
+ * The specified {@code windows} define either hopping time windows that can be overlapping
or tumbling (c.f.
+ * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+ * The result is written into a local windowed {@link WindowStore} (which is basically
an ever-updating
+ * materialized view) that can be queried using the name provided in the {@link Materialized}
instance.
+ * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog
stream, where
+ * "windowed" implies that the {@link KTable} key is a combined key of the original record
key and a window ID.
+ * <p>
  * A {@code WindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)}
.
  *
  * @param <K> Type of keys
@@ -43,13 +53,6 @@ public interface WindowedKStream<K, V> {
     /**
      * Count the number of records in this stream by the grouped key and the defined windows.
      * Records with {@code null} key or value are ignored.
-     * The specified {@code windows} define either hopping time windows that can be overlapping
or tumbling (c.f.
-     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
-     * The result is written into a local windowed {@link KeyValueStore} (which is basically
an ever-updating
-     * materialized view) that can be queried using the provided {@code queryableName}.
-     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
-     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable}
changelog stream, where
-     * "windowed" implies that the {@link KTable} key is a combined key of the original record
key and a window ID.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate
consecutive updates to
      * the same window and key.
@@ -71,6 +74,38 @@ public interface WindowedKStream<K, V> {
     KTable<Windowed<K>, Long> count();
 
     /**
+     * Count the number of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} key or value are ignored.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate
consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data
rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * <p>
+     * To query the local windowed {@link WindowStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the
store as defined by the Materialized instance
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName,
QueryableStoreTypes.<String, Long>windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key,
fromTime, toTime); // key must be local (application state is shared over all running Kafka
Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()}
to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
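+     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.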
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link
Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes,
byte[]>> materialized);
+
+    /**
      * Aggregate the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
      * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)}
as it, for example,
@@ -84,9 +119,12 @@ public interface WindowedKStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new
aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result
provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator, Serde)} can be used to compute aggregate
functions like
+     * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate
functions like
      * count (c.f. {@link #count()}).
      * <p>
+     * The default value serde from config will be used for serializing the result.
+     * If a different serde is required then you should use {@link #aggregate(Initializer,
Aggregator, Materialized)}.
+     * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate
consecutive updates to
      * the same key.
      * The rate of propagated updates depends on your input data rate, the number of distinct
keys, the number of
@@ -102,17 +140,63 @@ public interface WindowedKStream<K, V> {
      * Note that the internal store name may not be queriable through Interactive Queries.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
+     * @param <VR>          the value type of the resulting {@link KTable}
      * @param initializer   an {@link Initializer} that computes an initial intermediate
aggregation result
      * @param aggregator    an {@link Aggregator} that computes a new aggregate result
-     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
-     *                      if not specified the default serdes defined in the configs will
be used
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and
values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR>
initializer,
+                                           final Aggregator<? super K, ? super V, VR>
aggregator);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)}
as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local windowed {@link WindowStore} (which is basically an ever-updating
materialized view)
+     * that can be queried using the store name as provided with {@link Materialized}.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input
record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first
record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new
aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result
provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute
aggregate functions like
+     * count (c.f. {@link #count()}).
+     * <p>
+     *
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate
consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate,
the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     *
+     * <p>
+     * To query the local windowed {@link WindowStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the
store as defined by the Materialized instance
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName,
QueryableStoreTypes.<String, Long>windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> aggregateStore = localWindowStore.fetch(key, fromTime,
toTime); // key must be local (application state is shared over all running Kafka Streams
instances)
+     * }</pre>
+     *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate
aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param materialized  an instance of {@link Materialized} used to materialize a state
store. Cannot be {@code null}.
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and
values that represent the
      * latest (rolling) aggregate for each key
      */
     <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR>
initializer,
                                            final Aggregator<? super K, ? super V, VR>
aggregator,
-                                           final Serde<VR> aggValueSerde);
+                                           final Materialized<K, VR, WindowStore<Bytes,
byte[]>> materialized);
 
     /**
      * Combine the values of records in this stream by the grouped key.
@@ -147,4 +231,46 @@ public interface WindowedKStream<K, V> {
      * latest (rolling) aggregate for each key
      */
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
+
+    /**
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of
the input value.
+     * The result is written into a local windowed {@link WindowStore} (which is basically an ever-updating
materialized view)
+     * that can be queried using the store name as provided with {@link Materialized}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog
stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new
aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate
will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like
sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache will be used to deduplicate
consecutive updates to
+     * the same window and key if caching is enabled on the {@link Materialized} instance.
+     * When caching is enabled the rate of propagated updates depends on your input data rate,
the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local windowed {@link WindowStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = ... // the queryableStoreName should be the name of the
store as defined by the Materialized instance
+     * ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName,
QueryableStoreTypes.<String, Long>windowStore());
+     *
+     * String key = "some-word";
+     * long fromTime = ...;
+     * long toTime = ...;
+     * WindowStoreIterator<Long> reduceStore = localWindowStore.fetch(key, fromTime,
toTime); // key must be local (application state is shared over all running Kafka Streams
instances)
+     * }</pre>
+     *
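+     * @param reducer       a {@link Reducer} that computes a new aggregate result
+     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.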
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and
values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
+                                  final Materialized<K, V, WindowStore<Bytes, byte[]>>
materialized);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8f14719/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index ef0cdfc..6fb7a35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.state.StoreBuilder;
 
@@ -31,6 +33,19 @@ class GroupedStreamAggregateBuilder<K, V> {
     private final Set<String> sourceNodes;
     private final String name;
 
+    final Initializer<Long> countInitializer = new Initializer<Long>() {
+        @Override
+        public Long apply() {
+            return 0L;
+        }
+    };
+    final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V, Long>()
{
+        @Override
+        public Long apply(K aggKey, V value, Long aggregate) {
+            return aggregate + 1;
+        }
+    };
+
     GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
                                   final Serde<K> keySerde,
                                   final Serde<V> valueSerde,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8f14719/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 1fab2c5..ba037f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -49,18 +49,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements
KGroupedStre
     private final Serde<K> keySerde;
     private final Serde<V> valSerde;
     private final boolean repartitionRequired;
-    private final Initializer<Long> countInitializer = new Initializer<Long>()
{
-        @Override
-        public Long apply() {
-            return 0L;
-        }
-    };
-    private final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V,
Long>() {
-        @Override
-        public Long apply(K aggKey, V value, Long aggregate) {
-            return aggregate + 1;
-        }
-    };
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
     private boolean isQueryable = true;
 
@@ -235,7 +223,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K>
implements KGroupedStre
                                                                   final Aggregator<? super
K, ? super V, T> aggregator,
                                                                   final Windows<W>
windows,
                                                                   final Serde<T> aggValueSerde)
{
-        return windowedBy(windows).aggregate(initializer, aggregator, aggValueSerde);
+        return windowedBy(windows).aggregate(initializer, aggregator,
+                                             Materialized.<K, T, WindowStore<Bytes,
byte[]>>as(builder.newStoreName(AGGREGATE_NAME))
+                                                     .withKeySerde(keySerde)
+                                                     .withValueSerde(aggValueSerde));
     }
 
     @SuppressWarnings("unchecked")
@@ -268,12 +259,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K>
implements KGroupedStre
 
     @Override
     public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier)
{
-        return aggregate(countInitializer, countAggregator, storeSupplier);
+        return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator,
storeSupplier);
     }
 
     @Override
     public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes,
byte[]>> materialized) {
-        return aggregate(countInitializer, countAggregator, materialized);
+        return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator,
materialized);
     }
 
     @Override
@@ -292,8 +283,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements
KGroupedStre
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W>
windows,
                                                               final StateStoreSupplier<WindowStore>
storeSupplier) {
         return aggregate(
-                countInitializer,
-                countAggregator,
+                aggregateBuilder.countInitializer,
+                aggregateBuilder.countAggregator,
                 windows,
                 storeSupplier);
     }
@@ -383,7 +374,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K>
implements KGroupedStre
             }
         };
 
-        return aggregate(countInitializer, countAggregator, sessionMerger, sessionWindows,
Serdes.Long(), storeSupplier);
+        return aggregate(aggregateBuilder.countInitializer,
+                         aggregateBuilder.countAggregator,
+                         sessionMerger,
+                         sessionWindows,
+                         Serdes.Long(),
+                         storeSupplier);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8f14719/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
index b6e38f1..3992a79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
@@ -18,9 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -28,6 +30,7 @@ import org.apache.kafka.streams.kstream.WindowedKStream;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.Objects;
@@ -60,47 +63,107 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends
AbstractStream<
 
     @Override
     public KTable<Windowed<K>, Long> count() {
-        return aggregate(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                }, new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1;
-                    }
-                },
+        return doAggregate(
+                aggregateBuilder.countInitializer,
+                aggregateBuilder.countAggregator,
                 Serdes.Long());
     }
 
-    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes,
byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return aggregate(aggregateBuilder.countInitializer, aggregateBuilder.countAggregator,
materialized);
+    }
+
+
     @Override
     public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR>
initializer,
-                                                  final Aggregator<? super K, ? super
V, VR> aggregator,
-                                                  final Serde<VR> aggValueSerde) {
+                                                  final Aggregator<? super K, ? super
V, VR> aggregator) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
+        return doAggregate(initializer, aggregator, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR>
initializer,
+                                                     final Aggregator<? super K, ? super
V, VR> aggregator,
+                                                     final Serde<VR> serde) {
         final String storeName = builder.newStoreName(AGGREGATE_NAME);
         return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
storeName, initializer, aggregator),
                                                                 AGGREGATE_NAME,
-                                                                windowStoreBuilder(storeName,
aggValueSerde),
+                                                                windowStoreBuilder(storeName,
serde),
                                                                 false);
     }
 
     @SuppressWarnings("unchecked")
     @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR>
initializer,
+                                                  final Aggregator<? super K, ? super
V, VR> aggregator,
+                                                  final Materialized<K, VR, WindowStore<Bytes,
byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
+                                                                                        
    materializedInternal.storeName(),
+                                                                                        
    initializer,
+                                                                                        
    aggregator),
+                                                                AGGREGATE_NAME,
+                                                                materialize(materializedInternal),
+                                                                true);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         final String storeName = builder.newStoreName(REDUCE_NAME);
         return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K,
V, W>(windows, storeName, reducer),
                                                                REDUCE_NAME,
                                                                windowStoreBuilder(storeName,
valSerde),
                                                                false);
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final
Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
+        // User-named (Materialized) store: pass true as the last build(...) flag, like aggregate(..., Materialized); reduce(Reducer) passes false like doAggregate(...).
+        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K,
V, W>(windows, materializedInternal.storeName(), reducer),
+                                                               REDUCE_NAME,
+                                                               materialize(materializedInternal),
+                                                               true);
+    }
 
+    private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K,
VR, WindowStore<Bytes, byte[]>> materialized) {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            supplier = Stores.persistentWindowStore(materialized.storeName(),
+                                                    windows.maintainMs(),
+                                                    windows.segments,
+                                                    windows.size(),
+                                                    false);
+        }
+        final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
+                                                                                   materialized.keySerde(),
+                                                                                   materialized.valueSerde());
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        if (materialized.cachingEnabled()) {
+            builder.withCachingEnabled();
+        }
+        return builder;
     }
 
+
     private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final
String storeName, final Serde<VR> aggValueSerde) {
         return Stores.windowStoreBuilder(
                 Stores.persistentWindowStore(

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8f14719/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 7ff24da..81e8ef7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -110,6 +110,8 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
 
         final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
         stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
@@ -306,8 +308,8 @@ public class KStreamAggregationIntegrationTest {
         groupedStream.windowedBy(TimeWindows.of(500L))
                 .aggregate(
                         initializer,
-                        aggregator,
-                        Serdes.Integer())
+                        aggregator
+                )
                 .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
                     @Override
                     public String apply(final Windowed<String> windowedKey, final Integer value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8f14719/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
index 83e2e11..f5de96f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
@@ -18,24 +18,31 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedKStream;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -76,7 +83,6 @@ public class WindowedKStreamImplTest {
     }
 
 
-
     @Test
     public void shouldReduceWindowed() {
         final Map<Windowed<String>, String> results = new HashMap<>();
@@ -99,8 +105,8 @@ public class WindowedKStreamImplTest {
     public void shouldAggregateWindowed() {
         final Map<Windowed<String>, String> results = new HashMap<>();
         windowedStream.aggregate(MockInitializer.STRING_INIT,
-                                 MockAggregator.TOSTRING_ADDER,
-                                 Serdes.String())
+                                 MockAggregator.TOSTRING_ADDER
+        )
                 .toStream()
                 .foreach(new ForeachAction<Windowed<String>, String>() {
                     @Override
@@ -114,14 +120,66 @@ public class WindowedKStreamImplTest {
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeCount() {
+        windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                                     .withKeySerde(Serdes.String())
+                                     .withValueSerde(Serdes.Long()));
+
+        processData();
+        final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
+        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeReduced() {
+        windowedStream.reduce(MockReducer.STRING_ADDER,
+                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+                                      .withKeySerde(Serdes.String())
+                                      .withValueSerde(Serdes.String()));
+
+        processData();
+        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldMaterializeAggregated() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+                                         .withKeySerde(Serdes.String())
+                                         .withValueSerde(Serdes.String()));
+
+        processData();
+        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
+        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
-        windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String());
+        windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
-        windowedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String());
+        windowedStream.aggregate(MockInitializer.STRING_INIT, null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -129,8 +187,47 @@ public class WindowedKStreamImplTest {
         windowedStream.reduce(null);
     }
 
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
+        windowedStream.aggregate(null,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 null,
+                                 Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 (Materialized) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
+        windowedStream.reduce(null,
+                              Materialized.<String, String, WindowStore<Bytes, byte[]>>as("store"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
+        windowedStream.reduce(MockReducer.STRING_ADDER,
+                              null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
+        windowedStream.count(null);
+    }
+
     private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory());
+        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
         driver.setTime(15);


Mime
View raw message