kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [4/5] kafka git commit: KAFKA-3452: Support session windows
Date Fri, 06 Jan 2017 18:12:37 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 922f4bc..f2c3b53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,19 +106,28 @@ public class Stores {
                                     private int numSegments = 0;
                                     private long retentionPeriod = 0L;
                                     private boolean retainDuplicates = false;
+                                    private boolean sessionWindows;
                                     private boolean logged = true;
 
                                     @Override
-                                    public PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates) {
+                                    public PersistentKeyValueFactory<K, V> windowed(final long windowSize, final long retentionPeriod, final int numSegments, final boolean retainDuplicates) {
                                         this.windowSize = windowSize;
                                         this.numSegments = numSegments;
                                         this.retentionPeriod = retentionPeriod;
                                         this.retainDuplicates = retainDuplicates;
+                                        this.sessionWindows = false;
 
                                         return this;
                                     }
 
                                     @Override
+                                    public PersistentKeyValueFactory<K, V> sessionWindowed(final long retentionPeriod) {
+                                        this.sessionWindows = true;
+                                        this.retentionPeriod = retentionPeriod;
+                                        return this;
+                                    }
+
+                                    @Override
                                     public PersistentKeyValueFactory<K, V> enableLogging(final Map<String, String> config) {
                                         logged = true;
                                         logConfig.putAll(config);
@@ -140,7 +150,9 @@ public class Stores {
                                     @Override
                                     public StateStoreSupplier build() {
                                         log.trace("Creating RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
-                                        if (numSegments > 0) {
+                                        if (sessionWindows) {
+                                            return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled);
+                                        } else if (numSegments > 0) {
                                             return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig, cachingEnabled);
                                         }
                                         return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde, logged, logConfig, cachingEnabled);
@@ -381,7 +393,6 @@ public class Stores {
 
         /**
          * Set the persistent store as a windowed key-value store
-         *
          * @param windowSize size of the windows
          * @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
          * @param numSegments the maximum number of segments for rolling the windowed store
@@ -390,6 +401,12 @@ public class Stores {
         PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);
 
         /**
+         * Set the persistent store as a {@link SessionStore} for use with {@link org.apache.kafka.streams.kstream.SessionWindows}
+         * @param retentionPeriod period of time in milliseconds to keep each window in this store
+         */
+        PersistentKeyValueFactory<K, V> sessionWindowed(final long retentionPeriod);
+
+        /**
          * Indicates that a changelog should be created for the store. The changelog will be created
          * with the provided cleanupPolicy and configs.
          *

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index cfe6bd3..6da40b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -169,7 +169,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
         validateStoreOpen();
         final byte[] origFrom = serdes.rawKey(from);
         final byte[] origTo = serdes.rawKey(to);
-        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(underlying.range(Bytes.wrap(origFrom), Bytes.wrap(origTo)));
+        final KeyValueIterator<Bytes, byte[]> storeIterator = underlying.range(Bytes.wrap(origFrom), Bytes.wrap(origTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, origFrom, origTo);
         return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
     }
@@ -177,7 +177,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
     @Override
     public KeyValueIterator<K, V> all() {
         validateStoreOpen();
-        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(underlying.all());
+        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(name, underlying.all());
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);
         return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
     }
@@ -223,7 +223,8 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
         validateStoreOpen();
         final byte[] rawKey = serdes.rawKey(key);
         final V v = get(rawKey);
-        put(rawKey, null);
+        cache.delete(name, serdes.rawKey(key));
+        underlying.delete(Bytes.wrap(rawKey));
         return v;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
new file mode 100644
index 0000000..a012c63
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/**
+ * A {@link SessionStore} that places the thread's {@link ThreadCache} in front of a
+ * {@link SegmentedBytesStore}.
+ * <p>
+ * {@code put} and {@code remove} only write to the cache. Entries reach the underlying bytes
+ * store when the cache hands its dirty entries to the listener registered in
+ * {@code initInternal}; at that point they are also forwarded to the
+ * {@link CacheFlushListener}, if one has been set. Reads merge the cached entries with the
+ * entries of the underlying store.
+ */
+class CachingSessionStore<K, AGG> implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> {
+
+    private final SegmentedBytesStore bytesStore;
+    private final SessionKeySchema keySchema;
+    // not final: falls back to the context's key serde in initInternal when null
+    private Serde<K> keySerde;
+    private final Serde<AGG> aggSerde;
+    private InternalProcessorContext context;
+    // cache namespace: "<taskId>-<underlying store name>", assigned in initInternal
+    private String name;
+    private StateSerdes<Windowed<K>, AGG> serdes;
+    private ThreadCache cache;
+    private CacheFlushListener<Windowed<K>, AGG> flushListener;
+
+    /**
+     * @param bytesStore the underlying store that dirty cache entries are written to
+     * @param keySerde   serde for the session key; may be {@code null}, in which case the
+     *                   context's key serde is used once the store is initialized
+     * @param aggSerde   serde for the aggregate; may be {@code null}, in which case the
+     *                   context's value serde is used once the store is initialized
+     */
+    CachingSessionStore(final SegmentedBytesStore bytesStore,
+                        final Serde<K> keySerde,
+                        final Serde<AGG> aggSerde) {
+        this.bytesStore = bytesStore;
+        this.keySerde = keySerde;
+        this.aggSerde = aggSerde;
+        this.keySchema = new SessionKeySchema();
+    }
+
+    /**
+     * Returns an iterator over the sessions of {@code key} selected by the given time bounds,
+     * merging entries held in the cache with entries of the underlying store.
+     *
+     * @param key                    the session key
+     * @param earliestSessionEndTime lower time bound passed to the key schema and the underlying store
+     * @param latestSessionStartTime upper time bound passed to the key schema and the underlying store
+     * @return a merged iterator over cached and stored sessions
+     * @throws InvalidStateStoreException if the store is not open
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key,
+                                                                  final long earliestSessionEndTime,
+                                                                  final long latestSessionStartTime) {
+        validateStoreOpen();
+        final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(name, key));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name,
+                                                                                  keySchema.lowerRange(binarySessionId,
+                                                                                                       earliestSessionEndTime).get(),
+                                                                                  keySchema.upperRange(binarySessionId, latestSessionStartTime).get());
+        final KeyValueIterator<Bytes, byte[]> storeIterator = bytesStore.fetch(binarySessionId, earliestSessionEndTime, latestSessionStartTime);
+        // the cache range is a plain byte range, so it is additionally filtered by the schema's condition
+        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionId,
+                                                                                    earliestSessionEndTime,
+                                                                                    latestSessionStartTime);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition);
+        return new MergedSortedCacheKeyValueStoreIterator<>(filteredCacheIterator, storeIterator, serdes);
+    }
+
+
+    /**
+     * Removes a session by caching a {@code null} value for it.
+     *
+     * @param sessionKey the windowed key of the session to remove
+     * @throws InvalidStateStoreException if the store is not open
+     */
+    @Override
+    public void remove(final Windowed<K> sessionKey) {
+        validateStoreOpen();
+        put(sessionKey, null);
+    }
+
+    /**
+     * Writes the aggregate for a session into the cache as a dirty entry tagged with the
+     * current record's offset, partition and topic and with the session window's end time.
+     *
+     * @param key   the windowed session key
+     * @param value the aggregate; {@code null} is what {@link #remove(Windowed)} passes
+     * @throws InvalidStateStoreException if the store is not open
+     */
+    @Override
+    public void put(final Windowed<K> key, final AGG value) {
+        validateStoreOpen();
+        final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer());
+        final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
+                                                      key.window().end(), context.partition(), context.topic());
+        cache.put(name, binaryKey.get(), entry);
+    }
+
+    /**
+     * Returns all sessions of {@code key}, i.e. {@link #findSessionsToMerge} over the time
+     * range {@code [0, Long.MAX_VALUE]}.
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+        return findSessionsToMerge(key, 0, Long.MAX_VALUE);
+    }
+
+
+    /**
+     * @return the name of the underlying store (not the task-prefixed cache namespace)
+     */
+    @Override
+    public String name() {
+        return bytesStore.name();
+    }
+
+    /**
+     * Initializes the underlying store first, then the caching layer.
+     *
+     * @param context must be an {@link InternalProcessorContext}
+     * @param root    passed through to the underlying store
+     */
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        bytesStore.init(context, root);
+        initInternal((InternalProcessorContext) context);
+    }
+
+    /**
+     * Resolves the serdes, derives the cache namespace and registers the listener that
+     * receives dirty entries from the cache.
+     */
+    @SuppressWarnings("unchecked")
+    private void initInternal(final InternalProcessorContext context) {
+        this.context = context;
+
+        if (keySerde == null) {
+            keySerde = (Serde<K>) context.keySerde();
+        }
+
+
+        this.serdes = (StateSerdes<Windowed<K>, AGG>) new StateSerdes<>(bytesStore.name(),
+                                                                              new SessionKeySerde<>(keySerde),
+                                                                              aggSerde == null ? context.valueSerde() : aggSerde);
+
+
+        this.name = context.taskId() + "-" + bytesStore.name();
+        this.cache = this.context.getCache();
+        cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> entries) {
+                for (ThreadCache.DirtyEntry entry : entries) {
+                    putAndMaybeForward(entry, context);
+                }
+            }
+        });
+
+    }
+
+    /**
+     * Writes a dirty cache entry to the underlying store and, when a flush listener is set,
+     * forwards the new and previous values to it first. The record context is switched to the
+     * one captured with the entry for the duration of the call and restored afterwards.
+     */
+    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
+        final Bytes binaryKey = entry.key();
+        final RecordContext current = context.recordContext();
+        context.setRecordContext(entry.recordContext());
+        try {
+            if (flushListener != null) {
+                final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer());
+                final AGG newValue = serdes.valueFrom(entry.newValue());
+                // read the previous value before the underlying store is overwritten below
+                final AGG oldValue = fetchPrevious(binaryKey);
+                // nothing to forward when the entry neither existed before nor exists now
+                if (!(newValue == null && oldValue == null)) {
+                    flushListener.apply(key, newValue, oldValue);
+                }
+
+            }
+            bytesStore.put(binaryKey, entry.newValue());
+        } finally {
+            context.setRecordContext(current);
+        }
+    }
+
+    /**
+     * @return the value currently held by the underlying store for {@code key},
+     *         or {@code null} if there is none
+     */
+    private AGG fetchPrevious(final Bytes key) {
+        final byte[] bytes = bytesStore.get(key);
+        if (bytes == null) {
+            return null;
+        }
+        return serdes.valueFrom(bytes);
+    }
+
+
+    /**
+     * Flushes this store's cache namespace, then the underlying store.
+     */
+    @Override
+    public void flush() {
+        cache.flush(name);
+        bytesStore.flush();
+    }
+
+    /**
+     * Flushes, closes the underlying store and then closes this store's cache namespace.
+     */
+    @Override
+    public void close() {
+        flush();
+        bytesStore.close();
+        cache.close(name);
+    }
+
+    @Override
+    public boolean persistent() {
+        return bytesStore.persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return bytesStore.isOpen();
+    }
+
+    /**
+     * @param flushListener receives (key, new value, old value) for each dirty entry handed
+     *                      over by the cache; may be {@code null} to disable forwarding
+     */
+    @Override
+    public void setFlushListener(final CacheFlushListener<Windowed<K>, AGG> flushListener) {
+        this.flushListener = flushListener;
+    }
+
+    private void validateStoreOpen() {
+        if (!isOpen()) {
+            throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
+        }
+    }
+
+
+    /**
+     * View over a cache iterator whose {@code hasNext()} is decided by a
+     * {@link HasNextCondition} instead of by the cache iterator alone.
+     */
+    private static class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
+        private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
+        private final HasNextCondition hasNextCondition;
+
+        FilteredCacheIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
+                              final HasNextCondition hasNextCondition) {
+            this.cacheIterator = cacheIterator;
+            this.hasNextCondition = hasNextCondition;
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return cacheIterator.peekNextKey();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return hasNextCondition.hasNext(cacheIterator);
+        }
+
+        @Override
+        public KeyValue<Bytes, LRUCacheEntry> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return cacheIterator.next();
+
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return cacheIterator.peekNext();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 71856fa..bd252f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -34,7 +35,7 @@ import java.util.List;
 
 class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> {
 
-    private final WindowStore<Bytes, byte[]> underlying;
+    private final SegmentedBytesStore underlying;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private CacheFlushListener<Windowed<K>, V> flushListener;
@@ -44,7 +45,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
     private InternalProcessorContext context;
     private StateSerdes<K, V> serdes;
 
-    CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
+    CachingWindowStore(final SegmentedBytesStore underlying,
                        final Serde<K> keySerde,
                        final Serde<V> valueSerde,
                        final long windowSize) {
@@ -84,8 +85,8 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
                     final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
                     final Windowed<K> windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes),
                                                                    new TimeWindow(timestamp, timestamp + windowSize));
