kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5853; implement WindowedKStream
Date Fri, 08 Sep 2017 15:49:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk beeed8660 -> e16b9143d


KAFKA-5853; implement WindowedKStream

Add the `WindowedKStream` interface and implementation of methods that don't require `Materialized`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3809 from dguy/kgrouped-stream-windowed-by


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e16b9143
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e16b9143
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e16b9143

Branch: refs/heads/trunk
Commit: e16b9143dfcecbd58e3bebecbdb7d8e933b88cc4
Parents: beeed86
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Sep 8 16:49:18 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Sep 8 16:49:18 2017 +0100

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |  55 +++++--
 docs/streams/upgrade-guide.html                 |   6 +
 .../kafka/streams/kstream/KGroupedStream.java   |  14 ++
 .../kafka/streams/kstream/WindowedKStream.java  | 150 +++++++++++++++++++
 .../kstream/internals/KGroupedStreamImpl.java   |  22 ++-
 .../kstream/internals/WindowedKStreamImpl.java  | 143 ++++++++++++++++++
 .../KStreamAggregationIntegrationTest.java      |  55 +++----
 .../internals/KGroupedStreamImplTest.java       |   1 +
 .../internals/WindowedKStreamImplTest.java      | 144 ++++++++++++++++++
 9 files changed, 544 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index b8d3ae4..05acb55 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -1175,9 +1175,10 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
         Once records are grouped by key via <code>groupByKey</code> or <code>groupBy</code>
-- and
         thus represented as either a <code>KGroupedStream</code> or a
         <code>KGroupedTable</code> -- they can be aggregated via an operation
such as
-        <code>reduce</code>. Aggregations are <i>key-based</i> operations,
i.e.
-        they always operate over records (notably record values) <i>of the same key</i>.
You may
-        choose to perform aggregations on
+        <code>reduce</code>.
+        For windowed aggregations use <code>windowedBy(Windows).reduce(Reducer)</code>.
+        Aggregations are <i>key-based</i> operations, i.e. they always operate
over records (notably record values) <i>of the same key</i>.
+        You may choose to perform aggregations on
         <a href="#streams_dsl_windowing">windowed</a> or non-windowed data.
     </p>
     <table class="data-table" border="1">
@@ -1205,20 +1206,20 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                     Several variants of <code>aggregate</code> exist, see Javadocs
for details.
                 </p>
                 <pre class="brush: java;">
-                    KGroupedStream&lt;byte[], String&gt; groupedStream = ...;
-                    KGroupedTable&lt;byte[], String&gt; groupedTable = ...;
+                    KGroupedStream&lt;Bytes, String&gt; groupedStream = ...;
+                    KGroupedTable&lt;Bytes, String&gt; groupedTable = ...;
 
                     // Java 8+ examples, using lambda expressions
 
                     // Aggregating a KGroupedStream (note how the value type changes from
