kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [3/3] kafka git commit: KAFKA-5192: add WindowStore range scan (KIP-155)
Date Fri, 19 May 2017 00:03:13 GMT
KAFKA-5192: add WindowStore range scan (KIP-155)

Implements range scan for keys in windowed and session stores

Modifies caching session and windowed stores to use segmented cache keys.
Cache keys are internally prefixed with their segment id to ensure key ordering in the cache matches the ordering in the underlying store for keys spread across multiple segments.
This should also result in fewer cache keys getting scanned for queries spanning only some segments.

Author: Xavier Léauté <xavier@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #3027 from xvrl/windowstore-range-scan

(cherry picked from commit e28752357705568219315375c666f8e500db9c12)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9170e87b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9170e87b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9170e87b

Branch: refs/heads/0.11.0
Commit: 9170e87b2a154e7478af0ede43cc56b01f75bedd
Parents: b661d3b
Author: Xavier Léauté <xavier@confluent.io>
Authored: Thu May 18 17:02:51 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu May 18 17:03:08 2017 -0700

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |   7 +-
 .../org/apache/kafka/common/utils/Bytes.java    |   6 +-
 .../apache/kafka/streams/kstream/Windowed.java  |   2 +-
 .../streams/state/ReadOnlySessionStore.java     |  12 +
 .../streams/state/ReadOnlyWindowStore.java      |  14 +
 .../kafka/streams/state/SessionStore.java       |  15 +
 .../AbstractMergedSortedCacheStoreIterator.java |  18 +-
 .../streams/state/internals/CacheFunction.java  |  25 ++
 .../state/internals/CachingSessionStore.java    |  56 +++-
 .../state/internals/CachingWindowStore.java     |  64 +++-
 .../ChangeLoggingSegmentedBytesStore.java       |   6 +-
 .../CompositeReadOnlySessionStore.java          |  46 ++-
 .../internals/CompositeReadOnlyWindowStore.java |  49 ++-
 .../state/internals/FilteredCacheIterator.java  |  41 ++-
 .../state/internals/KeyValueIterators.java      |  71 ++++
 .../MergedSortedCacheKeyValueStoreIterator.java |  12 +-
 .../MergedSortedCacheSessionStoreIterator.java  |  39 ++-
 .../MergedSortedCacheWindowStoreIterator.java   |  22 +-
 ...dSortedCacheWindowStoreKeyValueIterator.java |  75 +++++
 .../internals/MeteredSegmentedBytesStore.java   |   7 +-
 .../streams/state/internals/OrderedBytes.java   |  68 ++++
 .../internals/RocksDBSegmentedBytesStore.java   |  18 +-
 .../state/internals/RocksDBSessionStore.java    |  15 +-
 .../internals/RocksDBSessionStoreSupplier.java  |   5 +-
 .../state/internals/RocksDBWindowStore.java     |  48 ++-
 .../internals/RocksDBWindowStoreSupplier.java   |   8 +-
 .../state/internals/SegmentedBytesStore.java    |  46 ++-
 .../state/internals/SegmentedCacheFunction.java |  76 +++++
 .../kafka/streams/state/internals/Segments.java |   7 +-
 .../state/internals/SessionKeySchema.java       |  33 +-
 .../state/internals/WindowKeySchema.java        |  31 +-
 .../internals/WindowStoreIteratorWrapper.java   | 195 +++++++++++
 .../state/internals/WindowStoreUtils.java       |  14 +-
 .../internals/WrappedWindowStoreIterator.java   |  91 -----
 .../internals/KStreamWindowAggregateTest.java   |  94 +++---
 .../kafka/streams/state/NoOpWindowStore.java    |   6 +
 .../internals/CachingSessionStoreTest.java      |  61 +++-
 .../state/internals/CachingWindowStoreTest.java |  72 +++-
 .../CompositeReadOnlyWindowStoreTest.java       |   9 +-
 .../internals/FilteredCacheIteratorTest.java    |  20 +-
 ...tedCacheWrappedSessionStoreIteratorTest.java |  25 +-
 ...rtedCacheWrappedWindowStoreIteratorTest.java |  32 +-
 ...eWrappedWindowStoreKeyValueIteratorTest.java | 132 ++++++++
 .../internals/ReadOnlyWindowStoreStub.java      |  62 +++-
 .../internals/RocksDBSessionStoreTest.java      |  21 +-
 .../state/internals/RocksDBWindowStoreTest.java | 331 ++++++++++++-------
 .../internals/SegmentedCacheFunctionTest.java   | 124 +++++++
 .../state/internals/SessionKeySchemaTest.java   |  92 +++++-
 .../state/internals/WindowKeySchemaTest.java    | 131 ++++++++
 .../kafka/test/ReadOnlySessionStoreStub.java    |  45 ++-
 .../kafka/test/SegmentedBytesStoreStub.java     |   5 +
 51 files changed, 2044 insertions(+), 460 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index ed846cd..9f9e9ae 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -57,10 +57,13 @@
     <module name="LocalFinalVariableName"/>
     <module name="MemberName"/>
     <module name="ClassTypeParameterName">
-      <property name="format" value="^[A-Z0-9]*$"/>
+      <property name="format" value="^[A-Z][a-zA-Z0-9]*$$"/>
     </module>
     <module name="MethodTypeParameterName">
-      <property name="format" value="^[A-Z0-9]*$"/>
+      <property name="format" value="^[A-Z][a-zA-Z0-9]*$$"/>
+    </module>
+    <module name="InterfaceTypeParameterName">
+      <property name="format" value="^[A-Z][a-zA-Z0-9]*$$"/>
     </module>
     <module name="PackageName"/>
     <module name="ParameterName"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