-                    maybeForward(entry, key, timestamp, windowedKey, (InternalProcessorContext) context);
-                    underlying.put(key, entry.newValue(), timestamp);
+                    maybeForward(entry, Bytes.wrap(binaryKey), windowedKey, (InternalProcessorContext) context);
+                    underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, timestamp, 0, WindowStoreUtils.INNER_SERDES)), entry.newValue());
                 }
             }
         });
@@ -94,7 +95,6 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
 
     private void maybeForward(final ThreadCache.DirtyEntry entry,
                               final Bytes key,
-                              final long timestamp,
                               final Windowed<K> windowedKey,
                               final InternalProcessorContext context) {
         if (flushListener != null) {
@@ -102,7 +102,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
             context.setRecordContext(entry.recordContext());
             try {
                 flushListener.apply(windowedKey,
-                                    serdes.valueFrom(entry.newValue()), fetchPrevious(key, timestamp));
+                                    serdes.valueFrom(entry.newValue()), fetchPrevious(key));
             } finally {
                 context.setRecordContext(current);
             }
@@ -158,18 +158,18 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
         byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
 
-        final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
+        final KeyValueIterator<Bytes, byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, binaryFrom, binaryTo);
-        return new MergedSortedCachedWindowStoreIterator<>(cacheIterator, new DelegatingPeekingWindowIterator<>(underlyingIterator), serdes);
+        return new MergedSortedCachedWindowStoreIterator<>(cacheIterator, new DelegatingPeekingKeyValueIterator<>(name, underlyingIterator), serdes);
     }
 