String to Long)
-                    KTable&lt;byte[], Long&gt; aggregatedStream = groupedStream.aggregate(
+                    KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
                         () -> 0L, /* initializer */
                         (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
/* adder */
                         Serdes.Long(), /* serde for aggregate value */
                         "aggregated-stream-store" /* state store name */);
 
                     // Aggregating a KGroupedTable (note how the value type changes from
String to Long)
-                    KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
+                    KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
                         () -> 0L, /* initializer */
                         (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
/* adder */
                         (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(),
/* subtractor */
@@ -1226,19 +1227,26 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                         "aggregated-table-store" /* state store name */);
 
 
+                    // windowed aggregation
+                    KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; windowedAggregate
= groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
+                        .aggregate(() -> 0L, /* initializer */
+                            (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
/* aggregator */
+                            Serdes.Long()); /* serde for aggregate value */
+
+
                     // Java 7 examples
 
                     // Aggregating a KGroupedStream (note how the value type changes from
String to Long)
-                    KTable&lt;byte[], Long&gt; aggregatedStream = groupedStream.aggregate(
+                    KTable&lt;Bytes, Long&gt; aggregatedStream = groupedStream.aggregate(
                         new Initializer&lt;Long&gt;() { /* initializer */
                           @Override
                           public Long apply() {
                             return 0L;
                           }
                         },
-                        new Aggregator&lt;byte[], String, Long&gt;() { /* adder */
+                        new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
                           @Override
-                          public Long apply(byte[] aggKey, String newValue, Long aggValue)
{
+                          public Long apply(Bytes aggKey, String newValue, Long aggValue)
{
                             return aggValue + newValue.length();
                           }
                         },
@@ -1246,27 +1254,44 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                         "aggregated-stream-store");
 
                     // Aggregating a KGroupedTable (note how the value type changes from
String to Long)
-                    KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
+                    KTable&lt;Bytes, Long&gt; aggregatedTable = groupedTable.aggregate(
                         new Initializer&lt;Long&gt;() { /* initializer */
                           @Override
                           public Long apply() {
                             return 0L;
                           }
                         },
-                        new Aggregator&lt;byte[], String, Long&gt;() { /* adder */
+                        new Aggregator&lt;Bytes, String, Long&gt;() { /* adder */
                           @Override
-                          public Long apply(byte[] aggKey, String newValue, Long aggValue)
{
+                          public Long apply(Bytes aggKey, String newValue, Long aggValue)
{
                             return aggValue + newValue.length();
                           }
                         },
-                        new Aggregator&lt;byte[], String, Long&gt;() { /* subtractor
*/
+                        new Aggregator&lt;Bytes, String, Long&gt;() { /* subtractor
*/
                           @Override
-                          public Long apply(byte[] aggKey, String oldValue, Long aggValue)
{
+                          public Long apply(Bytes aggKey, String oldValue, Long aggValue)
{
                             return aggValue - oldValue.length();
                           }
                         },
                         Serdes.Long(),
                         "aggregated-table-store");
+
+                    // Windowed aggregation
+                    KTable&lt;Windowed&lt;Bytes&gt;, Long&gt; aggregatedStream = groupedStream.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
+                        .aggregate(
+                            new Initializer&lt;Long&gt;() { /* initializer */
+                              @Override
+                              public Long apply() {
+                                return 0L;
+                              }
+                            },
+                            new Aggregator&lt;Bytes, String, Long&gt;() { /* adder
*/
+                              @Override
+                              public Long apply(Bytes aggKey, String newValue, Long aggValue)
{
+                                return aggValue + newValue.length();
+                              }
+                            },
+                            Serdes.Long());
                 </pre>
                 <p>
                     Detailed behavior of <code>KGroupedStream</code>:

http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/docs/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index a868c91..ffb365e 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -86,6 +86,12 @@
     </p>
 
     <p>
+        Windowed aggregations have moved from <code>KGroupedStream</code> to
<code>WindowedKStream</code>.
+        You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
+        Note: the previous aggregate functions on <code>KGroupedStream</code>
still work, but have been deprecated.
+    </p>
+
+    <p>
         The Processor API was extended to allow users to schedule <code>punctuate</code>
functions either based on data-driven <b>stream time</b> or wall-clock time.
         As a result, the original <code>ProcessorContext#schedule</code> is deprecated
with a new overloaded function that accepts a user customizable <code>Punctuator</code>
callback interface, which triggers its <code>punctuate</code> API method periodically
based on the <code>PunctuationType</code>.
         The <code>PunctuationType</code> determines what notion of time is used
for the punctuation scheduling: either <a href="/{{version}}/documentation/streams/core-concepts#streams_time">stream
time</a> or wall-clock time (by default, <b>stream time</b> is configured
to represent event time via <code>TimestampExtractor</code>).

http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index e6faf8c..f12c2b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -223,7 +223,9 @@ public interface KGroupedStream<K, V> {
      * @param windows   the specification of the aggregation {@link Windows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys
and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within
a window
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W>
windows);
 
     /**
@@ -619,7 +621,9 @@ public interface KGroupedStream<K, V> {
      * @param windows   the specification of the aggregation {@link Windows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V>
reducer,
                                                      final Windows<W> windows);
 
@@ -1112,7 +1116,9 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
+     * @deprecated use {@link #windowedBy(Windows)}
      */
+    @Deprecated
     <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR>
initializer,
                                                              final Aggregator<? super
K, ? super V, VR> aggregator,
                                                              final Windows<W> windows,
@@ -1333,4 +1339,12 @@ public interface KGroupedStream<K, V> {
                                          final Serde<T> aggValueSerde,
                                          final StateStoreSupplier<SessionStore> storeSupplier);
 
+    /**
+     * Create a new {@link WindowedKStream} instance that can be used to perform windowed
aggregations.
+     * @param windows the specification of the aggregation {@link Windows}
+     * @param <W>     the window type
+     * @return an instance of {@link WindowedKStream}
+     */
+    <W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W>
windows);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
new file mode 100644
index 0000000..4f73db7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedKStream.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+/**
+ * {@code WindowedKStream} is an abstraction of a <i>windowed</i> record stream
of {@link KeyValue} pairs.
+ * It is an intermediate representation of a {@link KStream} in order to apply a windowed
aggregation operation on the original
+ * {@link KStream} records.
+ * <p>
+ * It is an intermediate representation after a grouping and windowing of a {@link KStream}
before an aggregation is applied to the
+ * new (partitioned) windows resulting in a windowed {@link KTable}
+ * (a <i>windowed</i> {@code KTable} is a {@link KTable} with key type
{@link Windowed Windowed<K>}).
+ * <p>
+ * A {@code WindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)}
.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ * @see KStream
+ * @see KGroupedStream
+ */
+public interface WindowedKStream<K, V> {
+
+    /**
+     * Count the number of records in this stream by the grouped key and the defined windows.
+     * Records with {@code null} key or value are ignored.
+     * The specified {@code windows} define either hopping time windows that can be overlapping
or tumbling (c.f.
+     * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
+     * The result is written into a local windowed {@link KeyValueStore} (which is basically
an ever-updating
+     * materialized view) that can be queried using the provided {@code queryableName}.
+     * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+     * Furthermore, updates to the store are sent downstream into a windowed {@link KTable}
changelog stream, where
+     * "windowed" implies that the {@link KTable} key is a combined key of the original record
key and a window ID.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate
consecutive updates to
+     * the same window and key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct
keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog",
where "applicationId" is
+     * user-specified in {@link StreamsConfig StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName"
is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link
Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<Windowed<K>, Long> count();
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)}
as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating
materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog
stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input
record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first
record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new
aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result
provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator, Serde)} can be used to compute aggregate
functions like
+     * count (c.f. {@link #count()}).
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate
consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct
keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog",
where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName"
is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate
aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will
be used
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and
values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR>
initializer,
+                                           final Aggregator<? super K, ? super V, VR>
aggregator,
+                                           final Serde<VR> aggValueSerde);
+
+    /**
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of
the input value.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating
materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog
stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new
aggregate using the current
+     * aggregate and the record's value.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate
will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like
sum, min, or max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate
consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct
keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog",
where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName"
is an internal name
+     * and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and
values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 5fd5f6d..57114b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedKStream;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -39,8 +40,8 @@ import java.util.Set;
 
 class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K,
V> {
 
-    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
-    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
+    static final String REDUCE_NAME = "KSTREAM-REDUCE-";
+    static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
 
     private final Serde<K> keySerde;
     private final Serde<V> valSerde;
@@ -102,7 +103,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements
KGroupedStre
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V>
reducer,
                                                             final Windows<W> windows)
{
-        return reduce(reducer, windows, (String) null);
+        return windowedBy(windows).reduce(reducer);
     }
 
     @SuppressWarnings("unchecked")
@@ -164,7 +165,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements
KGroupedStre
                                                                   final Aggregator<? super
K, ? super V, T> aggregator,
                                                                   final Windows<W>
windows,
                                                                   final Serde<T> aggValueSerde)
{
-        return aggregate(initializer, aggregator, windows, aggValueSerde, null);
+        return windowedBy(windows).aggregate(initializer, aggregator, aggValueSerde);
     }
 
     @SuppressWarnings("unchecked")
@@ -219,7 +220,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements
KGroupedStre
 
     @Override
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W>
windows) {
-        return count(windows, (String) null);
+        return windowedBy(windows).count();
     }
 
     @Override
@@ -291,6 +292,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K>
implements KGroupedStre
 
     }
 
+    @Override
+    public <W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W>
windows) {
+        return new WindowedKStreamImpl<>(windows,
+                                         builder,
+                                         sourceNodes,
+                                         name,
+                                         keySerde,
+                                         valSerde,
+                                         repartitionRequired);
+    }
+
     @SuppressWarnings("unchecked")
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
final String queryableStoreName) {
         determineIsQueryable(queryableStoreName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
new file mode 100644
index 0000000..28666b8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedKStream;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
+import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
+
+public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements WindowedKStream<K, V> {
+
+    private final Windows<W> windows;
+    private final boolean repartitionRequired;
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
+
+    WindowedKStreamImpl(final Windows<W> windows,
+                        final InternalStreamsBuilder builder,
+                        final Set<String> sourceNodes,
+                        final String name,
+                        final Serde<K> keySerde,
+                        final Serde<V> valSerde,
+                        final boolean repartitionRequired) {
+        super(builder, name, sourceNodes);
+        Objects.requireNonNull(windows, "windows can't be null");
+        this.valSerde = valSerde;
+        this.keySerde = keySerde;
+        this.repartitionRequired = repartitionRequired;
+        this.windows = windows;
+    }
+
+    @Override
+    public KTable<Windowed<K>, Long> count() {
+        return aggregate(
+                new Initializer<Long>() {
+                    @Override
+                    public Long apply() {
+                        return 0L;
+                    }
+                }, new Aggregator<K, V, Long>() {
+                    @Override
+                    public Long apply(K aggKey, V value, Long aggregate) {
+                        return aggregate + 1;
+                    }
+                },
+                Serdes.Long());
+    }
+
+    @Override
+    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR>
initializer,
+                                                  final Aggregator<? super K, ? super
V, VR> aggregator,
+                                                  final Serde<VR> aggValueSerde) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        final String aggFunctionName = builder.newName(AGGREGATE_NAME);
+        final String storeName = builder.newStoreName(AGGREGATE_NAME);
+        return doAggregate(aggValueSerde,
+                           aggFunctionName,
+                           storeName,
+                           new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        final String storeName = builder.newStoreName(REDUCE_NAME);
+        return doAggregate(valSerde,
+                           builder.newName(REDUCE_NAME),
+                           storeName,
+                           new KStreamWindowReduce<>(windows, storeName, reducer));
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private <VR> KTable<Windowed<K>, VR> doAggregate(final Serde<VR> aggValueSerde,
+                                                     final String aggFunctionName,
+                                                     final String storeName,
+                                                     final KStreamAggProcessorSupplier aggSupplier) {
+        final String sourceName = repartitionIfRequired(storeName);
+        final StoreBuilder<WindowStore<K, VR>> storeBuilder = Stores.windowStoreBuilder(
+                Stores.persistentWindowStore(
+                        storeName,
+                        windows.maintainMs(),
+                        windows.segments,
+                        windows.size(),
+                        false),
+                keySerde,
+                aggValueSerde)
+                .withCachingEnabled();
+
+        builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggSupplier, sourceName);
+        builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
+
+        return new KTableImpl<>(
+                builder,
+                aggFunctionName,
+                aggSupplier,
+                sourceName.equals(this.name) ? sourceNodes
+                        : Collections.singleton(sourceName),
+                storeName,
+                false);
+    }
+
+    /**
+     * @return the new sourceName if repartitioned. Otherwise the name of this stream
+     */
+    private String repartitionIfRequired(final String queryableStoreName) {
+        if (!repartitionRequired) {
+            return this.name;
+        }
+        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 3cd8b10..e9927bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -202,15 +202,17 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondBatchTimestamp);
 
         groupedStream
-            .reduce(reducer, TimeWindows.of(500L), "reduce-time-windows")
-            .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
-                @Override
-                public String apply(final Windowed<String> windowedKey, final String value) {
-                    return windowedKey.key() + "@" + windowedKey.window().start();
-                }
-            })
+                .windowedBy(TimeWindows.of(500L))
+                .reduce(reducer)
+                .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
+                    @Override
+                    public String apply(final Windowed<String> windowedKey, final String value) {
+                        return windowedKey.key() + "@" + windowedKey.window().start();
+                    }
+                })
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
+
         startStreams();
 
         final List<KeyValue<String, String>> windowedOutput = receiveMessages(
@@ -302,18 +304,18 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondTimestamp);
         produceMessages(secondTimestamp);
 
-        groupedStream.aggregate(
-            initializer,
-            aggregator,
-            TimeWindows.of(500L),
-            Serdes.Integer(), "aggregate-by-key-windowed")
-            .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
-                @Override
-                public String apply(final Windowed<String> windowedKey, final Integer value) {
-                    return windowedKey.key() + "@" + windowedKey.window().start();
-                }
-            })
-            .to(Serdes.String(), Serdes.Integer(), outputTopic);
+        groupedStream.windowedBy(TimeWindows.of(500L))
+                .aggregate(
+                        initializer,
+                        aggregator,
+                        Serdes.Integer())
+                .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
+                    @Override
+                    public String apply(final Windowed<String> windowedKey, final Integer value) {
+                        return windowedKey.key() + "@" + windowedKey.window().start();
+                    }
+                })
+                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
 
         startStreams();
 
