kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-3104: add windowed aggregation to KStream
Date Mon, 18 Jan 2016 20:14:56 GMT
KAFKA-3104: add windowed aggregation to KStream

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Yasuhiro Mastuda

Closes #781 from guozhangwang/K3104


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a62eb599
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a62eb599
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a62eb599

Branch: refs/heads/trunk
Commit: a62eb5993f5517a64dd1020b0a9bbd1012f7ee67
Parents: cc3570d
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Jan 18 12:14:43 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jan 18 12:14:43 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/HoppingWindows.java   |  23 +-
 .../kafka/streams/kstream/JoinWindows.java      |  33 +--
 .../apache/kafka/streams/kstream/KStream.java   |  18 +-
 .../apache/kafka/streams/kstream/KTable.java    |  18 +-
 .../kafka/streams/kstream/SlidingWindows.java   |  67 ------
 .../kafka/streams/kstream/TumblingWindows.java  |  68 ++++++
 .../kafka/streams/kstream/UnlimitedWindows.java |   8 +-
 .../apache/kafka/streams/kstream/Window.java    |  19 ++
 .../apache/kafka/streams/kstream/Windowed.java  |   5 +
 .../apache/kafka/streams/kstream/Windows.java   |  27 ++-
 .../kstream/internals/KStreamAggWindow.java     |  51 ++++
 .../kstream/internals/KStreamAggregate.java     | 171 +++++++++++++
 .../kstream/internals/KStreamFlatMap.java       |  14 +-
 .../kstream/internals/KStreamFlatMapValues.java |  16 +-
 .../streams/kstream/internals/KStreamImpl.java  |  49 ++--
 .../kstream/internals/KStreamJoinWindow.java    |  11 +-
 .../kstream/internals/KStreamKStreamJoin.java   |  15 +-
 .../internals/KStreamKTableLeftJoin.java        |   6 +-
 .../streams/kstream/internals/KStreamMap.java   |  14 +-
 .../kstream/internals/KStreamMapValues.java     |  14 +-
 .../kstream/internals/KStreamTransform.java     |   8 +-
 .../internals/KTableKTableAbstractJoin.java     |   6 +-
 .../kstream/internals/KTableKTableJoin.java     |  20 +-
 .../kstream/internals/KTableKTableLeftJoin.java |  18 +-
 .../internals/KTableKTableOuterJoin.java        |  20 +-
 .../internals/KTableKTableRightJoin.java        |  18 +-
 .../kstream/internals/KTableMapValues.java      |  38 +--
 .../kstream/internals/KTableRepartitionMap.java |  38 +--
 .../kstream/internals/SlidingWindow.java        |  38 ---
 .../kstream/internals/TumblingWindow.java       |  38 +++
 .../kafka/streams/state/MeteredWindowStore.java |  18 +-
 .../kafka/streams/state/RocksDBStore.java       |   1 +
 .../kafka/streams/state/RocksDBWindowStore.java |  44 ++--
 .../state/RocksDBWindowStoreSupplier.java       |  10 +-
 .../apache/kafka/streams/state/WindowStore.java |   6 +-
 .../streams/state/WindowStoreIterator.java      |   4 +-
 .../kstream/internals/KStreamAggregateTest.java | 154 ++++++++++++
 .../internals/KStreamKStreamJoinTest.java       |   6 +-
 .../kstream/internals/KTableAggregateTest.java  |  18 +-
 .../streams/kstream/internals/WindowsTest.java  |  70 ++++++
 .../streams/state/RocksDBWindowStoreTest.java   | 239 +++++++++----------
 41 files changed, 1000 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
index d7141eb..f354ef9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.kstream.internals.HoppingWindow;
 