-    private V fetchPrevious(final Bytes key, final long timestamp) {
-        try (final WindowStoreIterator<byte[]> iterator = underlying.fetch(key, timestamp, timestamp)) {
-            if (!iterator.hasNext()) {
-                return null;
-            }
-            return serdes.valueFrom(iterator.next().value);
+
+    private V fetchPrevious(final Bytes key) {
+        final byte[] result = underlying.get(key);
+        if (result == null) {
+            return null;
         }
+        return serdes.valueFrom(result);
     }
 
     private void validateStoreOpen() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
new file mode 100644
index 0000000..14b8f17
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+/**
+ * Simple wrapper around a {@link SegmentedBytesStore} to support writing
+ * updates to a changelog
+ */
+class ChangeLoggingSegmentedBytesStore implements SegmentedBytesStore {
+
+    /** The wrapped store; every operation is delegated to it. */
+    private final SegmentedBytesStore inner;
+    /** Created in {@link #init}; records each put and remove. */
+    private StoreChangeLogger<Bytes, byte[]> changeLogger;
+
+
+    /**
+     * @param bytesStore the store to wrap
+     */
+    ChangeLoggingSegmentedBytesStore(final SegmentedBytesStore bytesStore) {
+        this.inner = bytesStore;
+    }
+
+    /** Reads go straight to the wrapped store; nothing is logged. */
+    @Override
+    @SuppressWarnings("unchecked")
+    public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
+        final KeyValueIterator<Bytes, byte[]> result = inner.fetch(key, from, to);
+        return result;
+    }
+
+    /** Removes the key from the wrapped store, then logs a {@code null} value for it. */
+    @Override
+    public void remove(final Bytes key) {
+        inner.remove(key);
+        changeLogger.logChange(key, null);
+    }
+
+    /**
+     * Writes to the wrapped store, then logs the same key and value.
+     * A {@code null} key is ignored: nothing is written and nothing is logged.
+     */
+    @Override
+    public void put(final Bytes key, final byte[] value) {
+        if (key == null) {
+            return;
+        }
+        inner.put(key, value);
+        changeLogger.logChange(key, value);
+    }
+
+    /** Reads go straight to the wrapped store; nothing is logged. */
+    @Override
+    public byte[] get(final Bytes key) {
+        final byte[] stored = inner.get(key);
+        return stored;
+    }
+
+    /** @return the name of the wrapped store */
+    @Override
+    public String name() {
+        return inner.name();
+    }
+
+    /** Initializes the wrapped store first, then creates the change logger under this store's name. */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void init(final ProcessorContext context, final StateStore root) {
+        inner.init(context, root);
+        changeLogger = new StoreChangeLogger<>(name(), context, WindowStoreUtils.INNER_SERDES);
+    }
+
+    @Override
+    public void flush() {
+        inner.flush();
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        final boolean persistent = inner.persistent();
+        return persistent;
+    }
+
+    @Override
+    public boolean isOpen() {
+        final boolean open = inner.isOpen();
+        return open;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
index 5c47419..b6b2031 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
@@ -75,7 +75,7 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
             }
         };
         final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
-        return new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction);
+        return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction));
     }
 
     @Override