index cc794c5..3044020 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -50,7 +50,7 @@ public class Bytes implements Comparable<Bytes> {
 
     /**
      * Get the data from the Bytes.
-     * @return The data is only valid between offset and offset+length.
+     * @return The underlying byte array
      */
     public byte[] get() {
         return this.bytes;
@@ -139,9 +139,9 @@ public class Bytes implements Comparable<Bytes> {
     /**
      * A byte array comparator based on lexicograpic ordering.
      */
-    public final static Comparator<byte[]> BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
+    public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
 
-    private interface ByteArrayComparator extends Comparator<byte[]>, Serializable {
+    public interface ByteArrayComparator extends Comparator<byte[]>, Serializable {
 
         int compare(final byte[] buffer1, int offset1, int length1,
                     final byte[] buffer2, int offset2, int length2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index aa5157d..7234797 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -72,7 +72,7 @@ public class Windowed<K> {
 
     @Override
     public String toString() {
-        return "[" + key + "@" + window.start() + "]";
+        return "[" + key + "@" + window.start() + "/" + window.end() + "]";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
index 5bc8a42..7079769 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
@@ -41,4 +41,16 @@ public interface ReadOnlySessionStore<K, AGG> {
      * @return   KeyValueIterator containing all sessions for the provided key.
      */
     KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
+
+    /**
+     * Retrieve all aggregated sessions for the given range of keys
+     *
+     * For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest
+     * available session to the newest/latest session.
+     *
+     * @param    from first key in the range to find aggregated session values for
+     * @param    to last key in the range to find aggregated session values for
+     * @return   KeyValueIterator containing all sessions for the keys in the provided range.
+     */
+    KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index 5546000..51864e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
 
 /**
  * A window store that only supports read operations
@@ -59,4 +60,17 @@ public interface ReadOnlyWindowStore<K, V> {
      * @throws InvalidStateStoreException if the store is not initialized
      */
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
+
+    /**
+     * Get all the key-value pairs in the given key range and time range from all
+     * the existing windows.
+     *
+     * @param from      the first key in the range
+     * @param to        the last key in the range
+     * @param timeFrom  time range start (inclusive)
+     * @param timeTo    time range end (inclusive)
+     * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
+     * @throws InvalidStateStoreException if the store is not initialized
+     */
+    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index 8ae5c06..a4cf12e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -29,10 +29,25 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
     /**
      * Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
      * start is &le; latestSessionStartTime
+     *
+     * @param key the key to return sessions for
+     * @param earliestSessionEndTime only sessions whose end is &ge; this time are returned
+     * @return iterator of sessions with the matching key and aggregated values
      */
     KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime);
 
     /**
+     * Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
+     * start is &le; latestSessionStartTime
+     *
+     * @param keyFrom The first key that could be in the range
+     * @param keyTo The last key that could be in the range
+     * @param earliestSessionEndTime only sessions whose end is &ge; this time are returned
+     * @return iterator of sessions with the matching keys and aggregated values
+     */
+    KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTime, final long latestSessionStartTime);
+
+    /**
      * Remove the session aggregated with provided {@link Windowed} key from the store
      * @param sessionKey key of the session to remove
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index c5c1a2c..6eb9a0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.NoSuchElementException;
 
@@ -29,27 +28,26 @@ import java.util.NoSuchElementException;
  * @param <K>
  * @param <V>
  */
-abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyValueIterator<K, V> {
+abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements KeyValueIterator<K, V> {
     private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
-    private final KeyValueIterator<KS, byte[]> storeIterator;
-    protected final StateSerdes<K, V> serdes;
+    private final KeyValueIterator<KS, VS> storeIterator;
 
     AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
-                                           final KeyValueIterator<KS, byte[]> storeIterator,
-                                           final StateSerdes<K, V> serdes) {
+                                           final KeyValueIterator<KS, VS> storeIterator) {
         this.cacheIterator = cacheIterator;
         this.storeIterator = storeIterator;
-        this.serdes = serdes;
     }
 
     abstract int compare(final Bytes cacheKey, final KS storeKey);
 
     abstract K deserializeStoreKey(final KS key);
 
-    abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, byte[]> pair);
+    abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, VS> pair);
 
     abstract K deserializeCacheKey(final Bytes cacheKey);
 
+    abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry);
+
     private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) {
         return nextFromCache.value.value == null;
     }
@@ -101,7 +99,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa
     }
 
     private KeyValue<K, V> nextStoreValue(KS nextStoreKey) {
-        final KeyValue<KS, byte[]> next = storeIterator.next();
+        final KeyValue<KS, VS> next = storeIterator.next();
 
         if (!next.key.equals(nextStoreKey)) {
             throw new IllegalStateException("Next record key is not the peeked key value; this should not happen");
@@ -117,7 +115,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa
             throw new IllegalStateException("Next record key is not the peeked key value; this should not happen");
         }
 
-        return KeyValue.pair(deserializeCacheKey(next.key), serdes.valueFrom(next.value.value));
+        return KeyValue.pair(deserializeCacheKey(next.key), deserializeCacheValue(next.value));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFunction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFunction.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFunction.java
new file mode 100644
index 0000000..66ef2d7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+
+interface CacheFunction {
+    Bytes key(Bytes cacheKey);
+    Bytes cacheKey(Bytes cacheKey);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 41e81eb..37d0c20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -40,6 +40,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     private final SessionKeySchema keySchema;
     private final Serde<K> keySerde;
     private final Serde<AGG> aggSerde;
+    private final SegmentedCacheFunction cacheFunction;
     private String cacheName;
     private ThreadCache cache;
     private StateSerdes<K, AGG> serdes;
@@ -49,12 +50,14 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
 
     CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
                         final Serde<K> keySerde,
-                        final Serde<AGG> aggSerde) {
+                        final Serde<AGG> aggSerde,
+                        final long segmentInterval) {
         super(bytesStore);
         this.bytesStore = bytesStore;
         this.keySerde = keySerde;
         this.aggSerde = aggSerde;
         this.keySchema = new SessionKeySchema();
+        this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval);
     }
 
     public void init(final ProcessorContext context, final StateStore root) {
@@ -91,15 +94,43 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
                                                            final long latestSessionStartTime) {
         validateStoreOpen();
         final Bytes binarySessionId = Bytes.wrap(serdes.rawKey(key));
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName,
-                                                                                  keySchema.lowerRange(binarySessionId, earliestSessionEndTime),
-                                                                                  keySchema.upperRange(binarySessionId, latestSessionStartTime));
-        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions(binarySessionId, earliestSessionEndTime, latestSessionStartTime);
+
+        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(binarySessionId, earliestSessionEndTime));
+        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(binarySessionId, latestSessionStartTime));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo);
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions(
+            binarySessionId, earliestSessionEndTime, latestSessionStartTime
+        );
         final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionId,
+                                                                             binarySessionId,
                                                                              earliestSessionEndTime,
                                                                              latestSessionStartTime);