-import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 public class HoppingWindows extends Windows<HoppingWindow> {
 
@@ -62,9 +62,22 @@ public class HoppingWindows extends Windows<HoppingWindow> {
     }
 
     @Override
-    public Collection<HoppingWindow> windowsFor(long timestamp) {
-        // TODO
-        return Collections.<HoppingWindow>emptyList();
+    public Map<Long, HoppingWindow> windowsFor(long timestamp) {
+        long enclosed = (size - 1) / period;
+
+        long windowStart = Math.max(0, timestamp - timestamp % period - enclosed * period);
+
+        Map<Long, HoppingWindow> windows = new HashMap<>();
+        while (windowStart <= timestamp) {
+            // add the window
+            HoppingWindow window = new HoppingWindow(windowStart, windowStart + this.size);
+            windows.put(windowStart, window);
+
+            // advance the step period
+            windowStart += this.period;
+        }
+
+        return windows;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 50aff9d..ffc1c1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -18,31 +18,27 @@
 package org.apache.kafka.streams.kstream;
 
 
-import org.apache.kafka.streams.kstream.internals.SlidingWindow;
+import org.apache.kafka.streams.kstream.internals.TumblingWindow;
 
-import java.util.Collection;
+import java.util.Map;
 
 /**
  * This class is used to specify the behaviour of windowed joins.
  */
-public class JoinWindows extends Windows<SlidingWindow> {
-
-    private static final int DEFAULT_NUM_SEGMENTS = 3;
+public class JoinWindows extends Windows<TumblingWindow> {
 
     public final long before;
     public final long after;
-    public final int segments;
 
-    private JoinWindows(String name, long before, long after, int segments) {
+    private JoinWindows(String name, long before, long after) {
         super(name);
 
         this.after = after;
         this.before = before;
-        this.segments = segments;
     }
 
     public static JoinWindows of(String name) {
-        return new JoinWindows(name, 0L, 0L, DEFAULT_NUM_SEGMENTS);
+        return new JoinWindows(name, 0L, 0L);
     }
 
     /**
@@ -53,7 +49,7 @@ public class JoinWindows extends Windows<SlidingWindow> {
      * @return
      */
     public JoinWindows within(long timeDifference) {
-        return new JoinWindows(this.name, timeDifference, timeDifference, this.segments);
+        return new JoinWindows(this.name, timeDifference, timeDifference);
     }
 
     /**
@@ -65,7 +61,7 @@ public class JoinWindows extends Windows<SlidingWindow> {
      * @return
      */
     public JoinWindows before(long timeDifference) {
-        return new JoinWindows(this.name, timeDifference, this.after, this.segments);
+        return new JoinWindows(this.name, timeDifference, this.after);
     }
 
     /**
@@ -77,22 +73,11 @@ public class JoinWindows extends Windows<SlidingWindow> {
      * @return
      */
     public JoinWindows after(long timeDifference) {
-        return new JoinWindows(this.name, this.before, timeDifference, this.segments);
-    }
-
-    /**
-     * Specifies the number of segments to be used for rolling the window store,
-     * this function is not exposed to users but can be called by developers that extend this JoinWindows specs
-     *
-     * @param segments
-     * @return
-     */
-    protected JoinWindows segments(int segments) {
-        return new JoinWindows(name, before, after, segments);
+        return new JoinWindows(this.name, this.before, timeDifference);
     }
 
     @Override
-    public Collection<SlidingWindow> windowsFor(long timestamp) {
+    public Map<Long, TumblingWindow> windowsFor(long timestamp) {
         // this function should never be called
         throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index dace7e0..85d51e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -185,11 +185,11 @@ public interface KStream<K, V> {
      * @param otherValueDeserializer value deserializer for other stream,
      *                      if not specified the default serializer defined in the configs will be used
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      */
-    <V1, V2> KStream<K, V2> join(
+    <V1, R> KStream<K, R> join(
             KStream<K, V1> otherStream,
-            ValueJoiner<V, V1, V2> joiner,
+            ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serializer<K> keySerializer,
             Serializer<V> thisValueSerializer,
@@ -217,11 +217,11 @@ public interface KStream<K, V> {
      * @param otherValueDeserializer value deserializer for other stream,
      *                      if not specified the default serializer defined in the configs will be used
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      */
-    <V1, V2> KStream<K, V2> outerJoin(
+    <V1, R> KStream<K, R> outerJoin(
             KStream<K, V1> otherStream,
-            ValueJoiner<V, V1, V2> joiner,
+            ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serializer<K> keySerializer,
             Serializer<V> thisValueSerializer,
@@ -245,11 +245,11 @@ public interface KStream<K, V> {
      * @param otherValueDeserializer value deserializer for other stream,
      *                      if not specified the default serializer defined in the configs will be used
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      */
-    <V1, V2> KStream<K, V2> leftJoin(
+    <V1, R> KStream<K, R> leftJoin(
             KStream<K, V1> otherStream,
-            ValueJoiner<V, V1, V2> joiner,
+            ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serializer<K> keySerializer,
             Serializer<V1> otherValueSerializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 9837dae..93eceec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -111,10 +111,10 @@ public interface KTable<K, V> {
      * @param other the instance of KTable joined with this stream
      * @param joiner ValueJoiner
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      * @return the instance of KTable
      */
-    <V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+    <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
      * Combines values of this KTable with another KTable using Outer Join.
@@ -122,10 +122,10 @@ public interface KTable<K, V> {
      * @param other the instance of KTable joined with this stream
      * @param joiner ValueJoiner
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      * @return the instance of KTable
      */
-    <V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+    <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
      * Combines values of this KTable with another KTable using Left Join.
@@ -133,10 +133,10 @@ public interface KTable<K, V> {
      * @param other the instance of KTable joined with this stream
      * @param joiner ValueJoiner
      * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
+     * @param <R>   the value type of the new stream
      * @return the instance of KTable
      */
-    <V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+    <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
 
     /**
      * Aggregate values of this table by the selected key.
@@ -148,14 +148,14 @@ public interface KTable<K, V> {
      * @param <V1>   the value type of the aggregated table
      * @return the instance of KTable
      */
-    <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
+    <K1, V1, T> KTable<K1, T> aggregate(AggregatorSupplier<K1, V1, T> aggregatorSupplier,
                                           KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
                                           Serializer<K1> keySerializer,
                                           Serializer<V1> valueSerializer,
-                                          Serializer<V2> aggValueSerializer,
+                                          Serializer<T> aggValueSerializer,
                                           Deserializer<K1> keyDeserializer,
                                           Deserializer<V1> valueDeserializer,
-                                          Deserializer<V2> aggValueDeserializer,
+                                          Deserializer<T> aggValueDeserializer,
                                           String name);
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
deleted file mode 100644
index ffdb4ad..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-
-import org.apache.kafka.streams.kstream.internals.SlidingWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-public class SlidingWindows extends Windows<SlidingWindow> {
-
-    private static final long DEFAULT_SIZE_MS = 1000L;
-
-    public final long size;
-
-    private SlidingWindows(String name, long size) {
-        super(name);
-
-        this.size = size;
-    }
-
-    /**
-     * Returns a half-interval sliding window definition with the default window size
-     */
-    public static SlidingWindows of(String name) {
-        return new SlidingWindows(name, DEFAULT_SIZE_MS);
-    }
-
-    /**
-     * Returns a half-interval sliding window definition with the window size in milliseconds
-     */
-    public SlidingWindows with(long size) {
-        return new SlidingWindows(this.name, size);
-    }
-
-    @Override
-    public Collection<SlidingWindow> windowsFor(long timestamp) {
-        // TODO
-        return Collections.<SlidingWindow>emptyList();
-    }
-
-    @Override
-    public boolean equalTo(Windows other) {
-        if (!other.getClass().equals(SlidingWindows.class))
-            return false;
-
-        SlidingWindows otherWindows = (SlidingWindows) other;
-
-        return this.size == otherWindows.size;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
new file mode 100644
index 0000000..02ece3a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.streams.kstream.internals.TumblingWindow;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class TumblingWindows extends Windows<TumblingWindow> {
+
+    private static final long DEFAULT_SIZE_MS = 1000L;
+
+    public final long size;
+
+    private TumblingWindows(String name, long size) {
+        super(name);
+
+        this.size = size;
+    }
+
+    /**
+     * Returns a half-interval sliding window definition with the default window size
+     */
+    public static TumblingWindows of(String name) {
+        return new TumblingWindows(name, DEFAULT_SIZE_MS);
+    }
+
+    /**
+     * Returns a half-interval sliding window definition with the window size in milliseconds
+     */
+    public TumblingWindows with(long size) {
+        return new TumblingWindows(this.name, size);
+    }
+
+    @Override
+    public Map<Long, TumblingWindow> windowsFor(long timestamp) {
+        long windowStart = timestamp - timestamp % size;
+
+        return Collections.singletonMap(windowStart, new TumblingWindow(windowStart, windowStart + size));
+    }
+
+    @Override
+    public boolean equalTo(Windows other) {
+        if (!other.getClass().equals(TumblingWindows.class))
+            return false;
+
+        TumblingWindows otherWindows = (TumblingWindows) other;
+
+        return this.size == otherWindows.size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 89cb0a8..6f47253 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
 
-import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 
 public class UnlimitedWindows extends Windows<UnlimitedWindow> {
 
@@ -46,9 +46,9 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     }
 
     @Override
-    public Collection<UnlimitedWindow> windowsFor(long timestamp) {
-        // TODO
-        return Collections.<UnlimitedWindow>emptyList();
+    public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
+        // always return the single unlimited window
+        return Collections.singletonMap(start, new UnlimitedWindow(start));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 63e0a35..b9401b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -48,4 +48,23 @@ public abstract class Window {
     public boolean equalsTo(Window other) {
         return this.start() == other.start() && this.end() == other.end();
     }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this)
+            return true;
+
+        if (!(obj instanceof Window))
+            return false;
+
+        Window other = (Window) obj;
+
+        return this.equalsTo(other) && this.start == other.start && this.end == other.end;
+    }
+
+    @Override
+    public int hashCode() {
+        long n = (this.start << 32) | this.end;
+        return (int) (n % 0xFFFFFFFFL);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 03fb656..10afc73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -35,4 +35,9 @@ public class Windowed<T> {
     public Window window() {
         return window;
     }
+
+    @Override
+    public String toString() {
+        return "[" + value + "@" + window.start() + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index ab8d822..e4d7d9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -17,25 +17,31 @@
 
 package org.apache.kafka.streams.kstream;
 
-import java.util.Collection;
+
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class Windows<W extends Window> {
 
+    private static final int DEFAULT_NUM_SEGMENTS = 3;
+
     private static final long DEFAULT_EMIT_DURATION = 1000L;
 
     private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // one day
 
     private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
 
+    protected String name;
+
     private long emitDuration;
 
     private long maintainDuration;
 
-    protected String name;
+    public int segments;
 
     protected Windows(String name) {
         this.name = name;
+        this.segments = DEFAULT_NUM_SEGMENTS;
         this.emitDuration = DEFAULT_EMIT_DURATION;
         this.maintainDuration = DEFAULT_MAINTAIN_DURATION;
     }
@@ -62,6 +68,19 @@ public abstract class Windows<W extends Window> {
         return this;
     }
 
+    /**
+     * Specifies the number of segments to be used for rolling the window store,
+     * this function is not exposed to users but can be called by developers that extend this JoinWindows specs
+     *
+     * @param segments
+     * @return
+     */
+    protected Windows segments(int segments) {
+        this.segments = segments;
+
+        return this;
+    }
+
     public long emitEveryMs() {
         return this.emitDuration;
     }
@@ -74,7 +93,7 @@ public abstract class Windows<W extends Window> {
         return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
     }
 
-    abstract boolean equalTo(Windows other);
+    public abstract boolean equalTo(Windows other);
 
-    abstract Collection<W> windowsFor(long timestamp);
+    public abstract Map<Long, W> windowsFor(long timestamp);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
new file mode 100644
index 0000000..f02f53a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public class KStreamAggWindow<K, V> implements ProcessorSupplier<K, V> {
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamAggWindowProcessor();
+    }
+
+    private class KStreamAggWindowProcessor extends AbstractProcessor<K, V> {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+        }
+
+        @Override
+        public void process(K key, V value) {
+            // create a dummy window just for wrapping the timestamp
+            long timestamp = context().timestamp();
+
+            // send the new aggregate value
+            context().forward(new Windowed<>(key, new UnlimitedWindow(timestamp)), new Change<>(value, null));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
new file mode 100644
index 0000000..5745a03
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamAggregate<K, V, T, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, T> {
+
+    private final String storeName;
+    private final Windows<W> windows;
+    private final Aggregator<K, V, T> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamAggregate(Windows<W> windows, String storeName, Aggregator<K, V, T> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<Windowed<K>, Change<V>> get() {
+        return new KStreamAggregateProcessor();
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+
+        private WindowStore<K, T> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+
+            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public void process(Windowed<K> windowedKey, Change<V> change) {
+            // first get the matching windows
+            long timestamp = windowedKey.window().start();
+            K key = windowedKey.value();
+            V value = change.newValue;
+
+            Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+            long timeFrom = Long.MAX_VALUE;
+            long timeTo = Long.MIN_VALUE;
+
+            // use range query on window store for efficient reads
+            for (long windowStartMs : matchedWindows.keySet()) {
+                timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+                timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+            }
+
+            WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+            // for each matching window, try to update the corresponding key and send to the downstream
+            while (iter.hasNext()) {
+                KeyValue<Long, T> entry = iter.next();
+                W window = matchedWindows.get(entry.key);
+
+                if (window != null) {
+
+                    T oldAgg = entry.value;
+
+                    if (oldAgg == null)
+                        oldAgg = aggregator.initialValue();
+
+                    // try to add the new new value (there will never be old value)
+                    T newAgg = aggregator.add(key, value, oldAgg);
+
+                    // update the store with the new value
+                    windowStore.put(key, newAgg, window.start());
+
+                    // forward the aggregated change pair
+                    if (sendOldValues)
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+                    else
+                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+                    matchedWindows.remove(entry.key);
+                }
+            }
+
+            iter.close();
+
+            // create the new window for the rest of unmatched window that do not exist yet
+            for (long windowStartMs : matchedWindows.keySet()) {
+                T oldAgg = aggregator.initialValue();
+                T newAgg = aggregator.add(key, value, oldAgg);
+
+                windowStore.put(key, newAgg, windowStartMs);
+
+                // send the new aggregate pair
+                if (sendOldValues)
+                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg));
+                else
+                    context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null));
+            }
+        }
+    }
+
+    @Override
+    public KTableValueGetterSupplier<Windowed<K>, T> view() {
+
+        return new KTableValueGetterSupplier<Windowed<K>, T>() {
+
+            public KTableValueGetter<Windowed<K>, T> get() {
+                return new KStreamAggregateValueGetter();
+            }
+
+        };
+    }
+
+    private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> {
+
+        private WindowStore<K, T> windowStore;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public T get(Windowed<K> windowedKey) {
+            K key = windowedKey.value();
+            W window = (W) windowedKey.window();
+
+            // this iterator should only contain one element
+            Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
+
+            return iter.next().value;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index 175a002..daef8b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -23,23 +23,23 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
-    private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
+    private final KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper;
 
-    KStreamFlatMap(KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper) {
+    KStreamFlatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamFlatMapProcessor();
     }
 
-    private class KStreamFlatMapProcessor extends AbstractProcessor<K1, V1> {
+    private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K1 key, V1 value) {
-            for (KeyValue<K2, V2> newPair : mapper.apply(key, value)) {
+        public void process(K key, V value) {
+            for (KeyValue<K1, V1> newPair : mapper.apply(key, value)) {
                 context().forward(newPair.key, newPair.value);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index 9b4559b..97d6b7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -22,24 +22,24 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamFlatMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
 
-    private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
+    private final ValueMapper<V, ? extends Iterable<V1>> mapper;
 
-    KStreamFlatMapValues(ValueMapper<V1, ? extends Iterable<V2>> mapper) {
+    KStreamFlatMapValues(ValueMapper<V, ? extends Iterable<V1>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamFlatMapValuesProcessor();
     }
 
-    private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K1, V1> {
+    private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K1 key, V1 value) {
-            Iterable<V2> newValues = mapper.apply(value);
-            for (V2 v : newValues) {
+        public void process(K key, V value) {
+            Iterable<V1> newValues = mapper.apply(value);
+            for (V1 v : newValues) {
                 context().forward(key, v);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2459f0d..7b634dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -305,28 +305,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         RocksDBWindowStoreSupplier<K, V> thisWindow =
                 new RocksDBWindowStoreSupplier<>(
                         windows.name() + "-this",
-                        windows.before,
-                        windows.after,
                         windows.maintainMs(),
                         windows.segments,
+                        true,
                         new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer),
                         null);
 
         RocksDBWindowStoreSupplier<K, V1> otherWindow =
                 new RocksDBWindowStoreSupplier<>(
                         windows.name() + "-other",
-                        windows.before,
-                        windows.after,
                         windows.maintainMs(),
                         windows.segments,
+                        true,
                         new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
                         null);
 
-        KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name());
-        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
+        KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+
+        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, outer);
+        KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), windows.before, windows.after, reverseJoiner(joiner), outer);
 
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, outer);
-        KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), reverseJoiner(joiner), outer);
         KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>();
 
         String thisWindowStreamName = topology.newName(WINDOWED_NAME);
@@ -362,15 +361,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         RocksDBWindowStoreSupplier<K, V1> otherWindow =
                 new RocksDBWindowStoreSupplier<>(
                         windows.name() + "-this",
-                        windows.before,
-                        windows.after,
                         windows.maintainMs(),
                         windows.segments,
+                        true,
                         new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
                         null);
 
-        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, true);
+        KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+        KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
 
         String otherWindowStreamName = topology.newName(WINDOWED_NAME);
         String joinThisName = topology.newName(LEFTJOIN_NAME);
@@ -401,8 +399,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                        Serializer<T> aggValueSerializer,
                                                                        Deserializer<K> keyDeserializer,
                                                                        Deserializer<T> aggValueDeserializer) {
-        // TODO
-        return null;
+
+        // TODO: this agg window operator is only used for casting K to Windowed<K> for
+        // KTableProcessorSupplier, which is a bit awkward and better be removed in the future
+        String aggregateName = topology.newName(AGGREGATE_NAME);
+        String aggWindowName = topology.newName(WINDOWED_NAME);
+
+        ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
+        ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get());
+
+        RocksDBWindowStoreSupplier<K, T> aggregateStore =
+                new RocksDBWindowStoreSupplier<>(
+                        windows.name(),
+                        windows.maintainMs(),
+                        windows.segments,
+                        false,
+                        new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
+                        null);
+
+        // aggregate the values with the aggregator and local store
+        topology.addProcessor(aggWindowName, aggWindowSupplier, this.name);
+        topology.addProcessor(aggregateName, aggregateSupplier, aggWindowName);
+        topology.addStateStore(aggregateStore, aggregateName);
+
+        // return the KTable representation with the intermediate topic as the sources
+        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index b122aa1..4f427d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -26,9 +27,14 @@ import org.apache.kafka.streams.state.WindowStore;
 class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
 
     private final String windowName;
+    private final long windowSizeMs;
+    private final long retentionPeriodMs;
 
-    KStreamJoinWindow(String windowName) {
+
+    KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) {
         this.windowName = windowName;
+        this.windowSizeMs = windowSizeMs;
+        this.retentionPeriodMs = retentionPeriodMs;
     }
 
     @Override
@@ -46,6 +52,9 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
             super.init(context);
 
             window = (WindowStore<K, V>) context.getStateStore(windowName);
+
+            if (windowSizeMs * 2 > retentionPeriodMs)
+                throw new KafkaException("The retention period must be at least two times the join window size.");
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 8a9bf6c..01e3325 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -29,11 +30,16 @@ import java.util.Iterator;
 class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
     private final String otherWindowName;
+    private final long joinBeforeMs;
+    private final long joinAfterMs;
+
     private final ValueJoiner<V1, V2, R> joiner;
     private final boolean outer;
 
-    KStreamKStreamJoin(String otherWindowName, ValueJoiner<V1, V2, R> joiner, boolean outer) {
+    KStreamKStreamJoin(String otherWindowName, long joinBeforeMs, long joinAfterMs, ValueJoiner<V1, V2, R> joiner, boolean outer) {
         this.otherWindowName = otherWindowName;
+        this.joinBeforeMs = joinBeforeMs;
+        this.joinAfterMs = joinAfterMs;
         this.joiner = joiner;
         this.outer = outer;
     }
@@ -59,10 +65,13 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
         public void process(K key, V1 value) {
             boolean needOuterJoin = KStreamKStreamJoin.this.outer;
 
-            Iterator<V2> iter = otherWindow.fetch(key, context().timestamp());
+            long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
+            long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
+
+            Iterator<KeyValue<Long, V2>> iter = otherWindow.fetch(key, timeFrom, timeTo);
             while (iter.hasNext()) {
                 needOuterJoin = false;
-                context().forward(key, joiner.apply(value, iter.next()));
+                context().forward(key, joiner.apply(value, iter.next().value));
             }
 
             if (needOuterJoin)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
index 51a6277..dfca019 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -23,12 +23,12 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamKTableLeftJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
+class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
     private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
-    private final ValueJoiner<V1, V2, V> joiner;
+    private final ValueJoiner<V1, V2, R> joiner;
 
-    KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, V> joiner) {
+    KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, R> joiner) {
         this.valueGetterSupplier = table.valueGetterSupplier();
         this.joiner = joiner;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 3868318..57f1431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -23,23 +23,23 @@ import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
-    private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+    private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper;
 
-    public KStreamMap(KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+    public KStreamMap(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamMapProcessor();
     }
 
-    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K1 key, V1 value) {
-            KeyValue<K2, V2> newPair = mapper.apply(key, value);
+        public void process(K key, V value) {
+            KeyValue<K1, V1> newPair = mapper.apply(key, value);
             context().forward(newPair.key, newPair.value);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index 692b421..06667e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -22,23 +22,23 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-class KStreamMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
 
-    private final ValueMapper<V1, V2> mapper;
+    private final ValueMapper<V, V1> mapper;
 
-    public KStreamMapValues(ValueMapper<V1, V2> mapper) {
+    public KStreamMapValues(ValueMapper<V, V1> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamMapProcessor();
     }
 
-    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
         @Override
-        public void process(K1 key, V1 value) {
-            V2 newValue = mapper.apply(value);
+        public void process(K key, V value) {
+            V1 newValue = mapper.apply(value);
             context().forward(key, newValue);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 7ebab0e..a9d8f97 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -24,16 +24,16 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-public class KStreamTransform<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
 
-    private final TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier;
+    private final TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier;
 
-    public KStreamTransform(TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier) {
+    public KStreamTransform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
         this.transformerSupplier = transformerSupplier;
     }
 
     @Override
-    public Processor<K1, V1> get() {
+    public Processor<K, V> get() {
         return new KStreamTransformProcessor(transformerSupplier.get());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index ad987dd..5e441aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -19,19 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ValueJoiner;
 
-abstract class KTableKTableAbstractJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessorSupplier<K, V1, R> {
 
     protected final KTableImpl<K, ?, V1> table1;
     protected final KTableImpl<K, ?, V2> table2;
     protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
     protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    protected final ValueJoiner<V1, V2, V> joiner;
+    protected final ValueJoiner<V1, V2, R> joiner;
 
     protected boolean sendOldValues = false;
 
     KTableKTableAbstractJoin(KTableImpl<K, ?, V1> table1,
                              KTableImpl<K, ?, V2> table2,
-                             ValueJoiner<V1, V2, V> joiner) {
+                             ValueJoiner<V1, V2, R> joiner) {
         this.table1 = table1;
         this.table2 = table2;
         this.valueGetterSupplier1 = table1.valueGetterSupplier();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 9716edd..6eb27b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
-    KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+    KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -34,10 +34,10 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        return new KTableValueGetterSupplier<K, V>() {
+    public KTableValueGetterSupplier<K, R> view() {
+        return new KTableValueGetterSupplier<K, R>() {
 
-            public KTableValueGetter<K, V> get() {
+            public KTableValueGetter<K, R> get() {
                 return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
             }
 
@@ -61,8 +61,8 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
 
         @Override
         public void process(K key, Change<V1> change) {
-            V newValue = null;
-            V oldValue = null;
+            R newValue = null;
+            R oldValue = null;
             V2 value2 = null;
 
             if (change.newValue != null || change.oldValue != null)
@@ -78,7 +78,7 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
         }
     }
 
-    private class KTableKTableJoinValueGetter implements KTableValueGetter<K, V> {
+    private class KTableKTableJoinValueGetter implements KTableValueGetter<K, R> {
 
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
@@ -95,8 +95,8 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
         }
 
         @Override
-        public V get(K key) {
-            V newValue = null;
+        public R get(K key) {
+            R newValue = null;
             V1 value1 = valueGetter1.get(key);
 
             if (value1 != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index b10bdb5..00e872e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
-    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -34,10 +34,10 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        return new KTableValueGetterSupplier<K, V>() {
+    public KTableValueGetterSupplier<K, R> view() {
+        return new KTableValueGetterSupplier<K, R>() {
 
-            public KTableValueGetter<K, V> get() {
+            public KTableValueGetter<K, R> get() {
                 return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
             }
 
@@ -61,8 +61,8 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
         @Override
         public void process(K key, Change<V1> change) {
-            V newValue = null;
-            V oldValue = null;
+            R newValue = null;
+            R oldValue = null;
             V2 value2 = null;
 
             if (change.newValue != null || change.oldValue != null)
@@ -79,7 +79,7 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
     }
 
-    private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, V> {
+    private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> {
 
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
@@ -96,7 +96,7 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
         }
 
         @Override
-        public V get(K key) {
+        public R get(K key) {
             V1 value1 = valueGetter1.get(key);
 
             if (value1 != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index b859b34..6ab0ae9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
-    KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+    KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -34,10 +34,10 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        return new KTableValueGetterSupplier<K, V>() {
+    public KTableValueGetterSupplier<K, R> view() {
+        return new KTableValueGetterSupplier<K, R>() {
 
-            public KTableValueGetter<K, V> get() {
+            public KTableValueGetter<K, R> get() {
                 return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
             }
 
@@ -61,8 +61,8 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
         @Override
         public void process(K key, Change<V1> change) {
-            V newValue = null;
-            V oldValue = null;
+            R newValue = null;
+            R oldValue = null;
             V2 value2 = valueGetter.get(key);
 
             if (change.newValue != null || value2 != null)
@@ -77,7 +77,7 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
         }
     }
 
-    private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, V> {
+    private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, R> {
 
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
@@ -94,8 +94,8 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
         }
 
         @Override
-        public V get(K key) {
-            V newValue = null;
+        public R get(K key) {
+            R newValue = null;
             V1 value1 = valueGetter1.get(key);
             V2 value2 = valueGetter2.get(key);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index f20e987..a6a13fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -22,10 +22,10 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
 
 
-    KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+    KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -35,10 +35,10 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-        return new KTableValueGetterSupplier<K, V>() {
+    public KTableValueGetterSupplier<K, R> view() {
+        return new KTableValueGetterSupplier<K, R>() {
 
-            public KTableValueGetter<K, V> get() {
+            public KTableValueGetter<K, R> get() {
                 return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
             }
 
@@ -62,8 +62,8 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
         @Override
         public void process(K key, Change<V1> change) {
-            V newValue = null;
-            V oldValue = null;
+            R newValue = null;
+            R oldValue = null;
             V2 value2 = valueGetter.get(key);
 
             if (value2 != null) {
@@ -77,7 +77,7 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
 
     }
 
-    private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, V> {
+    private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, R> {
 
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
@@ -94,7 +94,7 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
         }
 
         @Override
-        public V get(K key) {
+        public R get(K key) {
             V2 value2 = valueGetter2.get(key);
 
             if (value2 != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index c664906..244d8ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -23,30 +23,30 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 
-class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> {
+class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
 
-    private final KTableImpl<K1, ?, V1> parent;
-    private final ValueMapper<V1, V2> mapper;
+    private final KTableImpl<K, ?, V> parent;
+    private final ValueMapper<V, V1> mapper;
 
     private boolean sendOldValues = false;
 
-    public KTableMapValues(KTableImpl<K1, ?, V1> parent, ValueMapper<V1, V2> mapper) {
+    public KTableMapValues(KTableImpl<K, ?, V> parent, ValueMapper<V, V1> mapper) {
         this.parent = parent;
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, Change<V1>> get() {
+    public Processor<K, Change<V>> get() {
         return new KTableMapValuesProcessor();
     }
 
     @Override
-    public KTableValueGetterSupplier<K1, V2> view() {
-        final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+    public KTableValueGetterSupplier<K, V1> view() {
+        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
 
-        return new KTableValueGetterSupplier<K1, V2>() {
+        return new KTableValueGetterSupplier<K, V1>() {
 
-            public KTableValueGetter<K1, V2> get() {
+            public KTableValueGetter<K, V1> get() {
                 return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
             }
 
@@ -59,8 +59,8 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
         sendOldValues = true;
     }
 
-    private V2 computeValue(V1 value) {
-        V2 newValue = null;
+    private V1 computeValue(V value) {
+        V1 newValue = null;
 
         if (value != null)
             newValue = mapper.apply(value);
@@ -68,22 +68,22 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
         return newValue;
     }
 
-    private class KTableMapValuesProcessor extends AbstractProcessor<K1, Change<V1>> {
+    private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> {
 
         @Override
-        public void process(K1 key, Change<V1> change) {
-            V2 newValue = computeValue(change.newValue);
-            V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
+        public void process(K key, Change<V> change) {
+            V1 newValue = computeValue(change.newValue);
+            V1 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
 
             context().forward(key, new Change<>(newValue, oldValue));
         }
     }
 
-    private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {
+    private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> {
 
-        private final KTableValueGetter<K1, V1> parentGetter;
+        private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableMapValuesValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+        public KTableMapValuesValueGetter(KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
@@ -93,7 +93,7 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
         }
 
         @Override
-        public V2 get(K1 key) {
+        public V1 get(K key) {
             return computeValue(parentGetter.get(key));
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index bbef7fb..12fcc17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -29,28 +29,28 @@ import org.apache.kafka.streams.processor.ProcessorContext;
  *
  * Given the input, it can output at most two records (one mapped from old value and one mapped from new value).
  */
-public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupplier<K1, V1, KeyValue<K2, V2>> {
+public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSupplier<K, V, KeyValue<K1, V1>> {
 
-    private final KTableImpl<K1, ?, V1> parent;
-    private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+    private final KTableImpl<K, ?, V> parent;
+    private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper;
 
-    public KTableRepartitionMap(KTableImpl<K1, ?, V1> parent, KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+    public KTableRepartitionMap(KTableImpl<K, ?, V> parent, KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
         this.parent = parent;
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K1, Change<V1>> get() {
+    public Processor<K, Change<V>> get() {
         return new KTableMapProcessor();
     }
 
     @Override
-    public KTableValueGetterSupplier<K1, KeyValue<K2, V2>> view() {
-        final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+    public KTableValueGetterSupplier<K, KeyValue<K1, V1>> view() {
+        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
 
-        return new KTableValueGetterSupplier<K1, KeyValue<K2, V2>>() {
+        return new KTableValueGetterSupplier<K, KeyValue<K1, V1>>() {
 
-            public KTableValueGetter<K1, KeyValue<K2, V2>> get() {
+            public KTableValueGetter<K, KeyValue<K1, V1>> get() {
                 return new KTableMapValueGetter(parentValueGetterSupplier.get());
             }
 
@@ -63,8 +63,8 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp
         throw new KafkaException("KTableRepartitionMap should always require sending old values.");
     }
 
-    private KeyValue<K2, V2> computeValue(K1 key, V1 value) {
-        KeyValue<K2, V2> newValue = null;
+    private KeyValue<K1, V1> computeValue(K key, V value) {
+        KeyValue<K1, V1> newValue = null;
 
         if (key != null || value != null)
             newValue = mapper.apply(key, value);
@@ -72,26 +72,26 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp
         return newValue;
     }
 
-    private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
+    private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> {
 
         @Override
-        public void process(K1 key, Change<V1> change) {
-            KeyValue<K2, V2> newPair = computeValue(key, change.newValue);
+        public void process(K key, Change<V> change) {
+            KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
 
             context().forward(newPair.key, new Change<>(newPair.value, null));
 
             if (change.oldValue != null) {
-                KeyValue<K2, V2> oldPair = computeValue(key, change.oldValue);
+                KeyValue<K1, V1> oldPair = computeValue(key, change.oldValue);
                 context().forward(oldPair.key, new Change<>(null, oldPair.value));
             }
         }
     }
 
-    private class KTableMapValueGetter implements KTableValueGetter<K1, KeyValue<K2, V2>> {
+    private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {
 
-        private final KTableValueGetter<K1, V1> parentGetter;
+        private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableMapValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+        public KTableMapValueGetter(KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
@@ -101,7 +101,7 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp
         }
 
         @Override
-        public KeyValue<K2, V2> get(K1 key) {
+        public KeyValue<K1, V1> get(K key) {
             return computeValue(key, parentGetter.get(key));
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
deleted file mode 100644
index a6b5149..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-
-import org.apache.kafka.streams.kstream.Window;
-
-public class SlidingWindow extends Window {
-
-    public SlidingWindow(long start, long end) {
-        super(start, end);
-    }
-
-    @Override
-    public boolean overlap(Window other) {
-        return super.overlap(other) && other.getClass().equals(SlidingWindow.class);
-    }
-
-    @Override
-    public boolean equalsTo(Window other) {
-        return super.equalsTo(other) && other.getClass().equals(SlidingWindow.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
new file mode 100644
index 0000000..a02d4b9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.streams.kstream.Window;
+
+public class TumblingWindow extends Window {
+
+    public TumblingWindow(long start, long end) {
+        super(start, end);
+    }
+
+    @Override
+    public boolean overlap(Window other) {
+        return super.overlap(other) && other.getClass().equals(TumblingWindow.class);
+    }
+
+    @Override
+    public boolean equalsTo(Window other) {
+        return super.equalsTo(other) && other.getClass().equals(TumblingWindow.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
index d4ed0e7..cfcfb00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 
@@ -97,20 +98,25 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     @Override
-    public WindowStoreIterator<V> fetch(K key, long timestamp) {
-        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timestamp), this.rangeTime);
+    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
     }
 
     @Override
     public void put(K key, V value) {
-        putAndReturnInternalKey(key, value);
+        putAndReturnInternalKey(key, value, -1L);
     }
 
     @Override
-    public byte[] putAndReturnInternalKey(K key, V value) {
+    public void put(K key, V value, long timestamp) {
+        putAndReturnInternalKey(key, value, timestamp);
+    }
+
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
         long startNs = time.nanoseconds();
         try {
-            byte[] binKey = this.inner.putAndReturnInternalKey(key, value);
+            byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
 
             if (loggingEnabled) {
                 changeLogger.add(binKey);
@@ -174,7 +180,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
         }
 
         @Override
-        public E next() {
+        public KeyValue<Long, E> next() {
             return iter.next();
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
index a32faf4..62b9f2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -222,6 +222,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         @Override
         public void close() {
+            iter.dispose();
         }
 
     }


Mime
View raw message