@@ -91,7 +91,7 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
             }
         };
         final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
-        return new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction);
+        return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction));
     }
 
     @Override
@@ -132,6 +132,11 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
         }
 
         @Override
+        public K peekNextKey() {
+            throw new UnsupportedOperationException("peekNextKey not supported");
+        }
+
+        @Override
         public boolean hasNext() {
             while ((current == null || !current.hasNext())
                     && storeIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
new file mode 100644
index 0000000..ea10b1a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Wrapper over the underlying {@link ReadOnlySessionStore}s found in a {@link
+ * org.apache.kafka.streams.processor.internals.ProcessorTopology}
+ */
+public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore<K, V> {
+    private final StateStoreProvider storeProvider;
+    private final QueryableStoreType<ReadOnlySessionStore<K, V>> queryableStoreType;
+    private final String storeName;
+
+    /**
+     * @param storeProvider      source of the underlying stores, looked up on every {@link #fetch}
+     * @param queryableStoreType the store type passed to the provider
+     * @param storeName          the store name passed to the provider and used in error messages
+     */
+    public CompositeReadOnlySessionStore(final StateStoreProvider storeProvider,
+                                         final QueryableStoreType<ReadOnlySessionStore<K, V>> queryableStoreType,
+                                         final String storeName) {
+        this.storeProvider = storeProvider;
+        this.queryableStoreType = queryableStoreType;
+        this.storeName = storeName;
+    }
+
+
+    /**
+     * Queries the underlying stores in the order the provider returns them and hands back the
+     * iterator of the first store that has at least one session for {@code key}. Iterators
+     * that turn out to be empty are closed before moving on.
+     *
+     * @param key the session key to look up
+     * @return the first non-empty iterator found, or an empty iterator if no store has a
+     *         session for the key
+     * @throws InvalidStateStoreException if an underlying store throws one while being queried;
+     *                                    the original exception is attached as the cause
+     */
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+        final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType);
+        for (final ReadOnlySessionStore<K, V> store : stores) {
+            try {
+                final KeyValueIterator<Windowed<K>, V> result = store.fetch(key);
+                if (result.hasNext()) {
+                    return result;
+                }
+                result.close();
+            } catch (final InvalidStateStoreException ise) {
+                // keep the original exception as the cause so the underlying failure stays diagnosable
+                throw new InvalidStateStoreException("State store  [" + storeName + "] is not available anymore" +
+                                                             " and may have been migrated to another instance; " +
+                                                             "please re-discover its location from the state metadata.",
+                                                     ise);
+            }
+        }
+        // no store has a session for this key: return an iterator with no elements
+        return new KeyValueIterator<Windowed<K>, V>() {
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public Windowed<K> peekNextKey() {
+                throw new NoSuchElementException();
+            }
+
+            @Override
+            public boolean hasNext() {
+                return false;
+            }
+
+            @Override
+            public KeyValue<Windowed<K>, V> next() {
+                throw new NoSuchElementException();
+            }
+
+            @Override
+            public void remove() {
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
index 38d5108..61cd950 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
@@ -17,20 +17,24 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.util.NoSuchElementException;
 
-class DelegatingPeekingKeyValueIterator<K, V> implements PeekingKeyValueIterator<K, V> {
+class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
+    private final String storeName;
     private final KeyValueIterator<K, V> underlying;
     private KeyValue<K, V> next;
+    private volatile boolean open = true;
 
-    public DelegatingPeekingKeyValueIterator(final KeyValueIterator<K, V> underlying) {
+    DelegatingPeekingKeyValueIterator(final String storeName, final KeyValueIterator<K, V> underlying) {
+        this.storeName = storeName;
         this.underlying = underlying;
     }
 
     @Override
-    public K peekNextKey() {
+    public synchronized K peekNextKey() {
         if (!hasNext()) {
             throw new NoSuchElementException();
         }
@@ -38,12 +42,16 @@ class DelegatingPeekingKeyValueIterator<K, V> implements PeekingKeyValueIterator
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         underlying.close();
+        open = false;
     }
 
     @Override
-    public boolean hasNext() {
+    public synchronized boolean hasNext() {
+        if (!open) {
+            throw new InvalidStateStoreException(String.format("Store %s has closed", storeName));
+        }
         if (next != null) {
             return true;
         }
@@ -57,7 +65,7 @@ class DelegatingPeekingKeyValueIterator<K, V> implements PeekingKeyValueIterator
     }
 
     @Override
-    public KeyValue<K, V> next() {
+    public synchronized KeyValue<K, V> next() {
         if (!hasNext()) {
             throw new NoSuchElementException();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIterator.java
deleted file mode 100644
index 402a363..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIterator.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import java.util.NoSuchElementException;
-
-class DelegatingPeekingWindowIterator<V> implements PeekingWindowIterator<V> {
-    private final WindowStoreIterator<V> underlying;
-    private KeyValue<Long, V> next;
-
-    public DelegatingPeekingWindowIterator(final WindowStoreIterator<V> underlying) {
-        this.underlying = underlying;
-    }
-
-    @Override
-    public KeyValue<Long, V> peekNext() {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
-        }
-        return next;
-    }
-
-    @Override
-    public void close() {
-        underlying.close();
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (next != null) {
-            return true;
-        }
-
-        if (!underlying.hasNext()) {
-            return false;
-        }
-
-        next = underlying.next();
-        return true;
-    }
-
-    @Override
-    public KeyValue<Long, V> next() {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
-        }
-        final KeyValue<Long, V> result = next;
-        next = null;
-        return result;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException("remove not supported");
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
new file mode 100644
index 0000000..5171ebd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HasNextCondition.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+interface HasNextCondition { // package-private callback with a single boolean test over a Bytes-keyed KeyValueIterator
+    boolean hasNext(final KeyValueIterator<Bytes, ?> iterator); // result is computed from the supplied iterator; the exact criterion is left to each implementation
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index d09630d..e00f8ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -155,13 +155,13 @@ public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K
 
         @Override
         public synchronized KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryStoreIterator<>(this.map.subMap(from, true, to, false).entrySet().iterator());
+            return new DelegatingPeekingKeyValueIterator<>(name, new MemoryStoreIterator<>(this.map.subMap(from, true, to, false).entrySet().iterator()));
         }
 
         @Override
         public synchronized KeyValueIterator<K, V> all() {
             final TreeMap<K, V> copy = new TreeMap<>(this.map);
-            return new MemoryStoreIterator<>(copy.entrySet().iterator());
+            return new DelegatingPeekingKeyValueIterator<>(name, new MemoryStoreIterator<>(copy.entrySet().iterator()));
         }
 
         @Override
@@ -206,6 +206,11 @@ public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K
             public void close() {
             }
 
+            @Override
+            public K peekNextKey() {
+                throw new UnsupportedOperationException("peekNextKey not supported on MemoryStoreIterator");
+            }
+
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index e77c642..b7e47e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -50,7 +50,7 @@ class LRUCacheEntry implements RecordContext {
                 8 + // timestamp
                 8 + // offset
                 4 + // partition
-                topic.length();
+                (topic == null ? 0 : topic.length());
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index b79ad7d..7666155 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -41,7 +41,7 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
         final TreeMap<K, V> treeMap = toTreeMap();
-        return new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().subSet(from, true, to, true).iterator(), treeMap);
+        return new DelegatingPeekingKeyValueIterator<>(name(), new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().subSet(from, true, to, true).iterator(), treeMap));
     }
 
     @Override
@@ -85,5 +85,10 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
         public void close() {
             // do nothing
         }
+
+        @Override
+        public K peekNextKey() {
+            throw new UnsupportedOperationException("peekNextKey not supported");
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
index cbcd0f4..c9a6866 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
@@ -31,13 +31,13 @@ import java.util.NoSuchElementException;
  * @param <V>
  */
 class MergedSortedCacheKeyValueStoreIterator<K, V> implements KeyValueIterator<K, V> {
-    private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
-    private final PeekingKeyValueIterator<Bytes, byte[]> storeIterator;
+    private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
+    private final KeyValueIterator<Bytes, byte[]> storeIterator;
     private final StateSerdes<K, V> serdes;
     private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
 
-    public MergedSortedCacheKeyValueStoreIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
-                                                  final PeekingKeyValueIterator<Bytes, byte[]> storeIterator,
+    public MergedSortedCacheKeyValueStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
+                                                  final KeyValueIterator<Bytes, byte[]> storeIterator,
                                                   final StateSerdes<K, V> serdes) {
         this.cacheIterator = cacheIterator;
         this.storeIterator = storeIterator;
@@ -48,9 +48,9 @@ class MergedSortedCacheKeyValueStoreIterator<K, V> implements KeyValueIterator<K
     public boolean hasNext() {
         while (cacheIterator.hasNext() && isDeletedCacheEntry(cacheIterator.peekNext())) {
             if (storeIterator.hasNext()) {
-                final byte[] storeKey = storeIterator.peekNextKey().get();
+                final Bytes storeKey = storeIterator.peekNextKey();
                 // advance the store iterator if the key is the same as the deleted cache key
-                if (comparator.compare(storeKey, cacheIterator.peekNext().key) == 0) {
+                if (storeKey.equals(cacheIterator.peekNextKey())) {
                     storeIterator.next();
                 }
             }
@@ -60,20 +60,74 @@ class MergedSortedCacheKeyValueStoreIterator<K, V> implements KeyValueIterator<K
         return cacheIterator.hasNext() || storeIterator.hasNext();
     }
 
-    private boolean isDeletedCacheEntry(final KeyValue<byte[], LRUCacheEntry> nextFromCache) {
+
+    private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) {
         return  nextFromCache.value.value == null;
     }
 
 
     @Override
     public KeyValue<K, V> next() {
+
+        return internalNext(new NextValueFunction<KeyValue<K, V>>() {
+            @Override
+            public KeyValue<K, V> apply(final byte[] cacheKey, final byte[] storeKey) {
+                if (cacheKey == null) {
+                    return nextStoreValue();
+                }
+
+                if (storeKey == null) {
+                    return nextCacheValue();
+                }
+
+                final int comparison = comparator.compare(cacheKey, storeKey);
+                if (comparison > 0) {
+                    return nextStoreValue();
+                } else if (comparison < 0) {
+                    return nextCacheValue();
+                } else {
+                    storeIterator.next();
+                    return nextCacheValue();
+                }
+            }
+        });
+    }
+
+    @Override
+    public K peekNextKey() {
+        return internalNext(new NextValueFunction<K>() {
+            @Override
+            public K apply(final byte[] cacheKey, final byte[] storeKey) {
+                if (cacheKey == null) {
+                    return serdes.keyFrom(storeKey);
+                }
+
+                if (storeKey == null) {
+                    return serdes.keyFrom(cacheKey);
+                }
+
+                final int comparison = comparator.compare(cacheKey, storeKey);
+                if (comparison > 0) {
+                    return serdes.keyFrom(storeKey);
+                } else {
+                    return serdes.keyFrom(cacheKey);
+                }
+            }
+        });
+    }
+
+    interface NextValueFunction<T> {
+        T apply(final byte[] cacheKey, final byte [] storeKey);
+    }
+
+    private <T> T internalNext(final NextValueFunction<T> nextValueFunction) {
         if (!hasNext()) {
             throw new NoSuchElementException();
         }
 
         byte[] nextCacheKey = null;
         if (cacheIterator.hasNext()) {
-            nextCacheKey = cacheIterator.peekNextKey();
+            nextCacheKey = cacheIterator.peekNextKey().get();
         }
 
         byte[] nextStoreKey = null;
@@ -81,29 +135,12 @@ class MergedSortedCacheKeyValueStoreIterator<K, V> implements KeyValueIterator<K
             nextStoreKey = storeIterator.peekNextKey().get();
         }
 
-        if (nextCacheKey == null) {
-            return nextStoreValue();
-        }
-
-        if (nextStoreKey == null) {
-            return nextCacheValue();
-        }
-
-        final int comparison = comparator.compare(nextCacheKey, nextStoreKey);
-        if (comparison > 0) {
-            return nextStoreValue();
-        } else if (comparison < 0) {
-            return nextCacheValue();
-        } else {
-            storeIterator.next();
-            return nextCacheValue();
-        }
-
+        return nextValueFunction.apply(nextCacheKey, nextStoreKey);
     }
 
     private KeyValue<K, V> nextCacheValue() {
-        final KeyValue<byte[], LRUCacheEntry> next = cacheIterator.next();
-        return KeyValue.pair(serdes.keyFrom(next.key), serdes.valueFrom(next.value.value));
+        final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();
+        return KeyValue.pair(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value.value));
     }
 
     private KeyValue<K, V> nextStoreValue() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
index 68f147f..e210e73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCachedWindowStoreIterator.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
@@ -30,11 +32,11 @@ import java.util.NoSuchElementException;
  */
 class MergedSortedCachedWindowStoreIterator<K, V> implements WindowStoreIterator<V> {
     private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
-    private final PeekingWindowIterator<byte[]> storeIterator;
+    private final KeyValueIterator<Bytes, byte[]> storeIterator;
     private final StateSerdes<K, V> serdes;
 
     public MergedSortedCachedWindowStoreIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
-                                                 final PeekingWindowIterator<byte[]> storeIterator,
+                                                 final KeyValueIterator<Bytes, byte[]> storeIterator,
                                                  final StateSerdes<K, V> serdes) {
         this.cacheIterator = cacheIterator;
         this.storeIterator = storeIterator;
@@ -55,16 +57,16 @@ class MergedSortedCachedWindowStoreIterator<K, V> implements WindowStoreIterator
 
         Long nextCacheTimestamp = null;
         if (cacheIterator.hasNext()) {
-            nextCacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(cacheIterator.peekNextKey());
+            nextCacheTimestamp = WindowStoreUtils.timestampFromBinaryKey(cacheIterator.peekNextKey().get());
         }
 
         Long nextStoreTimestamp = null;
         if (storeIterator.hasNext()) {
-            nextStoreTimestamp = storeIterator.peekNext().key;
+            nextStoreTimestamp = WindowStoreUtils.timestampFromBinaryKey(storeIterator.peekNextKey().get());
         }
 
         if (nextCacheTimestamp == null) {
-            return nextStoreValue();
+            return nextStoreValue(nextStoreTimestamp);
         }
 
         if (nextStoreTimestamp == null) {
@@ -73,7 +75,7 @@ class MergedSortedCachedWindowStoreIterator<K, V> implements WindowStoreIterator
 
         final int comparison = nextCacheTimestamp.compareTo(nextStoreTimestamp);
         if (comparison > 0) {
-            return nextStoreValue();
+            return nextStoreValue(nextStoreTimestamp);
         } else if (comparison < 0) {
             return nextCacheValue(nextCacheTimestamp);
         } else {
@@ -83,13 +85,13 @@ class MergedSortedCachedWindowStoreIterator<K, V> implements WindowStoreIterator
     }
 
     private KeyValue<Long, V> nextCacheValue(final Long timestamp) {
-        final KeyValue<byte[], LRUCacheEntry> next = cacheIterator.next();
+        final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();
         return KeyValue.pair(timestamp, serdes.valueFrom(next.value.value));
     }
 
-    private KeyValue<Long, V> nextStoreValue() {
-        final KeyValue<Long, byte[]> next = storeIterator.next();
-        return KeyValue.pair(next.key, serdes.valueFrom(next.value));
+    private KeyValue<Long, V> nextStoreValue(final Long timestamp) {
+        final KeyValue<Bytes, byte[]> next = storeIterator.next();
+        return KeyValue.pair(timestamp, serdes.valueFrom(next.value));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 2e16af2..fb8b3b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -212,5 +212,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
                 metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
             }
         }
+
+        @Override
+        public K1 peekNextKey() {
+            return iter.peekNextKey();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
new file mode 100644
index 0000000..5181c08
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+class MeteredSegmentedBytesStore implements SegmentedBytesStore { // wrapper that forwards every call to 'inner' and records latencies via StreamsMetrics sensors
+
+    private final SegmentedBytesStore inner; // the wrapped store that does the actual work
+    private final String metricScope; // scope argument used when the sensors are registered in init()
+    private final Time time; // clock used for all nanosecond timestamps
+
+    private Sensor putTime; // this and the sensors below are assigned in init(), not in the constructor
+    private Sensor fetchTime;
+    private Sensor flushTime;
+    private StreamsMetrics metrics; // taken from the ProcessorContext in init()
+    private Sensor getTime;
+    private Sensor removeTime;
+
+    MeteredSegmentedBytesStore(final SegmentedBytesStore inner, String metricScope, Time time) {
+        this.inner = inner;
+        this.metricScope = metricScope;
+        this.time = time != null ? time : new SystemTime(); // a null clock falls back to SystemTime
+    }
+
+    @Override
+    public String name() {
+        return inner.name(); // the wrapper has no name of its own
+    }
+
+    @Override
+    public void init(ProcessorContext context, StateStore root) {
+        final String name = name();
+        this.metrics = context.metrics();
+        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put"); // one latency sensor per operation, keyed by scope and store name
+        this.fetchTime = this.metrics.addLatencySensor(metricScope, name, "fetch");
+        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+        this.removeTime = this.metrics.addLatencySensor(metricScope, name, "remove");
+
+        final Sensor restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore"); // local only: used just for the inner.init call below
+        // register and possibly restore the state from the logs
+        final long startNs = time.nanoseconds();
+        try {
+            inner.init(context, root);
+        } finally {
+            this.metrics.recordLatency(restoreTime, startNs, time.nanoseconds()); // recorded even when inner.init throws
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent(); // plain delegation, not timed
+    }
+
+    @Override
+    public boolean isOpen() {
+        return inner.isOpen(); // plain delegation, not timed
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        final long startNs = time.nanoseconds();
+        try {
+            return inner.get(key);
+        } finally {
+            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds()); // finally: latency is recorded on both normal return and exception
+        }
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, long timeFrom, long timeTo) {
+        return new MeteredSegmentedBytesStoreIterator(inner.fetch(key, timeFrom, timeTo), this.fetchTime); // nothing is recorded here; the returned iterator records the fetch latency when it is closed
+    }
+
+    @Override
+    public void remove(final Bytes key) {
+        final long startNs = time.nanoseconds();
+        try {
+            inner.remove(key);
+        } finally {
+            this.metrics.recordLatency(this.removeTime, startNs, time.nanoseconds()); // recorded whether or not inner.remove throws
+        }
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value) {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.put(key, value);
+        } finally {
+            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds()); // recorded whether or not inner.put throws
+        }
+    }
+
+    @Override
+    public void close() {
+        inner.close(); // plain delegation, not timed
+    }
+
+    @Override
+    public void flush() {
+        final long startNs = time.nanoseconds();
+        try {
+            this.inner.flush();
+        } finally {
+            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds()); // recorded whether or not inner.flush throws
+        }
+    }
+
+    private class MeteredSegmentedBytesStoreIterator implements KeyValueIterator<Bytes, byte[]> { // non-static: reads the enclosing store's 'time' and 'metrics'
+
+        private final KeyValueIterator<Bytes, byte[]> iter; // underlying iterator every call is forwarded to
+        private final Sensor sensor; // sensor that receives the latency in close()
+        private final long startNs; // timestamp taken when this wrapper is constructed
+
+        MeteredSegmentedBytesStoreIterator(final KeyValueIterator<Bytes, byte[]> iter, Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds(); // measurement starts at construction of the wrapper
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<Bytes, byte[]> next() {
+            return iter.next();
+        }
+
+        @Override
+        public void remove() {
+            iter.remove(); // forwarded as-is; whether removal is supported depends on the underlying iterator
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds()); // elapsed time from construction to close, recorded even if iter.close throws
+            }
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            return iter.peekNextKey();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 4272f2b..70f8676 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -110,6 +110,7 @@ class NamedCache {
         }
 
         final List<ThreadCache.DirtyEntry> entries  = new ArrayList<>();
+        final List<Bytes> deleted = new ArrayList<>();
 
         // evicted already been removed from the cache so add it to the list of
         // flushed entries and remove from dirtyKeys.
@@ -125,10 +126,16 @@ class NamedCache {
             }
             entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, node.entry));
             node.entry.markClean();
+            if (node.entry.value == null) {
+                deleted.add(node.key);
+            }
         }
         // clear dirtyKeys before the listener is applied as it may be re-entrant.
         dirtyKeys.clear();
         listener.apply(entries);
+        for (Bytes key : deleted) {
+            delete(key);
+        }
     }
 
 
@@ -264,7 +271,6 @@ class NamedCache {
         copy.addAll(keySet);
         return copy.iterator();
     }
-    
 
     synchronized Iterator<Bytes> allKeys() {
         return keySetIterator(cache.navigableKeySet());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingKeyValueIterator.java
index 9a3a05c..496a424 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingKeyValueIterator.java
@@ -16,9 +16,10 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 public interface PeekingKeyValueIterator<K, V> extends KeyValueIterator<K, V> {
 
-    K peekNextKey();
+    KeyValue<K, V> peekNext();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingWindowIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingWindowIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingWindowIterator.java
deleted file mode 100644
index c112169..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PeekingWindowIterator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-public interface PeekingWindowIterator<V> extends WindowStoreIterator<V> {
-
-    KeyValue<Long, V> peekNext();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
new file mode 100644
index 0000000..31956ba
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
+
+    private final String name;
+    private final Segments segments;
+    private final KeySchema keySchema;
+    private ProcessorContext context;
+    private volatile boolean open;
+
+
+    RocksDBSegmentedBytesStore(final String name,
+                               final long retention,
+                               final int numSegments,
+                               final KeySchema keySchema) {
+        this.name = name;
+        this.keySchema = keySchema;
+        this.segments = new Segments(name, retention, numSegments);
+    }
+
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
+        final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
+
+        final Bytes binaryFrom = keySchema.lowerRange(key, from);
+        final Bytes binaryTo = keySchema.upperRange(key, to);
+
+        return new SegmentIterator(
+                searchSpace.iterator(),
+                keySchema.hasNextCondition(key, from, to),
+                binaryFrom, binaryTo);
+    }
+
+    @Override
+    public void remove(final Bytes key) {
+        final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        if (segment == null) {
+            return;
+        }
+        segment.delete(key);
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value) {
+        final long segmentId = segments.segmentId(keySchema.segmentTimestamp(key));
+        final Segment segment = segments.getOrCreateSegment(segmentId, context);
+        if (segment != null) {
+            segment.put(key, value);
+        }
+    }
+
+
+    @Override
+    public byte[] get(final Bytes key) {
+        final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        if (segment == null) {
+            return null;
+        }
+        return segment.get(key);
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void init(ProcessorContext context, StateStore root) {
+        this.context = context;
+
+        segments.openExisting(context);
+
+        // register and possibly restore the state from the logs
+        context.register(root, false, new StateRestoreCallback() {
+            @Override
+            public void restore(byte[] key, byte[] value) {
+                put(Bytes.wrap(key), value);
+            }
+        });
+
+        flush();
+        open = true;
+    }
+
+    @Override
+    public void flush() {
+        segments.flush();
+    }
+
+    @Override
+    public void close() {
+        open = false;
+        segments.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
new file mode 100644
index 0000000..73c825c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+
+class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {
+
+    private final Serde<K> keySerde;
+    private final Serde<AGG> aggSerde;
+    private final SegmentedBytesStore bytesStore;
+    private StateSerdes<K, AGG> serdes;
+
+
+    RocksDBSessionStore(final SegmentedBytesStore bytesStore,
+                        final Serde<K> keySerde,
+                        final Serde<AGG> aggSerde) {
+        this.keySerde = keySerde;
+        this.bytesStore = bytesStore;
+        this.aggSerde = aggSerde;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime);
+        return new SessionStoreIterator(bytesIterator, serdes);
+    }
+
+
+    @Override
+    public void remove(final Windowed<K> key) {
+        bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer()));
+    }
+
+    @Override
+    public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer()), aggSerde.serializer().serialize(bytesStore.name(), aggregate));
+    }
+
+    @Override
+    public String name() {
+        return bytesStore.name();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.serdes = new StateSerdes<>(bytesStore.name(),
+                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                                        aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
+
+        bytesStore.init(context, root);
+    }
+
+    @Override
+    public void flush() {
+        bytesStore.flush();
+    }
+
+    @Override
+    public void close() {
+        bytesStore.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return bytesStore.isOpen();
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+        return findSessionsToMerge(key, 0, Long.MAX_VALUE);
+    }
+
+    private static class SessionStoreIterator<K, AGG> implements KeyValueIterator<Windowed<K>, AGG> {
+
+        private final KeyValueIterator<Bytes, byte[]> bytesIterator;
+        private final StateSerdes<K, AGG> serdes;
+
+        SessionStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<K, AGG> serdes) {
+            this.bytesIterator = bytesIterator;
+            this.serdes = serdes;
+        }
+
+        @Override
+        public void close() {
+            bytesIterator.close();
+        }
+
+        @Override
+        public Windowed<K> peekNextKey() {
+            final Bytes bytes = bytesIterator.peekNextKey();
+            return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer());
+        }
+
+        @Override
+        public boolean hasNext() {
+            return bytesIterator.hasNext();
+        }
+
+        @Override
+        public KeyValue<Windowed<K>, AGG> next() {
+            final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+            return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer()), serdes.valueFrom(next.value));
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove not supported by SessionStoreIterator");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
new file mode 100644
index 0000000..7645472
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.SessionStore;
+
+import java.util.Map;
+
+/**
+ * A supplier of {@link org.apache.kafka.streams.state.SessionStore} instances whose entries are stored in local, segmented RocksDB databases.
+ *
+ * @param <K> the type of session keys
+ * @param <V> the type of aggregated session values
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+
+public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
+
+    private static final int NUM_SEGMENTS = 3;
+    private final long retentionPeriod;
+    private final boolean enableCaching;
+
+    public RocksDBSessionStoreSupplier(String name, long retentionPeriod, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
+        super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig);
+        this.retentionPeriod = retentionPeriod;
+        this.enableCaching = enableCaching;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public SessionStore<K, V> get() {
+        final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore(name,
+                                                                                     retentionPeriod,
+                                                                                     NUM_SEGMENTS,
+                                                                                     new SessionKeySchema());
+        final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(logged ? new ChangeLoggingSegmentedBytesStore(bytesStore)
+                                                                                          : bytesStore, "rocksdb-session-store", time);
+        if (enableCaching) {
+            return new CachingSessionStore<>(metered, keySerde, valueSerde);
+        }
+        return new RocksDBSessionStore<>(metered, keySerde, valueSerde);
+
+    }
+
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 9c6703a..f838191 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -97,7 +97,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
 
     protected volatile boolean open = false;
-    private ProcessorContext context;
 
     public KeyValueStore<K, V> enableLogging() {
         loggingEnabled = true;
@@ -130,6 +129,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         options.setCreateIfMissing(true);
         options.setErrorIfExists(false);
 
+
         wOptions = new WriteOptions();
         wOptions.setDisableWAL(true);
 
@@ -139,7 +139,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @SuppressWarnings("unchecked")
     public void openDB(ProcessorContext context) {
-        this.context = context;
         final Map<String, Object> configs = context.appConfigs();
         final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
         if (configSetterClass != null) {
@@ -242,11 +241,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    synchronized void writeToStore(K key, V value) {
-        putInternal(serdes.rawKey(key), serdes.rawValue(value));
-    }
-
     @Override
     public synchronized V putIfAbsent(K key, V value) {
         V originalValue = get(key);
@@ -453,6 +447,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             iter.close();
         }
 
+        @Override
+        public K peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return serdes.keyFrom(iter.key());
+
+        }
+
     }
 
     private class RocksDBRangeIterator extends RocksDbIterator {


Mime
View raw message