-        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition);
-        return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
+        return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes, cacheFunction);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, AGG> findSessions(K keyFrom,
+                                                           K keyTo,
+                                                           long earliestSessionEndTime,
+                                                           long latestSessionStartTime) {
+        validateStoreOpen();
+        final Bytes binarySessionIdFrom = Bytes.wrap(serdes.rawKey(keyFrom));
+        final Bytes binarySessionIdTo = Bytes.wrap(serdes.rawKey(keyTo));
+
+        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(binarySessionIdFrom, earliestSessionEndTime));
+        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(binarySessionIdTo, latestSessionStartTime));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo);
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions(
+            binarySessionIdFrom, binarySessionIdTo, earliestSessionEndTime, latestSessionStartTime
+        );
+        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionIdFrom, binarySessionIdTo,
+                                                                             earliestSessionEndTime,
+                                                                             latestSessionStartTime);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
+        return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes, cacheFunction);
     }
 
     @Override
@@ -114,7 +145,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
         final Bytes binaryKey = SessionKeySerde.toBinary(key, serdes.keySerializer(), topic);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                       key.window().end(), context.partition(), context.topic());
-        cache.put(cacheName, binaryKey, entry);
+        cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
     }
 
     @Override
@@ -122,8 +153,15 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
         return findSessions(key, 0, Long.MAX_VALUE);
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, AGG> fetch(K from, K to) {
+        return findSessions(from, to, 0, Long.MAX_VALUE);
+    }
+
+
+
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
-        final Bytes binaryKey = entry.key();
+        final Bytes binaryKey = cacheFunction.key(entry.key());
         final RecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index f492573..9a4a97c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -19,14 +19,14 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordContext;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -35,27 +35,32 @@ import java.util.List;
 
 class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> {
 
+
     private final WindowStore<Bytes, byte[]> underlying;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final long windowSize;
     private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema();
 
+
     private String name;
     private ThreadCache cache;
     private InternalProcessorContext context;
     private StateSerdes<K, V> serdes;
     private CacheFlushListener<Windowed<K>, V> flushListener;
+    private final SegmentedCacheFunction cacheFunction;
 
     CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
                        final Serde<K> keySerde,
                        final Serde<V> valueSerde,
-                       final long windowSize) {
+                       final long windowSize,
+                       final long segmentInterval) {
         super(underlying);
         this.underlying = underlying;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.windowSize = windowSize;
+        this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval);
     }
 
     @SuppressWarnings("unchecked")
@@ -80,11 +85,11 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
                 for (ThreadCache.DirtyEntry entry : entries) {
-                    final byte[] binaryWindowKey = entry.key().get();
+                    final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
                     final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryWindowKey);
 
                     final Windowed<K> windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryWindowKey, serdes),
-                            new TimeWindow(timestamp, timestamp + windowSize));
+                            WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
                     final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryWindowKey);
                     maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
                     underlying.put(key, entry.newValue(), timestamp);
@@ -140,7 +145,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                       timestamp, context.partition(), context.topic());
-        cache.put(name, keyBytes, entry);
+        cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
     }
 
     @Override
@@ -149,23 +154,55 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         // if store is open outside as well.
         validateStoreOpen();
 
-        Bytes fromBytes = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
-        Bytes toBytes = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
-
         final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key));
         final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo);
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, fromBytes, toBytes);
+
+        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(keyBytes, timeFrom));
+        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(keyBytes, timeTo));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);
 
         final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes,
+                                                                             keyBytes,
                                                                              timeFrom,
                                                                              timeTo);
