kafka-commits mailing list archives

From guozh...@apache.org
Subject [3/3] kafka git commit: KAFKA-3561: Auto create through topic for KStream aggregation and join
Date Thu, 16 Jun 2016 18:56:38 GMT
KAFKA-3561: Auto create through topic for KStream aggregation and join

guozhangwang enothereska mjsax miguno

If you get a chance, could you please take a look at this? I've done the repartitioning in the join, but it results in two internal topics for each join. This seems like overkill: sometimes we wouldn't need to repartition at all, sometimes only one topic is needed, and sometimes both, but I'm not sure how we can determine that.

I'd also need to implement something similar for leftJoin, but again, I'd like to see if I'm heading down the right path or if anyone has any other bright ideas.

For reference, the previous PR: https://github.com/apache/kafka/pull/1453

Thanks for taking the time to look, and I'm looking forward to any advice :-)
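
For readers skimming the diff below: the patch removes the XxxByKey aggregation methods from KStream and introduces an explicit grouping step (KGroupedStream), so that the repartition ("through") topic is created automatically and only when it is actually needed. A minimal before/after sketch based on the WordCountDemo change further down (the stream variable name `words` is illustrative):

    // before this patch
    KTable<String, Long> counts = words.countByKey("Counts");

    // after this patch: group first, then aggregate on the KGroupedStream;
    // because the upstream map() changed the key, the grouped stream is flagged
    // as requiring repartitioning and an internal "-repartition" topic is
    // created automatically when count() is applied
    KTable<String, Long> counts = words.groupByKey().count("Counts");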

Author: Damian Guy <damian.guy@gmail.com>
Author: Damian Guy <damian@continuum.local>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1472 from dguy/KAFKA-3561


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d9d1cb2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d9d1cb2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d9d1cb2

Branch: refs/heads/trunk
Commit: 7d9d1cb2355e33270703280ed6bb712033b03d26
Parents: 54ba228
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Jun 16 11:56:32 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jun 16 11:56:32 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   2 +-
 .../java/org/apache/kafka/test/TestUtils.java   |  15 +
 .../examples/pageview/PageViewTypedDemo.java    |  12 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   3 +-
 .../examples/wordcount/WordCountDemo.java       |   3 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   2 +
 .../kafka/streams/kstream/KGroupedStream.java   | 119 ++++
 .../apache/kafka/streams/kstream/KStream.java   | 215 ++-----
 .../kafka/streams/kstream/KStreamBuilder.java   |   4 +-
 .../kstream/internals/KGroupedStreamImpl.java   | 180 ++++++
 .../kstream/internals/KGroupedTableImpl.java    |   4 +-
 .../kstream/internals/KStreamAggregate.java     |   8 +-
 .../streams/kstream/internals/KStreamImpl.java  | 477 ++++++++--------
 .../kstream/internals/KStreamKStreamJoin.java   |   8 +-
 .../kstream/internals/KStreamReduce.java        |   9 +-
 .../kstream/internals/KStreamWindowReduce.java  |   2 +-
 .../streams/kstream/internals/KTableImpl.java   |   2 +-
 .../streams/processor/PartitionGrouper.java     |   2 +-
 .../streams/processor/TopologyBuilder.java      |  56 +-
 .../processor/internals/RecordCollector.java    |   4 +-
 .../internals/StreamPartitionAssignor.java      |  59 +-
 .../processor/internals/StreamThread.java       |   2 +-
 .../InternalTopicIntegrationTest.java           |   9 +-
 .../integration/JoinIntegrationTest.java        |   5 +-
 .../KGroupedStreamIntegrationTest.java          | 472 ++++++++++++++++
 .../integration/KStreamRepartitionJoinTest.java | 565 +++++++++++++++++++
 .../integration/WordCountIntegrationTest.java   |  24 +-
 .../integration/utils/IntegrationTestUtils.java |  14 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |   6 +-
 .../internals/KStreamKTableLeftJoinTest.java    |  16 -
 .../internals/KStreamWindowAggregateTest.java   |  28 +-
 .../streams/processor/TopologyBuilderTest.java  |  10 +-
 .../internals/StreamPartitionAssignorTest.java  |  61 +-
 .../streams/smoketest/SmokeTestClient.java      |  26 +-
 streams/src/test/resources/log4j.properties     |  21 +
 35 files changed, 1944 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5f52cce..9a099d0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -148,7 +148,7 @@
       <allow pkg="scala" />
       <allow pkg="scala.collection" />
       <allow pkg="org.I0Itec.zkclient" />
-      <allow pkg="org.hamcrest.CoreMatchers" />
+      <allow pkg="org.hamcrest" />
     </subpackage>
 
     <subpackage name="state">

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 742d14f..a818d53 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -27,8 +27,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -163,4 +165,17 @@ public class TestUtils {
         return memoryRecords.buffer();
     }
 
+    public static Properties producerConfig(final String bootstrapServers,
+                                            final Class keySerializer,
+                                            final Class valueSerializer,
+                                            final Properties additional) {
+        final Properties properties = new Properties();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.put(ProducerConfig.ACKS_CONFIG, "all");
+        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+        properties.putAll(additional);
+        return properties;
+    }
 }
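
A hedged usage sketch of the new producerConfig helper; the broker address and the extra linger setting are illustrative, not taken from this patch, and StringSerializer is the usual org.apache.kafka.common.serialization.StringSerializer:

    Properties extra = new Properties();
    extra.put(ProducerConfig.LINGER_MS_CONFIG, "0");   // illustrative extra setting

    Properties producerProps = TestUtils.producerConfig(
        "localhost:9092",        // illustrative bootstrap.servers
        StringSerializer.class,
        StringSerializer.class,
        extra);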

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index e53b037..19391d8 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -131,9 +131,16 @@ public class PageViewTypedDemo {
         final Deserializer<RegionCount> regionCountDeserializer = new JsonPOJODeserializer<>();
         serdeProps.put("JsonPOJOClass", RegionCount.class);
         regionCountDeserializer.configure(serdeProps, false);
-
         final Serde<RegionCount> regionCountSerde = Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer);
 
+        final Serializer<PageViewByRegion> pageViewByRegionSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
+        pageViewByRegionSerializer.configure(serdeProps, false);
+        final Deserializer<PageViewByRegion> pageViewByRegionDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
+        pageViewByRegionDeserializer.configure(serdeProps, false);
+        final Serde<PageViewByRegion> pageViewByRegionSerde = Serdes.serdeFrom(pageViewByRegionSerializer, pageViewByRegionDeserializer);
+
         KStream<String, PageView> views = builder.stream(Serdes.String(), pageViewSerde, "streams-pageview-input");
 
         KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde, "streams-userprofile-input");
@@ -160,7 +167,8 @@ public class PageViewTypedDemo {
                         return new KeyValue<>(viewRegion.region, viewRegion);
                     }
                 })
-                .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
+                .groupByKey(Serdes.String(), pageViewByRegionSerde)
+                .count(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 8a0af6c..e9aa467 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -99,7 +99,8 @@ public class PageViewUntypedDemo {
                         return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
                     }
                 })
-                .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
+                .groupByKey(Serdes.String(), jsonSerde)
+                .count(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 12395f9..bf1d8cb 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -72,7 +72,8 @@ public class WordCountDemo {
                         return new KeyValue<>(value, value);
                     }
                 })
-                .countByKey("Counts");
+                .groupByKey()
+                .count("Counts");
 
         // need to override value serde to Long type
         counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f05b02c..6605335 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -134,6 +134,8 @@ public class KafkaStreams {
         // The application ID is a required config and hence should always have value
         String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
+        builder.setApplicationId(applicationId);
+
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
             clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
new file mode 100644
index 0000000..25fdb3a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * {@link KGroupedStream} is an abstraction of a <i>grouped record stream</i> of key-value pairs
+ * usually grouped on a different key than the original stream key
+ *
+ * <p>
+ * It is an intermediate representation of a {@link KStream} before an
+ * aggregation is applied to the new partitions resulting in a new {@link KTable}.
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ *
+ * @see KStream
+ */
+@InterfaceStability.Unstable
+public interface KGroupedStream<K, V> {
+
+
+    /**
+     * Combine values of this stream by the grouped key into a new instance of ever-updating
+     * {@link KTable}.
+     *
+     * @param reducer           the instance of {@link Reducer}
+     * @param name              the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
+     */
+    KTable<K, V> reduce(Reducer<V> reducer,
+                        String name);
+
+
+    /**
+     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     *
+     * @param reducer           the instance of {@link Reducer}
+     * @param windows           the specification of the aggregation {@link Windows}
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
+                                                     Windows<W> windows);
+
+    /**
+     * Aggregate values of this stream by key into a new instance of a {@link KTable}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that represents the latest (rolling) aggregate for each key
+     */
+    <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                               Aggregator<K, V, T> aggregator,
+                               Serde<T> aggValueSerde,
+                               String name);
+
+    /**
+     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
+     * @param windows       the specification of the aggregation {@link Windows}
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
+     * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values with type {@code T}
+     *         that represent the latest (rolling) aggregate for each key within that window
+     */
+    <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer,
+                                                           Aggregator<K, V, T> aggregator,
+                                                           Windows<W> windows,
+                                                           Serde<T> aggValueSerde);
+
+
+    /**
+     * Count number of records of this stream by key into a new instance of a {@link KTable}
+     *
+     * @param name  the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<K, Long> count(String name);
+
+
+    /**
+     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     *
+     * @param windows   the specification of the aggregation {@link Windows}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
+     *         where each table contains records with unmodified keys and values
+     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
+     */
+    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows);
+
+}
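
A minimal sketch of the windowed variants declared above, mirroring how the PageView demos earlier in this diff use them; the stream, serde, window name and window size are illustrative:

    KTable<Windowed<String>, Long> viewsPerRegion = viewsByRegion
        // explicit serdes so the auto-created repartition topic can be written/read
        .groupByKey(Serdes.String(), pageViewSerde)
        .count(TimeWindows.of("PageViewsWindow", 60 * 60 * 1000L));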

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index a1ecfa4..ae743b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -317,6 +317,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join.
+     * If a record key is null it will not included in the resulting {@link KStream}
      *
      * @param otherStream       the instance of {@link KStream} joined with this stream
      * @param joiner            the instance of {@link ValueJoiner}
@@ -343,7 +344,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join
-     * with default serializers and deserializers.
+     * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -361,6 +362,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join.
+     * If a record key is null it will not included in the resulting {@link KStream}
      *
      * @param otherStream       the instance of {@link KStream} joined with this stream
      * @param joiner            the instance of {@link ValueJoiner}
@@ -387,7 +389,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join
-     * with default serializers and deserializers.
+     * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -405,12 +407,15 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join.
+     * If a record key is null it will not included in the resulting {@link KStream}
      *
      * @param otherStream       the instance of {@link KStream} joined with this stream
      * @param joiner            the instance of {@link ValueJoiner}
      * @param windows           the specification of the {@link JoinWindows}
      * @param keySerde          key serdes for materializing the other stream,
      *                          if not specified the default serdes defined in the configs will be used
+     * @param thisValSerde    value serdes for materializing this stream,
+     *                          if not specified the default serdes defined in the configs will be used
      * @param otherValueSerde   value serdes for materializing the other stream,
      *                          if not specified the default serdes defined in the configs will be used
      * @param <V1>              the value type of the other stream
@@ -424,11 +429,12 @@ public interface KStream<K, V> {
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serde<K> keySerde,
+            Serde<V> thisValSerde,
             Serde<V1> otherValueSerde);
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join
-     * with default serializers and deserializers.
+     * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
      *
      * @param otherStream   the instance of {@link KStream} joined with this stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -446,6 +452,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join.
+     * If a record key is null it will not included in the resulting {@link KStream}
      *
      * @param table     the instance of {@link KTable} joined with this stream
      * @param joiner    the instance of {@link ValueJoiner}
@@ -458,182 +465,76 @@ public interface KStream<K, V> {
     <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner);
 
     /**
-     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     *
-     * @param reducer           the instance of {@link Reducer}
-     * @param windows           the specification of the aggregation {@link Windows}
-     * @param keySerde          key serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     * @param valueSerde        value serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) aggregate for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
-                                                          Windows<W> windows,
-                                                          Serde<K> keySerde,
-                                                          Serde<V> valueSerde);
-
-    /**
-     * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param reducer the instance of {@link Reducer}
-     * @param windows the specification of the aggregation {@link Windows}
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) aggregate for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows);
-
-    /**
-     * Combine values of this stream by key into a new instance of ever-updating {@link KTable}.
-     *
-     * @param reducer           the instance of {@link Reducer}
-     * @param keySerde          key serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     * @param valueSerde        value serdes for materializing the aggregated table,
-     *                          if not specified the default serdes defined in the configs will be used
-     * @param name              the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
-     */
-    KTable<K, V> reduceByKey(Reducer<V> reducer,
-                             Serde<K> keySerde,
-                             Serde<V> valueSerde,
-                             String name);
-
-    /**
-     * Combine values of this stream by key into a new instance of ever-updating {@link KTable} with default serializers and deserializers.
-     *
-     * @param reducer the instance of {@link Reducer}
-     * @param name    the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key
-     */
-    KTable<K, V> reduceByKey(Reducer<V> reducer, String name);
-
-    /**
-     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}.
+     * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join.
+     * If a record key is null it will not included in the resulting {@link KStream}
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
-     * @param windows       the specification of the aggregation {@link Windows}
-     * @param keySerde      key serdes for materializing the aggregated table,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     * @param table         the instance of {@link KTable} joined with this stream
+     * @param valueJoiner   the instance of {@link ValueJoiner}
+     * @param keySerde      key serdes for materializing this stream.
+     *                      If not specified the default serdes defined in the configs will be used
+     * @param valSerde      value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param <V1>          the value type of the table
+     * @param <V2>          the value type of the new stream
      *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values with type {@code T}
-     *         that represent the latest (rolling) aggregate for each key within that window
+     * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key and within the joining window intervals
      */
-    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
-                                                                Aggregator<K, V, T> aggregator,
-                                                                Windows<W> windows,
-                                                                Serde<K> keySerde,
-                                                                Serde<T> aggValueSerde);
-
+    <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table,
+                                     ValueJoiner<V, V1, V2> valueJoiner,
+                                     Serde<K> keySerde,
+                                     Serde<V> valSerde);
     /**
-     * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}
-     * with default serializers and deserializers.
+     * Group the records of this {@link KStream} using the provided {@link KeyValueMapper} and
+     * default serializers and deserializers. If a record key is null it will not included in
+     * the resulting {@link KGroupedStream}
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
-     * @param windows       the specification of the aggregation {@link Windows}
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param selector      select the grouping key and value to be aggregated
+     * @param <K1>          the key type of the {@link KGroupedStream}
+     * @param <V1>          the value type of the {@link KGroupedStream}
      *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values with type {@code T}
-     *         that represent the latest (rolling) aggregate for each key within that window
+     * @return a {@link KGroupedStream} that contains the the grouped records of the original {@link KStream}
      */
-    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
-                                                                Aggregator<K, V, T> aggregator,
-                                                                Windows<W> windows);
+    <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector);
 
     /**
-     * Aggregate values of this stream by key into a new instance of ever-updating {@link KTable}.
+     * Group the records of this {@link KStream} using the provided {@link KeyValueMapper}.
+     * If a record key is null it will not included in the resulting {@link KGroupedStream}
      *
-     * @param initializer   the class of {@link Initializer}
-     * @param aggregator    the class of {@link Aggregator}
-     * @param keySerde      key serdes for materializing the aggregated table,
+     * @param selector      select the grouping key and value to be aggregated
+     * @param keySerde      key serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     * @param valSerde    value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param name          the name of the resulted {@link KTable}
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param <K1>          the key type of the {@link KGroupedStream}
+     * @param <V1>          the value type of the {@link KGroupedStream}
      *
-     * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key
+     * @return a {@link KGroupedStream} that contains the the grouped records of the original {@link KStream}
      */
-    <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                    Aggregator<K, V, T> aggregator,
-                                    Serde<K> keySerde,
-                                    Serde<T> aggValueSerde,
-                                    String name);
+    <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector,
+                                            Serde<K1> keySerde,
+                                            Serde<V1> valSerde);
 
     /**
-     * Aggregate values of this stream by key into a new instance of ever-updating {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param initializer   the class of {@link Initializer}
-     * @param aggregator    the class of {@link Aggregator}
-     * @param name          the name of the resulted {@link KTable}
-     * @param <T>           the value type of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values (of different type) that represent the latest (rolling) aggregate for each key
+     * Group the records with the same key into a {@link KGroupedStream} while preserving the
+     * original values. If a record key is null it will not included in the resulting
+     * {@link KGroupedStream}
+     * Default Serdes will be used
+     * @return a {@link KGroupedStream}
      */
-    <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                    Aggregator<K, V, T> aggregator,
-                                    String name);
+    KGroupedStream<K, V> groupByKey();
 
     /**
-     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}.
-     *
-     * @param windows       the specification of the aggregation {@link Windows}
-     * @param keySerde      key serdes for materializing the counting table,
+     * Group the records with the same key into a {@link KGroupedStream} while preserving the
+     * original values. If a record key is null it will not included in the resulting
+     * {@link KGroupedStream}
+     * @param keySerde      key serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, Serde<K> keySerde);
-
-    /**
-     * Count number of records of this stream by key on a window basis into a new instance of windowed {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param windows       the specification of the aggregation {@link Windows}
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
-     *         where each table contains records with unmodified keys and values
-     *         that represent the latest (rolling) count (i.e., number of records) for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows);
-
-    /**
-     * Count number of records of this stream by key into a new instance of ever-updating {@link KTable}.
-     *
-     * @param keySerde      key serdes for materializing the counting table,
+     * @param valSerde    value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the configs will be used
-     * @param name          the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
-     */
-    KTable<K, Long> countByKey(Serde<K> keySerde, String name);
-
-    /**
-     * Count number of records of this stream by key into a new instance of ever-updating {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param name          the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) count (i.e., number of records) for each key
+     * @return a {@link KGroupedStream}
      */
-    KTable<K, Long> countByKey(String name);
+    KGroupedStream<K, V> groupByKey(Serde<K> keySerde,
+                                    Serde<V> valSerde);
 
 }
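
As a hedged illustration of groupBy with a key selector (the Order type, its customerId field and orderSerde are hypothetical): re-keying returns a KGroupedStream that is marked as requiring repartitioning, and the internal topic is only created once an aggregate, reduce or count is applied.

    KGroupedStream<String, Order> ordersByCustomer = orders.groupBy(
            new KeyValueMapper<String, Order, String>() {
                @Override
                public String apply(String orderId, Order order) {
                    return order.customerId;   // group by a new key
                }
            },
            Serdes.String(),
            orderSerde);

    KTable<String, Long> ordersPerCustomer = ordersByCustomer.count("OrdersPerCustomer");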

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 53b2f4e..37d8921 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -89,7 +89,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
         addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
+        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
 
@@ -111,7 +111,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
         addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
+        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
new file mode 100644
index 0000000..1830484
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
+
+    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
+    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
+
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
+    private final boolean repartitionRequired;
+
+    public KGroupedStreamImpl(final KStreamBuilder topology,
+                              final String name,
+                              final Set<String> sourceNodes,
+                              final Serde<K> keySerde,
+                              final Serde<V> valSerde,
+                              final boolean repartitionRequired) {
+        super(topology, name, sourceNodes);
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
+        this.repartitionRequired = repartitionRequired;
+    }
+
+    @Override
+    public KTable<K, V> reduce(final Reducer<V> reducer,
+                               final String name) {
+        return doAggregate(
+            new KStreamReduce<K, V>(name, reducer),
+            REDUCE_NAME,
+            keyValueStore(valSerde, name));
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
+                                                            Windows<W> windows) {
+        return (KTable<Windowed<K>, V>) doAggregate(
+            new KStreamWindowReduce<K, V, W>(windows, windows.name(), reducer),
+            REDUCE_NAME,
+            windowedStore(valSerde, windows)
+        );
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<K, V, T> aggregator,
+                                      final Serde<T> aggValueSerde,
+                                      final String name) {
+        return doAggregate(
+            new KStreamAggregate<>(name, initializer, aggregator),
+            AGGREGATE_NAME,
+            keyValueStore(aggValueSerde, name));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
+                                                                  final Aggregator<K, V, T> aggregator,
+                                                                  final Windows<W> windows,
+                                                                  final Serde<T> aggValueSerde) {
+        return (KTable<Windowed<K>, T>) doAggregate(
+            new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator),
+            AGGREGATE_NAME,
+            windowedStore(aggValueSerde, windows)
+        );
+    }
+
+    @Override
+    public KTable<K, Long> count(final String name) {
+        return aggregate(new Initializer<Long>() {
+            @Override
+            public Long apply() {
+                return 0L;
+            }
+        }, new Aggregator<K, V, Long>() {
+            @Override
+            public Long apply(K aggKey, V value, Long aggregate) {
+                return aggregate + 1;
+            }
+        }, Serdes.Long(), name);
+    }
+
+    @Override
+    public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows) {
+        return aggregate(new Initializer<Long>() {
+            @Override
+            public Long apply() {
+                return 0L;
+            }
+        }, new Aggregator<K, V, Long>() {
+            @Override
+            public Long apply(K aggKey, V value, Long aggregate) {
+                return aggregate + 1;
+            }
+        }, windows, Serdes.Long());
+    }
+
+    private <T> StateStoreSupplier keyValueStore(final Serde<T> aggValueSerde, final String name) {
+        return storeFactory(aggValueSerde, name).build();
+    }
+
+
+    private <W extends Window, T> StateStoreSupplier windowedStore(final Serde<T> aggValSerde,
+                                                                   final Windows<W> windows) {
+        return storeFactory(aggValSerde, windows.name())
+            .windowed(windows.maintainMs(), windows.segments, false)
+            .build();
+
+    }
+
+    private <T> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<T> aggValueSerde,
+                                                                    final String name) {
+        return Stores.create(name)
+            .withKeys(keySerde)
+            .withValues(aggValueSerde)
+            .persistent();
+
+    }
+
+    private <T> KTable<K, T> doAggregate(
+        final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
+        final String functionName,
+        final StateStoreSupplier storeSupplier) {
+
+        final String aggFunctionName = topology.newName(functionName);
+
+        final String sourceName = repartitionIfRequired();
+
+        topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
+        topology.addStateStore(storeSupplier, aggFunctionName);
+
+        return new KTableImpl<>(topology,
+                                aggFunctionName,
+                                aggregateSupplier,
+                                sourceName.equals(this.name) ? sourceNodes
+                                                             : Collections.singleton(sourceName));
+    }
+
+    /**
+     * @return the new sourceName if repartitioned. Otherwise the name of this stream
+     */
+    private String repartitionIfRequired() {
+        if (!repartitionRequired) {
+            return this.name;
+        }
+        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde);
+    }
+}
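
Note on naming: repartitionIfRequired delegates to KStreamImpl.createReparitionedSource (defined further down in this diff). Combined with the REPARTITION_TOPIC_SUFFIX constant and the application id that KafkaStreams now passes to the builder, the auto-created through topic appears to end up named roughly as follows (a hedged sketch; the exact prefixing is an assumption, not quoted from this patch):

    // the topic is derived from the upstream processor node's name, e.g.
    //   "KSTREAM-MAP-0000000002" + "-repartition"
    // and internal topics are then prefixed with the application id, e.g.
    //   "my-app-KSTREAM-MAP-0000000002-repartition"
    String repartitionTopic = name + KStreamImpl.REPARTITION_TOPIC_SUFFIX;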

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index f7fe4e5..7118bb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -45,8 +45,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
-
     protected final Serde<K> keySerde;
     protected final Serde<V> valSerde;
 
@@ -88,7 +86,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
         String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
         String funcName = topology.newName(functionName);
 
-        String topic = name + REPARTITION_TOPIC_SUFFIX;
+        String topic = name + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
         Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index b6d1492..dc6410d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -61,14 +60,11 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
+
         @Override
         public void process(K key, V value) {
-            // the keys should never be null
             if (key == null)
-                throw new StreamsException("Record key for KStream aggregate operator with state " + storeName + " should not be null.");
+                return;
 
             T oldAgg = store.get(key);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 91bcef9..79ff842 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -17,18 +17,16 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -36,9 +34,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -47,18 +42,17 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.lang.reflect.Array;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
 public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
 
-    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
-
     private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
 
     private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
 
-    private static final String FILTER_NAME = "KSTREAM-FILTER-";
+    public static final String FILTER_NAME = "KSTREAM-FILTER-";
 
     private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
 
@@ -84,8 +78,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
 
-    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
-
     private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
 
     public static final String SINK_NAME = "KSTREAM-SINK-";
@@ -100,8 +92,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
 
-    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
+    public static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
+
+    private final boolean repartitionRequired;
+
+    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes,
+                       boolean repartitionRequired) {
         super(topology, name, sourceNodes);
+        this.repartitionRequired = repartitionRequired;
     }
 
     @Override
@@ -110,7 +108,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
@@ -119,20 +117,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> mapper) {
+        return new KStreamImpl<>(topology, internalSelectKey(mapper), sourceNodes, true);
+    }
+
+    private <K1> String internalSelectKey(final KeyValueMapper<K, V, K1> mapper) {
         String name = topology.newName(KEY_SELECT_NAME);
         topology.addProcessor(name, new KStreamMap<>(new KeyValueMapper<K, V, KeyValue<K1, V>>() {
             @Override
             public KeyValue<K1, V> apply(K key, V value) {
-                return new KeyValue(mapper.apply(key, value), value);
+                return new KeyValue<>(mapper.apply(key, value), value);
             }
         }), this.name);
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return name;
     }
 
     @Override
@@ -141,16 +143,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, null);
+        return new KStreamImpl<>(topology, name, sourceNodes, true);
     }
 
+
     @Override
     public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
         String name = topology.newName(MAPVALUES_NAME);
 
         topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
@@ -193,7 +196,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, null);
+        return new KStreamImpl<>(topology, name, sourceNodes, true);
     }
 
     @Override
@@ -202,7 +205,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
@@ -218,7 +221,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
             topology.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
 
-            branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes);
+            branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes, this.repartitionRequired);
         }
 
         return branchChildren;
@@ -245,7 +248,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         topology.addProcessor(name, new KStreamPassThrough<>(), parentNames);
 
-        return new KStreamImpl<>(topology, name, allSourceNodes);
+        return new KStreamImpl<>(topology, name, allSourceNodes, false);
     }
 
     @Override
@@ -315,7 +318,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
 
-        return new KStreamImpl<>(topology, name, null);
+        return new KStreamImpl<>(topology, name, null, true);
     }
 
     @Override
@@ -325,7 +328,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
     }
 
     @Override
@@ -388,45 +391,87 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             Serde<V1> otherValueSerde,
             boolean outer) {
 
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+        return doJoin(other,
+                      joiner,
+                      windows,
+                      keySerde,
+                      thisValueSerde,
+                      otherValueSerde,
+                      new DefaultJoin(outer));
+    }
+
+    private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other,
+                                         ValueJoiner<V, V1, R> joiner,
+                                         JoinWindows windows,
+                                         Serde<K> keySerde,
+                                         Serde<V> thisValueSerde,
+                                         Serde<V1> otherValueSerde,
+                                         KStreamImplJoin join) {
+        KStreamImpl<K, V> joinThis = this;
+        KStreamImpl<K, V1> joinOther = (KStreamImpl) other;
+
+        if (joinThis.repartitionRequired) {
+            joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde);
+        }
 
-        StateStoreSupplier thisWindow = Stores.create(windows.name() + "-this")
-                .withKeys(keySerde)
-                .withValues(thisValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, true)
-                .build();
+        if (joinOther.repartitionRequired) {
+            joinOther = joinOther.repartitionForJoin(keySerde, otherValueSerde);
+        }
+
+        joinThis.ensureJoinableWith(joinOther);
 
-        StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
-                .withKeys(keySerde)
-                .withValues(otherValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, true)
-                .build();
+        return join.join(joinThis,
+                         joinOther,
+                         joiner,
+                         windows,
+                         keySerde,
+                         thisValueSerde,
+                         otherValueSerde);
+    }
 
-        KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
-        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
 
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, outer);
-        KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), windows.before, windows.after, reverseJoiner(joiner), outer);
+    /**
+     * Repartition a stream. This is required on join operations occurring after
+     * an operation that changes the key, i.e., selectKey(..), map(..), flatMap(..).
+     * @param keySerde      Serde for serializing the keys
+     * @param valSerde      Serde for serializing the values
+     * @return a new {@link KStreamImpl}
+     */
+    private KStreamImpl<K, V> repartitionForJoin(Serde<K> keySerde,
+                                                 Serde<V> valSerde) {
 
-        KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>();
+        String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde);
+        return new KStreamImpl<>(topology, repartitionedSourceName,
+                                 Collections.singleton(repartitionedSourceName), false);
+    }
 
-        String thisWindowStreamName = topology.newName(WINDOWED_NAME);
-        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-        String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
-        String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
-        String joinMergeName = topology.newName(MERGE_NAME);
+    static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
+                                                    Serde<K1> keySerde,
+                                                    Serde<V1> valSerde) {
+        Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
+        Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
+        Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
+        Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
 
-        topology.addProcessor(thisWindowStreamName, thisWindowedStream, this.name);
-        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
-        topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
-        topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
-        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName);
-        topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName);
+        String repartitionTopic = stream.name + REPARTITION_TOPIC_SUFFIX;
+        String sinkName = stream.topology.newName(SINK_NAME);
+        String filterName = stream.topology.newName(FILTER_NAME);
+        String sourceName = stream.topology.newName(SOURCE_NAME);
 
-        return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
+        stream.topology.addInternalTopic(repartitionTopic);
+        stream.topology.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
+            @Override
+            public boolean test(final K1 key, final V1 value) {
+                return key != null;
+            }
+        }, false), stream.name);
+
+        stream.topology.addSink(sinkName, repartitionTopic, keySerializer,
+                         valSerializer, filterName);
+        stream.topology.addSource(sourceName, keyDeserializer, valDeserializer,
+                           repartitionTopic);
+
+        return sourceName;
     }
 
     @SuppressWarnings("unchecked")
@@ -436,28 +481,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serde<K> keySerde,
+            Serde<V> thisValSerde,
             Serde<V1> otherValueSerde) {
 
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
-
-        StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
-                .withKeys(keySerde)
-                .withValues(otherValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, true)
-                .build();
-
-        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
-
-        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-        String joinThisName = topology.newName(LEFTJOIN_NAME);
-
-        topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((KStreamImpl) other).name);
-        topology.addProcessor(joinThisName, joinThis, this.name);
-        topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
-
-        return new KStreamImpl<>(topology, joinThisName, allSourceNodes);
+        return doJoin(other,
+                      joiner,
+                      windows,
+                      keySerde,
+                      thisValSerde,
+                      otherValueSerde,
+                      new LeftJoin());
     }
 
     @Override
@@ -466,193 +499,197 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows) {
 
-        return leftJoin(other, joiner, windows, null, null);
+        return leftJoin(other, joiner, windows, null, null, null);
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
-
-        String name = topology.newName(LEFTJOIN_NAME);
-
-        topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
-        topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
+        return leftJoin(other, joiner, null, null);
 
-        return new KStreamImpl<>(topology, name, allSourceNodes);
     }
 
-    @Override
-    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
-                                                                 Windows<W> windows,
-                                                                 Serde<K> keySerde,
-                                                                 Serde<V> aggValueSerde) {
+    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other,
+                                          ValueJoiner<V, V1, R> joiner,
+                                          Serde<K> keySerde,
+                                          Serde<V> valueSerde) {
+
+        if (repartitionRequired) {
+            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
+                                                                                valueSerde
+            );
+            return thisStreamRepartitioned.doStreamTableLeftJoin(other, joiner);
+        } else {
+            return doStreamTableLeftJoin(other, joiner);
+        }
 
-        String reduceName = topology.newName(REDUCE_NAME);
+    }
 
-        KStreamWindowReduce<K, V, W> reduceSupplier = new KStreamWindowReduce<>(windows, windows.name(), reducer);
+    private <V1, R> KStream<K, R> doStreamTableLeftJoin(final KTable<K, V1> other,
+                                                        final ValueJoiner<V, V1, R> joiner) {
+        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
-        StateStoreSupplier reduceStore = Stores.create(windows.name())
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, false)
-                .build();
+        String name = topology.newName(LEFTJOIN_NAME);
 
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(reduceName, reduceSupplier, this.name);
-        topology.addStateStore(reduceStore, reduceName);
+        topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
+        topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
 
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
+        return new KStreamImpl<>(topology, name, allSourceNodes, false);
     }
 
     @Override
-    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
-                                                                 Windows<W> windows) {
-
-        return reduceByKey(reducer, windows, null, null);
+    public <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector) {
+        return groupBy(selector, null, null);
     }
 
     @Override
-    public KTable<K, V> reduceByKey(Reducer<V> reducer,
-                                    Serde<K> keySerde,
-                                    Serde<V> aggValueSerde,
-                                    String name) {
-
-        String reduceName = topology.newName(REDUCE_NAME);
-
-        KStreamReduce<K, V> reduceSupplier = new KStreamReduce<>(name, reducer);
+    public <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector,
+                                                   Serde<K1> keySerde,
+                                                   Serde<V1> valSerde) {
 
-        StateStoreSupplier reduceStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .build();
-
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(reduceName, reduceSupplier, this.name);
-        topology.addStateStore(reduceStore, reduceName);
-
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, reduceName, reduceSupplier, sourceNodes);
+        String selectName = internalSelectKey(selector);
+        return new KGroupedStreamImpl<>(topology,
+                                        selectName,
+                                        sourceNodes,
+                                        keySerde,
+                                        valSerde, true);
     }
 
     @Override
-    public KTable<K, V> reduceByKey(Reducer<V> reducer, String name) {
-
-        return reduceByKey(reducer, null, null, name);
+    public KGroupedStream<K, V> groupByKey() {
+        return groupByKey(null, null);
     }
 
     @Override
-    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
-                                                                       Aggregator<K, V, T> aggregator,
-                                                                       Windows<W> windows,
-                                                                       Serde<K> keySerde,
-                                                                       Serde<T> aggValueSerde) {
-
-        String aggregateName = topology.newName(AGGREGATE_NAME);
-
-        KStreamAggProcessorSupplier<K, Windowed<K>, V, T> aggregateSupplier = new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator);
-
-        StateStoreSupplier aggregateStore = Stores.create(windows.name())
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, false)
-                .build();
+    public KGroupedStream<K, V> groupByKey(Serde<K> keySerde,
+                                           Serde<V> valSerde) {
+        return new KGroupedStreamImpl<>(topology,
+                                        this.name,
+                                        sourceNodes,
+                                        keySerde,
+                                        valSerde,
+                                        this.repartitionRequired);
+    }
 
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
-        topology.addStateStore(aggregateStore, aggregateName);
 
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<Windowed<K>, T, T>(topology, aggregateName, aggregateSupplier, sourceNodes);
+    private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows windows,
+                                                                     final Serde<K> keySerde,
+                                                                     final Serde<V> valueSerde,
+                                                                     final String nameSuffix) {
+        return Stores.create(windows.name() + nameSuffix)
+            .withKeys(keySerde)
+            .withValues(valueSerde)
+            .persistent()
+            .windowed(windows.maintainMs(), windows.segments, true)
+            .build();
     }
 
-    @Override
-    public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer,
-                                                                       Aggregator<K, V, T> aggregator,
-                                                                       Windows<W> windows) {
+    private interface KStreamImplJoin {
 
-        return aggregateByKey(initializer, aggregator, windows, null, null);
+        <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
+                                            KStream<K1, V2> other,
+                                            ValueJoiner<V1, V2, R> joiner,
+                                            JoinWindows windows,
+                                            Serde<K1> keySerde,
+                                            Serde<V1> lhsValueSerde,
+                                            Serde<V2> otherValueSerde);
     }
 
-    @Override
-    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                           Aggregator<K, V, T> aggregator,
-                                           Serde<K> keySerde,
-                                           Serde<T> aggValueSerde,
-                                           String name) {
+    private class DefaultJoin implements KStreamImplJoin {
 
-        String aggregateName = topology.newName(AGGREGATE_NAME);
+        private final boolean outer;
 
-        KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier = new KStreamAggregate<>(name, initializer, aggregator);
+        DefaultJoin(final boolean outer) {
+            this.outer = outer;
+        }
 
-        StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .build();
+        @Override
+        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
+                                                   KStream<K1, V2> other,
+                                                   ValueJoiner<V1, V2, R> joiner,
+                                                   JoinWindows windows,
+                                                   Serde<K1> keySerde,
+                                                   Serde<V1> lhsValueSerde,
+                                                   Serde<V2> otherValueSerde) {
+
+            StateStoreSupplier thisWindow =
+                createWindowedStateStore(windows, keySerde, lhsValueSerde, "-this");
+
+            StateStoreSupplier otherWindow =
+                createWindowedStateStore(windows, keySerde, otherValueSerde, "-other");
+
+
+            KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(),
+                                                                                   windows.before + windows.after + 1,
+                                                                                   windows.maintainMs());
+            KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(),
+                                                                                    windows.before + windows.after + 1,
+                                                                                    windows.maintainMs());
+
+            KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
+                                                                                  windows.before,
+                                                                                  windows.after,
+                                                                                  joiner,
+                                                                                  outer);
+            KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
+                                                                                   windows.before,
+                                                                                   windows.after,
+                                                                                   reverseJoiner(joiner),
+                                                                                   outer);
+
+            KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
+
+            String thisWindowStreamName = topology.newName(WINDOWED_NAME);
+            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+            String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
+            String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
+            String joinMergeName = topology.newName(MERGE_NAME);
+
+            topology.addProcessor(thisWindowStreamName, thisWindowedStream, ((AbstractStream) lhs).name);
+            topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
+            topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
+            topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
+            topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+            topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName);
+            topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName);
+
+            Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
+            allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
+            return new KStreamImpl<>(topology, joinMergeName, allSourceNodes, false);
+        }
+    }
 
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
-        topology.addStateStore(aggregateStore, aggregateName);
 
-        // return the KTable representation with the intermediate topic as the sources
-        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
-    }
+    private class LeftJoin implements KStreamImplJoin {
 
-    @Override
-    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                           Aggregator<K, V, T> aggregator,
-                                           String name) {
+        @Override
+        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
+                                                   KStream<K1, V2> other,
+                                                   ValueJoiner<V1, V2, R> joiner,
+                                                   JoinWindows windows,
+                                                   Serde<K1> keySerde,
+                                                   Serde<V1> lhsValueSerde,
+                                                   Serde<V2> otherValueSerde) {
+            StateStoreSupplier otherWindow =
+                createWindowedStateStore(windows, keySerde, otherValueSerde, "-other");
 
-        return aggregateByKey(initializer, aggregator, null, null, name);
-    }
+            KStreamJoinWindow<K1, V2> otherWindowedStream =
+                new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+            KStreamKStreamJoin<K1, R, V1, V2> joinThis =
+                new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
 
-    @Override
-    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
-                                                                   Serde<K> keySerde) {
-        return this.aggregateByKey(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1L;
-                    }
-                }, windows, keySerde, Serdes.Long());
-    }
+            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+            String joinThisName = topology.newName(LEFTJOIN_NAME);
 
-    @Override
-    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows) {
-        return countByKey(windows, null);
-    }
+            topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
+            topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name);
+            topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
 
-    @Override
-    public KTable<K, Long> countByKey(Serde<K> keySerde, String name) {
-        return this.aggregateByKey(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1L;
-                    }
-                }, keySerde, Serdes.Long(), name);
+            Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
+            allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
+            return new KStreamImpl<>(topology, joinThisName, allSourceNodes, false);
+        }
     }
 
-    @Override
-    public KTable<K, Long> countByKey(String name) {
-        return countByKey(null, name);
-    }
+
 }

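For illustration only (none of the code below is part of the patch): after this change, a join that follows a key-changing operation such as selectKey(..), map(..) or flatMap(..) no longer needs a manual through() topic. The re-keyed stream carries repartitionRequired, and doJoin() routes it through createReparitionedSource(), which adds the null-key filter, the sink to an internal "<node-name>-repartition" topic, and a new source ahead of the windowed join. The topic names, serdes and the JoinWindows.of(name).within(ms) call below are assumptions for the sketch, based on the trunk API at the time of this commit.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class RepartitionJoinSketch {

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Hypothetical input topics.
        KStream<String, String> orders = builder.stream(Serdes.String(), Serdes.String(), "orders");
        KStream<String, String> payments = builder.stream(Serdes.String(), Serdes.String(), "payments");

        // selectKey changes the key, so the resulting stream is flagged repartitionRequired = true.
        KStream<String, String> byOrderId = orders.selectKey(new KeyValueMapper<String, String, String>() {
            @Override
            public String apply(String key, String value) {
                return value.split(",")[0];   // derive the new key from the value
            }
        });

        // The join now repartitions byOrderId through an internal "<node-name>-repartition"
        // topic before the windowed join; null-key records are dropped by the filter that
        // createReparitionedSource() places in front of the repartition sink.
        KStream<String, String> joined = byOrderId.join(payments,
            new ValueJoiner<String, String, String>() {
                @Override
                public String apply(String order, String payment) {
                    return order + "/" + payment;
                }
            },
            JoinWindows.of("orders-payments").within(60 * 1000L),
            Serdes.String(), Serdes.String(), Serdes.String());

        // A real application would wrap `builder` in a KafkaStreams instance and start it;
        // `joined` could then be written to an output topic with joined.to(...).
    }
}

Filtering null keys before the sink keeps the internal repartition topic free of records the join would discard anyway.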
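The removed reduceByKey/aggregateByKey/countByKey overloads are likewise reached through the new grouping step: groupByKey() keeps the current key, while groupBy(..) changes it and marks the grouped stream for the same automatic repartitioning. A minimal sketch continuing the example above; the count(String) call is assumed from KGroupedStream.java elsewhere in this patch, with the string naming the backing state store.

import org.apache.kafka.streams.kstream.KTable;

// Non-windowed count per key; replaces the removed stream.countByKey("order-counts").
// (Assumes KGroupedStream exposes count(String name), per KGroupedStream.java in this patch.)
KTable<String, Long> counts = orders
    .groupByKey(Serdes.String(), Serdes.String())
    .count("order-counts");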
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 72029a8..edde009 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,14 +60,11 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
             otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
+
         @Override
         public void process(K key, V1 value) {
-            // the keys should never be null
             if (key == null)
-                throw new StreamsException("Record key for KStream-KStream join operator with other window state store " + otherWindowName + " should not be null.");
+                return;
 
             boolean needOuterJoin = KStreamKStreamJoin.this.outer;
 

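A note on the semantic change above: records with a null key are now skipped silently by the join processor instead of raising a StreamsException, mirroring the key != null filter placed before the repartition sink in KStreamImpl. A caller that wants those records surfaced can filter them explicitly before joining; a small sketch reusing the orders stream from the earlier example, with the Predicate API already used in this patch.

import org.apache.kafka.streams.kstream.Predicate;

// Remove null-key records up front (branch or log them here if they need to be observed)
// rather than relying on the silent skip inside KStreamKStreamJoin.
KStream<String, String> joinable = orders.filter(new Predicate<String, String>() {
    @Override
    public boolean test(String key, String value) {
        return key != null;
    }
});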
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index ed6e216..dd5ba45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -58,14 +57,12 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
+
         @Override
         public void process(K key, V value) {
-            // the keys should never be null
+            // If the key is null, we don't need to proceed
             if (key == null)
-                throw new StreamsException("Record key for KStream reduce operator with state " + storeName + " should not be null.");
+                return;
 
             V oldAgg = store.get(key);
             V newAgg = oldAgg;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index a526506..46d99a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -68,7 +68,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
         @Override
         public void process(K key, V value) {
-            // if the key is null, we do not need proceed aggregating the record
+            // if the key is null, we do not need to proceed aggregating
             // the record with the table
             if (key == null)
                 return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 51d4cb4..c5543ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -229,7 +229,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
             }
         }), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, false);
     }
 
     @Override