@@ -414,13 +416,14 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(timestamp);
 
         stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
-            .count(TimeWindows.of(500L), "count-windows")
-            .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
-                @Override
-                public String apply(final Windowed<Integer> windowedKey, final Long value) {
-                    return windowedKey.key() + "@" + windowedKey.window().start();
-                }
-            }).to(Serdes.String(), Serdes.Long(), outputTopic);
+                .windowedBy(TimeWindows.of(500L))
+                .count()
+                .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
+                    @Override
+                    public String apply(final Windowed<Integer> windowedKey, final Long value) {
+                        return windowedKey.key() + "@" + windowedKey.window().start();
+                    }
+                }).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         startStreams();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 849bb00..78f8dbd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -525,6 +525,7 @@ public class KGroupedStreamImplTest {
         doCountWindowed(results);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldCountWindowedWithInternalStoreName() throws Exception {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e16b9143/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
new file mode 100644
index 0000000..83e2e11
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImplTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedKStream;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class WindowedKStreamImplTest {
+
+    private static final String TOPIC = "input";
+    private final StreamsBuilder builder = new StreamsBuilder();
+
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private WindowedKStream<String, String> windowedStream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+        windowedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+                .windowedBy(TimeWindows.of(500L));
+    }
+
+    @Test
+    public void shouldCountWindowed() {
+        final Map<Windowed<String>, Long> results = new HashMap<>();
+        windowedStream.count()
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final Long value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
+        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
+    }
+
+
+
+    @Test
+    public void shouldReduceWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        windowedStream.reduce(MockReducer.STRING_ADDER)
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
+        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
+    }
+
+    @Test
+    public void shouldAggregateWindowed() {
+        final Map<Windowed<String>, String> results = new HashMap<>();
+        windowedStream.aggregate(MockInitializer.STRING_INIT,
+                                 MockAggregator.TOSTRING_ADDER,
+                                 Serdes.String())
+                .toStream()
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+        processData();
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
+        assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
+        assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
+        windowedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
+        windowedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
+        windowedStream.reduce(null);
+    }
+
+    private void processData() {
+        driver.setUp(builder, TestUtils.tempDirectory());
+        driver.setTime(10);
+        driver.process(TOPIC, "1", "1");
+        driver.setTime(15);
+        driver.process(TOPIC, "1", "2");
+        driver.setTime(500);
+        driver.process(TOPIC, "1", "3");
+        driver.process(TOPIC, "2", "1");
+        driver.flushState();
+    }
+
+}
\ No newline at end of file


Mime
View raw message