-        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(
+            cacheIterator, hasNextCondition, cacheFunction
+        );
 
         return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
                                                           underlyingIterator,
                                                           new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde()));
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+        // since this function may not access the underlying inner store, we need to validate
+        // if store is open outside as well.
+        validateStoreOpen();
+
+        final Bytes keyFromBytes = Bytes.wrap(serdes.rawKey(from));
+        final Bytes keyToBytes = Bytes.wrap(serdes.rawKey(to));
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetch(keyFromBytes, keyToBytes, timeFrom, timeTo);
+
+        final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFromBytes, timeFrom));
+        final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyToBytes, timeTo));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);
+
+        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFromBytes,
+                                                                             keyToBytes,
+                                                                             timeFrom,
+                                                                             timeTo);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
+
+        return new MergedSortedCacheWindowStoreKeyValueIterator<>(
+            filteredCacheIterator,
+            underlyingIterator,
+            serdes,
+            windowSize,
+            cacheFunction
+        );
+    }
+
     private V fetchPrevious(final Bytes key, final long timestamp) {
         try (final WindowStoreIterator<byte[]> iter = underlying.fetch(key, timestamp, timestamp)) {
             if (!iter.hasNext()) {
@@ -175,4 +212,5 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
             }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
index d23e115..9a826c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -37,12 +37,16 @@ class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractStateSt
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
         return bytesStore.fetch(key, from, to);
     }
 
     @Override
+    public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
+        return bytesStore.fetch(keyFrom, keyTo, from, to);
+    }
+
+    @Override
     public void remove(final Bytes key) {
         bytesStore.remove(key);
         changeLogger.logChange(key, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
index 5f4fc64..d63ab4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -24,7 +23,6 @@ import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
 
 import java.util.List;
-import java.util.NoSuchElementException;
 
 /**
  * Wrapper over the underlying {@link ReadOnlySessionStore}s found in a {@link
@@ -43,13 +41,15 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore
         this.storeName = storeName;
     }
 
+    private interface Fetcher<K, V> {
+        KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K, V> store);
+    }
 
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+    private KeyValueIterator<Windowed<K>, V> fetch(Fetcher<K, V> fetcher) {
         final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType);
         for (final ReadOnlySessionStore<K, V> store : stores) {
             try {
-                final KeyValueIterator<Windowed<K>, V> result = store.fetch(key);
+                final KeyValueIterator<Windowed<K>, V> result = fetcher.fetch(store);
                 if (!result.hasNext()) {
                     result.close();
                 } else {
@@ -57,33 +57,31 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore
                 }
             } catch (final InvalidStateStoreException ise) {
                 throw new InvalidStateStoreException("State store  [" + storeName + "] is not available anymore" +
-                                                             " and may have been migrated to another instance; " +
-                                                             "please re-discover its location from the state metadata.");
+                                                     " and may have been migrated to another instance; " +
+                                                     "please re-discover its location from the state metadata.");
             }
         }
-        return new KeyValueIterator<Windowed<K>, V>() {
-            @Override
-            public void close() {
-            }
-
-            @Override
-            public Windowed<K> peekNextKey() {
-                throw new NoSuchElementException();
-            }
+        return KeyValueIterators.emptyIterator();
+    }
 
-            @Override
-            public boolean hasNext() {
-                return false;
-            }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+        return fetch(new Fetcher<K, V>() {
             @Override
-            public KeyValue<Windowed<K>, V> next() {
-                throw new NoSuchElementException();
+            public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K, V> store) {
+                return store.fetch(key);
             }
+        });
+    }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) {
+        return fetch(new Fetcher<K, V>() {
             @Override
-            public void remove() {
+            public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K, V> store) {
+                return store.fetch(from, to);
             }
-        };
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 22ec3c6..fbfb5aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -16,14 +16,14 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.List;
-import java.util.NoSuchElementException;
 
 /**
  * Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link
@@ -43,45 +43,58 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
         this.storeName = storeName;
     }
 
-    @Override
-    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+    private interface Fetcher<K, V, IteratorType extends KeyValueIterator<?, V>> {
+        IteratorType fetch(ReadOnlyWindowStore<K, V> store);
+        IteratorType empty();
+    }
+
+    private <IteratorType extends KeyValueIterator<?, V>> IteratorType fetch(Fetcher<K, V, IteratorType> fetcher) {
         final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName, windowStoreType);
         for (ReadOnlyWindowStore<K, V> windowStore : stores) {
             try {
-                final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom, timeTo);
+                final IteratorType result = fetcher.fetch(windowStore);
                 if (!result.hasNext()) {
                     result.close();
                 } else {
                     return result;
                 }
             } catch (InvalidStateStoreException e) {
-                throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
+                throw new InvalidStateStoreException(
+                    "State store is not available anymore and may have been migrated to another instance; " +
+                    "please re-discover its location from the state metadata.");
             }
         }
 
-        return new WindowStoreIterator<V>() {
-            @Override
-            public void close() {
-            }
+        return fetcher.empty();
+    }
 
+    @Override
+    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+        return fetch(new Fetcher<K, V, WindowStoreIterator<V>>() {
             @Override
-            public Long peekNextKey() {
-                throw new NoSuchElementException();
+            public WindowStoreIterator<V> fetch(ReadOnlyWindowStore<K, V> store) {
+                return store.fetch(key, timeFrom, timeTo);
             }
 
             @Override
-            public boolean hasNext() {
-                return false;
+            public WindowStoreIterator<V> empty() {
+                return KeyValueIterators.emptyWindowStoreIterator();
             }
+        });
+    }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+        return fetch(new Fetcher<K, V, KeyValueIterator<Windowed<K>, V>>() {
             @Override
-            public KeyValue<Long, V> next() {
-                throw new NoSuchElementException();
+            public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlyWindowStore<K, V> store) {
+                return store.fetch(from, to, timeFrom, timeTo);
             }
 
             @Override
-            public void remove() {
+            public KeyValueIterator<Windowed<K>, V> empty() {
+                return KeyValueIterators.emptyIterator();
             }
-        };
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
index 19370b9..4486fda 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
@@ -24,11 +24,48 @@ import java.util.NoSuchElementException;
 class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
     private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
     private final HasNextCondition hasNextCondition;
+    private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> wrappedIterator;
 
     FilteredCacheIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
-                          final HasNextCondition hasNextCondition) {
+                          final HasNextCondition hasNextCondition,
+                          final CacheFunction cacheFunction) {
         this.cacheIterator = cacheIterator;
         this.hasNextCondition = hasNextCondition;
+        this.wrappedIterator = new PeekingKeyValueIterator<Bytes, LRUCacheEntry>() {
+            @Override
+            public KeyValue<Bytes, LRUCacheEntry> peekNext() {
+                return cachedPair(cacheIterator.peekNext());
+            }
+
+            @Override
+            public void close() {
+                cacheIterator.close();
+            }
+
+            @Override
+            public Bytes peekNextKey() {
+                return cacheFunction.key(cacheIterator.peekNextKey());
+            }
+
+            @Override
+            public boolean hasNext() {
+                return cacheIterator.hasNext();
+            }
+
+            @Override
+            public KeyValue<Bytes, LRUCacheEntry> next() {
+                return cachedPair(cacheIterator.next());
+            }
+
+            private KeyValue<Bytes, LRUCacheEntry> cachedPair(KeyValue<Bytes, LRUCacheEntry> next) {
+                return KeyValue.pair(cacheFunction.key(next.key), next.value);
+            }
+
+            @Override
+            public void remove() {
+                cacheIterator.remove();
+            }
+        };
     }
 
     @Override
@@ -46,7 +83,7 @@ class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEn
 
     @Override
     public boolean hasNext() {
-        return hasNextCondition.hasNext(cacheIterator);
+        return hasNextCondition.hasNext(wrappedIterator);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIterators.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIterators.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIterators.java
new file mode 100644
index 0000000..bef6f49
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIterators.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.NoSuchElementException;
+
+class KeyValueIterators {
+
+    private KeyValueIterators() { }
+
+    /** An iterator over no elements; it holds no state, so one shared instance is reused. */
+    private static class EmptyKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public K peekNextKey() {
+            throw new NoSuchElementException();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return false;
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            throw new NoSuchElementException();
+        }
+
+        @Override
+        public void remove() {
+        }
+    }
+
+    private static class EmptyWindowStoreIterator<V> extends EmptyKeyValueIterator<Long, V>
+        implements WindowStoreIterator<V> { }
+
+    private static final KeyValueIterator<?, ?> EMPTY_ITERATOR = new EmptyKeyValueIterator<>();
+    private static final WindowStoreIterator<?> EMPTY_WINDOW_STORE_ITERATOR = new EmptyWindowStoreIterator<>();
+
+    @SuppressWarnings("unchecked") // safe: the shared instance never produces an element
+    static <K, V> KeyValueIterator<K, V> emptyIterator() {
+        return (KeyValueIterator<K, V>) EMPTY_ITERATOR;
+    }
+
+    @SuppressWarnings("unchecked") // safe: the shared instance never produces an element
+    static <V> WindowStoreIterator<V> emptyWindowStoreIterator() {
+        return (WindowStoreIterator<V>) EMPTY_WINDOW_STORE_ITERATOR;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
index 07d5b6e..f7bb2ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
@@ -27,12 +27,15 @@ import org.apache.kafka.streams.state.StateSerdes;
  * @param <K>
  * @param <V>
  */
-class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedCacheStoreIterator<K, Bytes, V> {
+class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedCacheStoreIterator<K, Bytes, V, byte[]> {
+
+    private final StateSerdes<K, V> serdes;
 
     MergedSortedCacheKeyValueStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
                                            final KeyValueIterator<Bytes, byte[]> storeIterator,
                                            final StateSerdes<K, V> serdes) {
-        super(cacheIterator, storeIterator, serdes);
+        super(cacheIterator, storeIterator);
+        this.serdes = serdes;
     }
 
     @Override
@@ -46,6 +49,11 @@ class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedC
     }
 
     @Override
+    V deserializeCacheValue(final LRUCacheEntry cacheEntry) {
+        return serdes.valueFrom(cacheEntry.value);
+    }
+
+    @Override
     public K deserializeStoreKey(final Bytes key) {
         return serdes.keyFrom(key.get());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
index 3f9b620..67118f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
@@ -29,41 +29,46 @@ import org.apache.kafka.streams.state.StateSerdes;
  * @param <K>
  * @param <AGG>
  */
-class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSortedCacheStoreIterator<Windowed<K>, Windowed<Bytes>, AGG> {
-    private final StateSerdes<K, AGG> rawSerdes;
+class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSortedCacheStoreIterator<Windowed<K>, Windowed<Bytes>, AGG, byte[]> {
 
+    private final StateSerdes<K, AGG> serdes;
+    private final SegmentedCacheFunction cacheFunction;
 
     MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
                                           final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator,
-                                          final StateSerdes<K, AGG> serdes) {
-        super(cacheIterator, storeIterator, new StateSerdes<>(serdes.topic(),
-                                                              new SessionKeySerde<>(serdes.keySerde()),
-                                                              serdes.valueSerde()));
-
-        rawSerdes = serdes;
+                                          final StateSerdes<K, AGG> serdes,
+                                          final SegmentedCacheFunction cacheFunction) {
+        super(cacheIterator, storeIterator);
+        this.serdes = serdes;
+        this.cacheFunction = cacheFunction;
     }
 
     @Override
-    public KeyValue<Windowed<K>, AGG> deserializeStorePair(KeyValue<Windowed<Bytes>, byte[]> pair) {
-        final K key = rawSerdes.keyFrom(pair.key.key().get());
-        return KeyValue.pair(new Windowed<>(key, pair.key.window()), serdes.valueFrom(pair.value));
+    public KeyValue<Windowed<K>, AGG> deserializeStorePair(final KeyValue<Windowed<Bytes>, byte[]> pair) {
+        return KeyValue.pair(deserializeStoreKey(pair.key), serdes.valueFrom(pair.value));
     }
 
     @Override
     Windowed<K> deserializeCacheKey(final Bytes cacheKey) {
-        return SessionKeySerde.from(cacheKey.get(), rawSerdes.keyDeserializer(), rawSerdes.topic());
+        byte[] binaryKey = cacheFunction.key(cacheKey).get();
+        return SessionKeySerde.from(binaryKey, serdes.keyDeserializer(), serdes.topic());
+    }
+
+
+    @Override
+    AGG deserializeCacheValue(final LRUCacheEntry cacheEntry) {
+        return serdes.valueFrom(cacheEntry.value);
     }
 
     @Override
-    public Windowed<K> deserializeStoreKey(Windowed<Bytes> key) {
-        final K originalKey = rawSerdes.keyFrom(key.key().get());
+    public Windowed<K> deserializeStoreKey(final Windowed<Bytes> key) {
+        final K originalKey = serdes.keyFrom(key.key().get());
         return new Windowed<>(originalKey, key.window());
     }
 
     @Override
-    public int compare(Bytes cacheKey, Windowed<Bytes> storeKey) {
+    public int compare(final Bytes cacheKey, final Windowed<Bytes> storeKey) {
         Bytes storeKeyBytes = SessionKeySerde.bytesToBinary(storeKey);
-        return cacheKey.compareTo(storeKeyBytes);
+        return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes);
     }
 }
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
index 742bcbc..657b601 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
@@ -22,17 +22,22 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
+import static org.apache.kafka.streams.state.internals.SegmentedCacheFunction.bytesFromCacheKey;
+
 /**
  * Merges two iterators. Assumes each of them is sorted by key
  *
  * @param <V>
  */
-class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheStoreIterator<Long, Long, V> implements WindowStoreIterator<V> {
+class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheStoreIterator<Long, Long, V, byte[]> implements WindowStoreIterator<V> {
+
+    private final StateSerdes<Long, V> serdes;
 
     MergedSortedCacheWindowStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
                                          final KeyValueIterator<Long, byte[]> storeIterator,
                                          final StateSerdes<Long, V> serdes) {
-        super(cacheIterator, storeIterator, serdes);
+        super(cacheIterator, storeIterator);
+        this.serdes = serdes;
     }
 
     @Override
@@ -42,7 +47,14 @@ class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheS
 
     @Override
     Long deserializeCacheKey(final Bytes cacheKey) {
-        return WindowStoreUtils.timestampFromBinaryKey(cacheKey.get());
+        byte[] binaryKey = bytesFromCacheKey(cacheKey);
+
+        return WindowStoreUtils.timestampFromBinaryKey(binaryKey);
+    }
+
+    @Override
+    V deserializeCacheValue(final LRUCacheEntry cacheEntry) {
+        return serdes.valueFrom(cacheEntry.value);
     }
 
     @Override
@@ -52,7 +64,9 @@ class MergedSortedCacheWindowStoreIterator<V> extends AbstractMergedSortedCacheS
 
     @Override
     public int compare(final Bytes cacheKey, final Long storeKey) {
-        final Long cacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(cacheKey.get());
+        byte[] binaryKey = bytesFromCacheKey(cacheKey);
+
+        final Long cacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
         return cacheTimestamp.compareTo(storeKey);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
new file mode 100644
index 0000000..92ad021
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+/** Merges a cache iterator with a store iterator over windowed keys; both are assumed to be sorted by key. */
+class MergedSortedCacheWindowStoreKeyValueIterator<K, V>
+    extends AbstractMergedSortedCacheStoreIterator<Windowed<K>, Windowed<Bytes>, V, byte[]> {
+
+    private final StateSerdes<K, V> serdes;
+    private final long windowSize;
+    private final SegmentedCacheFunction cacheFunction;
+
+    MergedSortedCacheWindowStoreKeyValueIterator(
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator,
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator,
+        final StateSerdes<K, V> serdes,
+        final long windowSize,
+        final SegmentedCacheFunction cacheFunction
+    ) {
+        super(filteredCacheIterator, underlyingIterator);
+        this.serdes = serdes;
+        this.windowSize = windowSize;
+        this.cacheFunction = cacheFunction;
+    }
+
+    @Override
+    Windowed<K> deserializeStoreKey(final Windowed<Bytes> key) {
+        return new Windowed<>(serdes.keyFrom(key.key().get()), key.window());
+    }
+
+    @Override
+    KeyValue<Windowed<K>, V> deserializeStorePair(final KeyValue<Windowed<Bytes>, byte[]> pair) {
+        return KeyValue.pair(deserializeStoreKey(pair.key), serdes.valueFrom(pair.value));
+    }
+
+    @Override
+    Windowed<K> deserializeCacheKey(final Bytes cacheKey) {
+        final byte[] binaryKey = cacheFunction.key(cacheKey).get();
+        final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
+        final K key = WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes);
+        return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
+    }
+
+    @Override
+    V deserializeCacheValue(final LRUCacheEntry cacheEntry) {
+        return serdes.valueFrom(cacheEntry.value);
+    }
+
+    @Override
+    int compare(final Bytes cacheKey, final Windowed<Bytes> storeKey) {
+        final Bytes storeKeyBytes = WindowStoreUtils.toBinaryKey(storeKey.key().get(), storeKey.window().start(), 0);
+        return cacheFunction.compareSegmentedKeys(cacheKey, storeKeyBytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
index afc18e4..664873a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
@@ -79,11 +79,16 @@ class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, long timeFrom, long timeTo) {
+    public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
         return new MeteredSegmentedBytesStoreIterator(inner.fetch(key, timeFrom, timeTo), this.fetchTime);
     }
 
     @Override
+    public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
+        return new MeteredSegmentedBytesStoreIterator(inner.fetch(keyFrom, keyTo, from, to), this.fetchTime);
+    }
+
+    @Override
     public void remove(final Bytes key) {
         final long startNs = time.nanoseconds();
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
new file mode 100644
index 0000000..ace2487
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.nio.ByteBuffer;
+
+class OrderedBytes {
+
+    private static final int MIN_KEY_LENGTH = 1;
+
+    private OrderedBytes() { }
+
+    /**
+     * Returns the upper byte range for a key with a given fixed size maximum suffix.
+     *
+     * Assumes the minimum key length is one byte and that {@code maxSuffix} is non-empty.
+     */
+    static Bytes upperRange(final Bytes key, final byte[] maxSuffix) {
+        final byte[] bytes = key.get();
+        final ByteBuffer rangeEnd = ByteBuffer.allocate(bytes.length + maxSuffix.length);
+
+        int i = 0;
+        while (i < bytes.length && (
+            i < MIN_KEY_LENGTH // assumes keys are at least one byte long
+            || (bytes[i] & 0xFF) >= (maxSuffix[0] & 0xFF)
+            )) {
+            rangeEnd.put(bytes[i++]);
+        }
+
+        rangeEnd.put(maxSuffix);
+        rangeEnd.flip();
+
+        final byte[] res = new byte[rangeEnd.remaining()];
+        ByteBuffer.wrap(res).put(rangeEnd);
+        return Bytes.wrap(res);
+    }
+
+    /**
+     * Returns the lower byte range for a key with a given fixed size minimum suffix,
+     * i.e. {@code key} immediately followed by {@code minSuffix}.
+     */
+    static Bytes lowerRange(final Bytes key, final byte[] minSuffix) {
+        final byte[] bytes = key.get();
+        final ByteBuffer rangeStart = ByteBuffer.allocate(bytes.length + minSuffix.length);
+        // Use minSuffix rather than a suffix derived from the query bounds: unless there is a
+        // maximum key length, zero bytes can keep being appended to the key to create a key
+        // that still matches the range, yet sorts before the key followed by any larger
+        // suffix in byte order.
+        return Bytes.wrap(rangeStart.put(bytes).put(minSuffix).array());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 252a55f..f3c4639 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -46,11 +46,23 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
         final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
 
-        final Bytes binaryFrom = keySchema.lowerRange(key, from);
-        final Bytes binaryTo = keySchema.upperRange(key, to);
+        final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
+        final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
 
         return new SegmentIterator(searchSpace.iterator(),
-                                   keySchema.hasNextCondition(key, from, to),
+                                   keySchema.hasNextCondition(key, key, from, to),
+                                   binaryFrom, binaryTo);
+    }
+
+    /**
+     * Fetches all records whose record key lies in [keyFrom, keyTo] and whose time lies in
+     * [from, to], restricted to the segments returned by keySchema.segmentsToSearch for [from, to].
+     */
+    @Override
+    public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
+        final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
+
+        // Keys in [keyFrom, keyTo] may differ in length, so use the general lowerRange/upperRange
+        // bounds rather than the *FixedSize ones used by the single-key fetch. The bounds only
+        // narrow the scan; the HasNextCondition presumably filters out entries whose key or
+        // timestamp falls outside the requested ranges (confirm in the KeySchema implementations).
+        final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
+        final Bytes binaryTo = keySchema.upperRange(keyTo, to);
+
+        return new SegmentIterator(searchSpace.iterator(),
+                                   keySchema.hasNextCondition(keyFrom, keyTo, from, to),
                                    binaryFrom, binaryTo);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index 103bb55..9fde74b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -90,7 +90,15 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
 
     @Override
     public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime);
+        return findSessions(key, key, earliestSessionEndTime, latestSessionStartTime);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(
+            Bytes.wrap(serdes.rawKey(keyFrom)), Bytes.wrap(serdes.rawKey(keyTo)),
+            earliestSessionEndTime, latestSessionStartTime
+        );
         return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
     }
 
@@ -100,6 +108,11 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     }
 
     @Override
+    public KeyValueIterator<Windowed<K>, AGG> fetch(K from, K to) {
+        return findSessions(from, to, 0, Long.MAX_VALUE);
+    }
+
+    @Override
     public void remove(final Windowed<K> key) {
         bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer(), topic));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
index 4e618d9..b407a22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -50,6 +50,7 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K,
 
     public SessionStore<K, V> get() {
         final SessionKeySchema keySchema = new SessionKeySchema();
+        final long segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
         final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
                                                                                     retentionPeriod,
                                                                                     NUM_SEGMENTS,
@@ -62,7 +63,7 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K,
             final RocksDBSessionStore<Bytes, byte[]> sessionStore
                     = RocksDBSessionStore.bytesStore(metered);
 
-            return new CachingSessionStore<>(sessionStore, keySerde, valueSerde);
+            return new CachingSessionStore<>(sessionStore, keySerde, valueSerde, segmentInterval);
         }
 
         if (cached) {
@@ -71,7 +72,7 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K,
             final RocksDBSessionStore<Bytes, byte[]> sessionStore
                     = RocksDBSessionStore.bytesStore(metered);
 
-            return new CachingSessionStore<>(sessionStore, keySerde, valueSerde);
+            return new CachingSessionStore<>(sessionStore, keySerde, valueSerde, segmentInterval);
         }
 
         if (logged) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 5e8d0b2..b147894 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -29,19 +30,10 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
 
 class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
 
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-    protected final SegmentedBytesStore bytesStore;
-    protected final boolean retainDuplicates;
-
-    private ProcessorContext context;
-    protected StateSerdes<K, V> serdes;
-    protected int seqnum = 0;
-
     // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
     private static class RocksDBWindowBytesStore extends RocksDBWindowStore<Bytes, byte[]> {
-        RocksDBWindowBytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates) {
-            super(inner, Serdes.Bytes(), Serdes.ByteArray(), retainDuplicates);
+        RocksDBWindowBytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates, final long windowSize) {
+            super(inner, Serdes.Bytes(), Serdes.ByteArray(), retainDuplicates, windowSize);
         }
 
         @Override
@@ -54,23 +46,41 @@ class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         @Override
         public WindowStoreIterator<byte[]> fetch(Bytes key, long timeFrom, long timeTo) {
             final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(key, timeFrom, timeTo);
-            return WrappedWindowStoreIterator.bytesIterator(bytesIterator, serdes);
+            return WindowStoreIteratorWrapper.bytesIterator(bytesIterator, serdes, windowSize).valuesIterator();
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes from, Bytes to, long timeFrom, long timeTo) {
+            final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(from, to, timeFrom, timeTo);
+            return WindowStoreIteratorWrapper.bytesIterator(bytesIterator, serdes, windowSize).keyValueIterator();
         }
     }
 
-    static RocksDBWindowStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates) {
-        return new RocksDBWindowBytesStore(inner, retainDuplicates);
+    static RocksDBWindowStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates, final long windowSize) {
+        return new RocksDBWindowBytesStore(inner, retainDuplicates, windowSize);
     }
 
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private final boolean retainDuplicates;
+    protected final long windowSize;
+    protected final SegmentedBytesStore bytesStore;
+
+    private ProcessorContext context;
+    protected StateSerdes<K, V> serdes;
+    protected int seqnum = 0;
+
     RocksDBWindowStore(final SegmentedBytesStore bytesStore,
                        final Serde<K> keySerde,
                        final Serde<V> valueSerde,
-                       final boolean retainDuplicates) {
+                       final boolean retainDuplicates,
+                       final long windowSize) {
         super(bytesStore);
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.bytesStore = bytesStore;
         this.retainDuplicates = retainDuplicates;
+        this.windowSize = windowSize;
     }
 
     @Override
@@ -100,7 +110,13 @@ class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
     @Override
     public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
-        return new WrappedWindowStoreIterator<>(bytesIterator, serdes);
+        return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).valuesIterator();
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
+        return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
     }
 
     void maybeUpdateSeqnumForDups() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 1b270a2..b1e0b02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -37,6 +37,7 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
     private final long retentionPeriod;
     private final boolean retainDuplicates;
     private final int numSegments;
+    private final long segmentInterval;
     private final long windowSize;
     private final boolean enableCaching;
 
@@ -51,6 +52,7 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
         this.numSegments = numSegments;
         this.windowSize = windowSize;
         this.enableCaching = enableCaching;
+        this.segmentInterval = Segments.segmentInterval(retentionPeriod, numSegments);
     }
 
     public String name() {
@@ -84,9 +86,9 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
     private WindowStore<K, V> maybeWrapCaching(final SegmentedBytesStore inner) {
         final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(inner, "rocksdb-window", time);
         if (!enableCaching) {
-            return new RocksDBWindowStore<>(metered, keySerde, valueSerde, retainDuplicates);
+            return new RocksDBWindowStore<>(metered, keySerde, valueSerde, retainDuplicates, windowSize);
         }
-        final RocksDBWindowStore<Bytes, byte[]> windowed = RocksDBWindowStore.bytesStore(metered, retainDuplicates);
-        return new CachingWindowStore<>(windowed, keySerde, valueSerde, windowSize);
+        final RocksDBWindowStore<Bytes, byte[]> windowed = RocksDBWindowStore.bytesStore(metered, retainDuplicates, windowSize);
+        return new CachingWindowStore<>(windowed, keySerde, valueSerde, windowSize, segmentInterval);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 0c3bb53..72ae6e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -37,7 +37,18 @@ public interface SegmentedBytesStore extends StateStore {
      * @param to        latest time to match
      * @return  an iterator over key-value pairs
      */
-    KeyValueIterator<Bytes, byte[]> fetch(Bytes key, long from, long to);
+    KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to);
+
+    /**
+     * Fetch all records from the segmented store in the provided key range and time range
+     * from all existing segments
+     * @param keyFrom   The first key that could be in the range
+     * @param keyTo     The last key that could be in the range
+     * @param from      earliest time to match
+     * @param to        latest time to match
+     * @return  an iterator over key-value pairs
+     */
+    KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to);
 
     /**
      * Remove the record with the provided key. The key
@@ -75,7 +86,7 @@ public interface SegmentedBytesStore extends StateStore {
         void init(final String topic);
 
         /**
-         * Given a record-key and a time, construct a Segmented key that represents
+         * Given a range of record keys and a time, construct a Segmented key that represents
          * the upper range of keys to search when performing range queries.
          * @see SessionKeySchema#upperRange
          * @see WindowKeySchema#upperRange
@@ -86,7 +97,7 @@ public interface SegmentedBytesStore extends StateStore {
         Bytes upperRange(final Bytes key, final long to);
 
         /**
-         * Given a record-key and a time, construct a Segmented key that represents
+         * Given a range of record keys and a time, construct a Segmented key that represents
          * the lower range of keys to search when performing range queries.
          * @see SessionKeySchema#lowerRange
          * @see WindowKeySchema#lowerRange
@@ -97,6 +108,28 @@ public interface SegmentedBytesStore extends StateStore {
         Bytes lowerRange(final Bytes key, final long from);
 
         /**
+         * Given a single fixed-size record key and a time, construct a Segmented key that represents
+         * the upper range of keys to search when every matching store key embeds that same record key,
+         * as in a single-key fetch. {@link #upperRange(Bytes, long)} is the variant used for
+         * key-range fetches.
+         * @see SessionKeySchema#upperRangeFixedSize
+         * @see WindowKeySchema#upperRangeFixedSize
+         * @param key the record key to search for (the last key in the range)
+         * @param to the last timestamp in the range
+         * @return The key that represents the upper range to search for in the store
+         */
+        Bytes upperRangeFixedSize(final Bytes key, final long to);
+
+        /**
+         * Given a single fixed-size record key and a time, construct a Segmented key that represents
+         * the lower range of keys to search when every matching store key embeds that same record key,
+         * as in a single-key fetch. {@link #lowerRange(Bytes, long)} is the variant used for
+         * key-range fetches.
+         * @see SessionKeySchema#lowerRangeFixedSize
+         * @see WindowKeySchema#lowerRangeFixedSize
+         * @param key the record key to search for (the first key in the range)
+         * @param from the first timestamp in the range
+         * @return The key that represents the lower range to search for in the store
+         */
+        Bytes lowerRangeFixedSize(final Bytes key, final long from);
+
+        /**
          * Extract the timestamp of the segment from the key. The key is a composite of
          * the record-key, any timestamps, plus any additional information.
          * @see SessionKeySchema#lowerRange
@@ -108,13 +141,14 @@ public interface SegmentedBytesStore extends StateStore {
 
         /**
          * Create an implementation of {@link HasNextCondition} that knows when
-         * to stop iterating over the Segments. Used during {@link SegmentedBytesStore#fetch(Bytes, long, long)} operations
-         * @param binaryKey     the record-key
+         * to stop iterating over the Segments. Used during {@link SegmentedBytesStore#fetch(Bytes, Bytes, long, long)} operations
+         * @param binaryKeyFrom the first key in the range
+         * @param binaryKeyTo   the last key in the range
          * @param from          starting time range
          * @param to            ending time range
          * @return
          */
-        HasNextCondition hasNextCondition(final Bytes binaryKey, long from, long to);
+        HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to);
 
         /**
          * Used during {@link SegmentedBytesStore#fetch(Bytes, long, long)} operations to determine

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
new file mode 100644
index 0000000..8571f92
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * {@link CacheFunction} for segmented stores. A cache key is built from two parts:
+ * <ul>
+ *   <li>the 8-byte segment id of the store key, written with {@link ByteBuffer#putLong}
+ *       (big-endian by default);</li>
+ *   <li>the raw store key bytes.</li>
+ * </ul>
+ * Cached entries are therefore grouped by segment first and by store key second. This keeps
+ * the cache ordering aligned with the underlying segmented store when keys are spread across
+ * several segments.
+ *
+ * NOTE(review): that ordering argument assumes non-negative segment ids. A negative id encodes
+ * with a leading 0xFF byte and would sort after all non-negative ids if the cache compares keys
+ * as unsigned bytes. Confirm that segment timestamps cannot be negative.
+ */
+class SegmentedCacheFunction implements CacheFunction {
+
+    // width of the segment-id prefix carried by every cache key (one long)
+    private static final int SEGMENT_ID_BYTES = 8;
+
+    private final KeySchema keySchema;
+    private final long segmentInterval;
+
+    /**
+     * @param keySchema       schema used to extract the segment timestamp from a store key
+     * @param segmentInterval width of one segment, in the same units as the segment timestamp
+     */
+    SegmentedCacheFunction(KeySchema keySchema, long segmentInterval) {
+        this.segmentInterval = segmentInterval;
+        this.keySchema = keySchema;
+    }
+
+    /** Strips the segment-id prefix from a cache key, yielding the original store key. */
+    @Override
+    public Bytes key(Bytes cacheKey) {
+        final byte[] storeKeyBytes = bytesFromCacheKey(cacheKey);
+        return Bytes.wrap(storeKeyBytes);
+    }
+
+    /** Prepends the store key's segment id to the store key bytes. */
+    @Override
+    public Bytes cacheKey(Bytes key) {
+        final byte[] raw = key.get();
+        final byte[] prefixed = ByteBuffer.allocate(SEGMENT_ID_BYTES + raw.length)
+            .putLong(segmentId(key))
+            .put(raw)
+            .array();
+        return Bytes.wrap(prefixed);
+    }
+
+    /**
+     * Copies the store-key portion (everything after the segment-id prefix) out of a cache key.
+     * A cache key shorter than the prefix fails with a NegativeArraySizeException.
+     */
+    static byte[] bytesFromCacheKey(Bytes cacheKey) {
+        final byte[] prefixed = cacheKey.get();
+        final int storeKeyLength = prefixed.length - SEGMENT_ID_BYTES;
+        final byte[] storeKey = new byte[storeKeyLength];
+        System.arraycopy(prefixed, SEGMENT_ID_BYTES, storeKey, 0, storeKeyLength);
+        return storeKey;
+    }
+
+    /**
+     * Computes the segment id of a store key: its segment timestamp, as extracted by the key
+     * schema, divided by the segment interval (Java truncating division).
+     */
+    public long segmentId(Bytes key) {
+        final long segmentTimestamp = keySchema.segmentTimestamp(key);
+        return segmentTimestamp / segmentInterval;
+    }
+
+    /**
+     * Compares a cache key (segment-id prefixed) with a plain store key. The comparison is by
+     * segment id first. Within the same segment, the cache key's payload (the bytes after the
+     * prefix) is compared byte-wise with the whole store key via Bytes.BYTES_LEXICO_COMPARATOR.
+     *
+     * @return negative, zero or positive as {@code cacheKey} sorts before, equal to, or after
+     *         {@code storeKey}
+     */
+    int compareSegmentedKeys(Bytes cacheKey, Bytes storeKey) {
+        final long storeSegment = segmentId(storeKey);
+        final byte[] cacheBytes = cacheKey.get();
+        final long cacheSegment = ByteBuffer.wrap(cacheBytes).getLong();
+        if (cacheSegment != storeSegment) {
+            return Long.compare(cacheSegment, storeSegment);
+        }
+        // same segment: compare the cache key's payload (past the prefix) with the whole store key
+        final byte[] storeBytes = storeKey.get();
+        return Bytes.BYTES_LEXICO_COMPARATOR.compare(
+            cacheBytes, SEGMENT_ID_BYTES, cacheBytes.length - SEGMENT_ID_BYTES,
+            storeBytes, 0, storeBytes.length
+        );
+    }
+}


Mime
View raw message