kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [4/5] kafka git commit: KAFKA-3452 Follow-up: Optimize ByteStore Scenarios
Date Fri, 03 Feb 2017 19:12:57 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index a8ddc73..6c5efcb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -28,116 +28,78 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
 
-class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {
+class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG> {
 
     private final Serde<K> keySerde;
     private final Serde<AGG> aggSerde;
-    private final SegmentedBytesStore bytesStore;
-    private StateSerdes<K, AGG> serdes;
+    protected final SegmentedBytesStore bytesStore;
 
+    protected StateSerdes<K, AGG> serdes;
 
-    RocksDBSessionStore(final SegmentedBytesStore bytesStore,
-                        final Serde<K> keySerde,
-                        final Serde<AGG> aggSerde) {
-        this.keySerde = keySerde;
-        this.bytesStore = bytesStore;
-        this.aggSerde = aggSerde;
-    }
-
+    // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
+    private static class RocksDBSessionBytesStore extends RocksDBSessionStore<Bytes, byte[]> {
+        RocksDBSessionBytesStore(final SegmentedBytesStore inner) {
+            super(inner, Serdes.Bytes(), Serdes.ByteArray());
+        }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime);
-        return new SessionStoreIterator(bytesIterator, serdes);
-    }
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) {
+            final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(key, earliestSessionEndTime, latestSessionStartTime);
+            return WrappedSessionStoreIterator.bytesIterator(bytesIterator, serdes);
+        }
 
+        @Override
+        public void remove(final Windowed<Bytes> key) {
+            bytesStore.remove(SessionKeySerde.bytesToBinary(key));
+        }
 
-    @Override
-    public void remove(final Windowed<K> key) {
-        bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer()));
+        @Override
+        public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
+            bytesStore.put(SessionKeySerde.bytesToBinary(sessionKey), aggregate);
+        }
     }
 
-    @Override
-    public void put(final Windowed<K> sessionKey, final AGG aggregate) {
-        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer()), aggSerde.serializer().serialize(bytesStore.name(), aggregate));
+    static RocksDBSessionStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner) {
+        return new RocksDBSessionBytesStore(inner);
     }
 
-    @Override
-    public String name() {
-        return bytesStore.name();
+    RocksDBSessionStore(final SegmentedBytesStore bytesStore,
+                        final Serde<K> keySerde,
+                        final Serde<AGG> aggSerde) {
+        super(bytesStore);
+        this.keySerde = keySerde;
+        this.bytesStore = bytesStore;
+        this.aggSerde = aggSerde;
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         this.serdes = new StateSerdes<>(bytesStore.name(),
-                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                        aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
 
         bytesStore.init(context, root);
     }
 
     @Override
-    public void flush() {
-        bytesStore.flush();
-    }
-
-    @Override
-    public void close() {
-        bytesStore.close();
+    public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime);
+        return new WrappedSessionStoreIterator<>(bytesIterator, serdes);
     }
 
     @Override
-    public boolean persistent() {
-        return true;
+    public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+        return findSessions(key, 0, Long.MAX_VALUE);
     }
 
     @Override
-    public boolean isOpen() {
-        return bytesStore.isOpen();
+    public void remove(final Windowed<K> key) {
+        bytesStore.remove(SessionKeySerde.toBinary(key, serdes.keySerializer()));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
-        return findSessions(key, 0, Long.MAX_VALUE);
-    }
-
-    private static class SessionStoreIterator<K, AGG> implements KeyValueIterator<Windowed<K>, AGG> {
-
-        private final KeyValueIterator<Bytes, byte[]> bytesIterator;
-        private final StateSerdes<K, AGG> serdes;
-
-        SessionStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<K, AGG> serdes) {
-            this.bytesIterator = bytesIterator;
-            this.serdes = serdes;
-        }
-
-        @Override
-        public void close() {
-            bytesIterator.close();
-        }
-
-        @Override
-        public Windowed<K> peekNextKey() {
-            final Bytes bytes = bytesIterator.peekNextKey();
-            return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer());
-        }
-
-        @Override
-        public boolean hasNext() {
-            return bytesIterator.hasNext();
-        }
-
-        @Override
-        public KeyValue<Windowed<K>, AGG> next() {
-            final KeyValue<Bytes, byte[]> next = bytesIterator.next();
-            return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer()), serdes.valueFrom(next.value));
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("remove not supported by SessionStoreIterator");
-        }
+    public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer()), aggSerde.serializer().serialize(bytesStore.name(), aggregate));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
index 10ebf65..9eb9447 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.SessionStore;
@@ -33,11 +32,10 @@ import java.util.Map;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-
 public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
 
+    private static final String METRIC_SCOPE = "rocksdb-session";
     private static final int NUM_SEGMENTS = 3;
-    public static final String METRIC_SCOPE = "rocksdb-session-store";
     private final long retentionPeriod;
     private final boolean cached;
 
@@ -56,15 +54,14 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K,
         final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
                                                                                      retentionPeriod,
                                                                                      NUM_SEGMENTS,
-                                                                                     keySchema
-        );
+                                                                                     keySchema);
 
         if (cached && logged) {
             final ChangeLoggingSegmentedBytesStore logged = new ChangeLoggingSegmentedBytesStore(segmented);
             final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(logged,
                                                                                       METRIC_SCOPE, time);
             final RocksDBSessionStore<Bytes, byte[]> sessionStore
-                    = new RocksDBSessionStore<>(metered, Serdes.Bytes(), Serdes.ByteArray());
+                    = RocksDBSessionStore.bytesStore(metered);
 
             return new CachingSessionStore<>(sessionStore, keySerde, valueSerde);
         }
@@ -73,7 +70,7 @@ public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K,
             final MeteredSegmentedBytesStore metered = new MeteredSegmentedBytesStore(segmented,
                                                                                       METRIC_SCOPE, time);
             final RocksDBSessionStore<Bytes, byte[]> sessionStore
-                    = new RocksDBSessionStore<>(metered, Serdes.Bytes(), Serdes.ByteArray());
+                    = RocksDBSessionStore.bytesStore(metered);
 
             return new CachingSessionStore<>(sessionStore, keySerde, valueSerde);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 280d5c4..108c772 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -152,12 +152,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
         context.register(root, false, new StateRestoreCallback() {
-
             @Override
             public void restore(byte[] key, byte[] value) {
                 putInternal(key, value);
             }
         });
+
         open = true;
     }
 
@@ -283,7 +283,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     public synchronized KeyValueIterator<K, V> range(K from, K to) {
         validateStoreOpen();
         // query rocksdb
-        final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(db.newIterator(), serdes, from, to);
+        final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), serdes, from, to);
         openIterators.add(rocksDBRangeIterator);
         return rocksDBRangeIterator;
     }
@@ -294,7 +294,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         // query rocksdb
         RocksIterator innerIter = db.newIterator();
         innerIter.seekToFirst();
-        final RocksDbIterator rocksDbIterator = new RocksDbIterator(innerIter, serdes);
+        final RocksDbIterator rocksDbIterator = new RocksDbIterator(name, innerIter, serdes);
         openIterators.add(rocksDbIterator);
         return rocksDbIterator;
     }
@@ -312,6 +312,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
      */
     @Override
     public long approximateNumEntries() {
+        validateStoreOpen();
         long value;
         try {
             value = this.db.getLongProperty("rocksdb.estimate-num-keys");
@@ -375,15 +376,17 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         openIterators.clear();
     }
 
-
-    class RocksDbIterator implements KeyValueIterator<K, V> {
+    private class RocksDbIterator implements KeyValueIterator<K, V> {
+        private final String storeName;
         private final RocksIterator iter;
         private final StateSerdes<K, V> serdes;
-        private boolean open = true;
 
-        RocksDbIterator(RocksIterator iter, StateSerdes<K, V> serdes) {
+        private volatile boolean open = true;
+
+        RocksDbIterator(String storeName, RocksIterator iter, StateSerdes<K, V> serdes) {
             this.iter = iter;
             this.serdes = serdes;
+            this.storeName = storeName;
         }
 
         byte[] peekRawKey() {
@@ -397,8 +400,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         @Override
         public synchronized boolean hasNext() {
             if (!open) {
-                throw new InvalidStateStoreException("store %s has closed");
+                throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
             }
+
             return iter.isValid();
         }
 
@@ -415,19 +419,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             return entry;
         }
 
-        /**
-         * @throws UnsupportedOperationException
-         */
         @Override
         public void remove() {
-            throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+            throw new UnsupportedOperationException("RocksDB iterator does not support remove()");
         }
 
         @Override
         public synchronized void close() {
-            open = false;
             openIterators.remove(this);
             iter.close();
+            open = false;
         }
 
         @Override
@@ -436,9 +437,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 throw new NoSuchElementException();
             }
             return serdes.keyFrom(iter.key());
-
         }
-
     }
 
     private class RocksDBRangeIterator extends RocksDbIterator {
@@ -448,8 +447,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
         private byte[] rawToKey;
 
-        RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) {
-            super(iter, serdes);
+        RocksDBRangeIterator(String storeName, RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) {
+            super(storeName, iter, serdes);
             iter.seek(serdes.rawKey(from));
             this.rawToKey = serdes.rawKey(to);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 80c4796..8c7a557 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -22,7 +22,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -30,38 +29,50 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import java.util.NoSuchElementException;
+class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
 
-class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
-
-    private final SegmentedBytesStore bytesStore;
-    private final boolean retainDuplicates;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
+    protected final SegmentedBytesStore bytesStore;
+    protected final boolean retainDuplicates;
+
     private ProcessorContext context;
-    private int seqnum = 0;
-    private StateSerdes<K, V> serdes;
+    protected StateSerdes<K, V> serdes;
+    protected int seqnum = 0;
 
+    // this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
+    private static class RocksDBWindowBytesStore extends RocksDBWindowStore<Bytes, byte[]> {
+        RocksDBWindowBytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates) {
+            super(inner, Serdes.Bytes(), Serdes.ByteArray(), retainDuplicates);
+        }
 
-    static RocksDBWindowStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates) {
-        return new RocksDBWindowStore<>(inner, Serdes.Bytes(), Serdes.ByteArray(), retainDuplicates);
+        @Override
+        public void put(Bytes key, byte[] value, long timestamp) {
+            maybeUpdateSeqnumForDups();
+
+            bytesStore.put(WindowStoreUtils.toBinaryKey(key.get(), timestamp, seqnum), value);
+        }
+
+        @Override
+        public WindowStoreIterator<byte[]> fetch(Bytes key, long timeFrom, long timeTo) {
+            final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(key, timeFrom, timeTo);
+            return WrappedWindowStoreIterator.bytesIterator(bytesIterator, serdes);
+        }
     }
 
+    static RocksDBWindowStore<Bytes, byte[]> bytesStore(final SegmentedBytesStore inner, final boolean retainDuplicates) {
+        return new RocksDBWindowBytesStore(inner, retainDuplicates);
+    }
 
     RocksDBWindowStore(final SegmentedBytesStore bytesStore,
                        final Serde<K> keySerde,
                        final Serde<V> valueSerde,
                        final boolean retainDuplicates) {
+        super(bytesStore);
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
-        this.retainDuplicates = retainDuplicates;
         this.bytesStore = bytesStore;
-    }
-
-
-    @Override
-    public String name() {
-        return bytesStore.name();
+        this.retainDuplicates = retainDuplicates;
     }
 
     @Override
@@ -77,91 +88,26 @@ class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     @Override
-    public boolean persistent() {
-        return true;
-    }
-
-    @Override
-    public boolean isOpen() {
-        return bytesStore.isOpen();
-    }
-
-    @Override
-    public void flush() {
-        bytesStore.flush();
-    }
-
-    @Override
-    public void close() {
-        bytesStore.close();
-    }
-
-    @Override
     public void put(K key, V value) {
         put(key, value, context.timestamp());
     }
 
     @Override
     public void put(K key, V value, long timestamp) {
-        if (retainDuplicates) {
-            seqnum = (seqnum + 1) & 0x7FFFFFFF;
-        }
-        bytesStore.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes)), serdes.rawValue(value));
-    }
+        maybeUpdateSeqnumForDups();
 
+        bytesStore.put(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes), serdes.rawValue(value));
+    }
 
-    @SuppressWarnings("unchecked")
     @Override
     public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
-        return new TheWindowStoreIterator<>(bytesIterator, serdes);
+        return new WrappedWindowStoreIterator<>(bytesIterator, serdes);
     }
 
-    private static class TheWindowStoreIterator<V> implements WindowStoreIterator<V> {
-        private final KeyValueIterator<Bytes, byte[]> actual;
-        private final StateSerdes<?, V> serdes;
-
-        TheWindowStoreIterator(final KeyValueIterator<Bytes, byte[]> actual, final StateSerdes<?, V> serdes) {
-            this.actual = actual;
-            this.serdes = serdes;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return actual.hasNext();
-        }
-
-        /**
-         * @throws NoSuchElementException if no next element exists
-         */
-        @Override
-        public KeyValue<Long, V> next() {
-            if (!actual.hasNext()) {
-                throw new NoSuchElementException();
-            }
-            final KeyValue<Bytes, byte[]> next = actual.next();
-            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
-            final V value = serdes.valueFrom(next.value);
-            return KeyValue.pair(timestamp, value);
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void close() {
-            actual.close();
-        }
-
-        @Override
-        public Long peekNextKey() {
-            if (!actual.hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return WindowStoreUtils.timestampFromBinaryKey(actual.peekNextKey().get());
+    void maybeUpdateSeqnumForDups() {
+        if (retainDuplicates) {
+            seqnum = (seqnum + 1) & 0x7FFFFFFF;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index abaaffd..9782226 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -65,7 +65,7 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
                                 name,
                                 retentionPeriod,
                                 numSegments,
-                                new WindowStoreKeySchema()
+                                new WindowKeySchema()
                         )));
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index 5471262..5bbdaab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-// use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
+// Use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
 class Segment extends RocksDBStore<Bytes, byte[]> {
     public final long id;
 
@@ -29,14 +29,16 @@ class Segment extends RocksDBStore<Bytes, byte[]> {
         this.id = id;
     }
 
-    public void destroy() {
+    void destroy() {
         Utils.delete(dbDir);
     }
 
     @Override
     public void openDB(final ProcessorContext context) {
         super.openDB(context);
+
+        // skip the registering step
+
         open = true;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index e6715b2..c449fdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -30,12 +30,13 @@ import java.util.NoSuchElementException;
  */
 class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
 
-    private final Iterator<Segment> segments;
-    private final HasNextCondition hasNextCondition;
     private final Bytes from;
     private final Bytes to;
-    private KeyValueIterator<Bytes, byte[]> currentIterator;
+    private final Iterator<Segment> segments;
+    private final HasNextCondition hasNextCondition;
+
     private KeyValueStore<Bytes, byte[]> currentSegment;
+    private KeyValueIterator<Bytes, byte[]> currentIterator;
 
     SegmentIterator(final Iterator<Segment> segments,
                     final HasNextCondition hasNextCondition,
@@ -62,7 +63,7 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
         return currentIterator.peekNextKey();
     }
 
-    @SuppressWarnings("unchecked")
+    @Override
     public boolean hasNext() {
         boolean hasNext = false;
         while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
@@ -86,7 +87,6 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
     }
 
     public void remove() {
-        throw new UnsupportedOperationException("remove not supported");
+        throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index ab1099e..ab08fe6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -65,13 +65,12 @@ public interface SegmentedBytesStore extends StateStore {
      */
     byte[] get(Bytes key);
 
-
     interface KeySchema {
         /**
          * Given a record-key and a time, construct a Segmented key that represents
          * the upper range of keys to search when performing range queries.
          * @see SessionKeySchema#upperRange
-         * @see WindowStoreKeySchema#upperRange
+         * @see WindowKeySchema#upperRange
          * @param key
          * @param to
          * @return      The key that represents the upper range to search for in the store
@@ -82,7 +81,7 @@ public interface SegmentedBytesStore extends StateStore {
          * Given a record-key and a time, construct a Segmented key that represents
          * the lower range of keys to search when performing range queries.
          * @see SessionKeySchema#lowerRange
-         * @see WindowStoreKeySchema#lowerRange
+         * @see WindowKeySchema#lowerRange
          * @param key
          * @param from
          * @return      The key that represents the lower range to search for in the store
@@ -93,7 +92,7 @@ public interface SegmentedBytesStore extends StateStore {
          * Extract the timestamp of the segment from the key. The key is a composite of
          * the record-key, any timestamps, plus any additional information.
          * @see SessionKeySchema#lowerRange
-         * @see WindowStoreKeySchema#lowerRange
+         * @see WindowKeySchema#lowerRange
          * @param key
          * @return
          */

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 13cdf6c..bc2f4fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -35,11 +35,13 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 class Segments {
     static final long MIN_SEGMENT_INTERVAL = 60 * 1000L;
+
     private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
     private final String name;
     private final int numSegments;
     private final long segmentInterval;
     private final SimpleDateFormat formatter;
+
     private long currentSegmentId = -1L;
 
     Segments(final String name, final long retentionPeriod, final int numSegments) {
@@ -82,7 +84,6 @@ class Segments {
         }
     }
 
-
     void openExisting(final ProcessorContext context) {
         try {
             File dir = new File(context.stateDir(), name);
@@ -103,7 +104,7 @@ class Segments {
                 }
             } else {
                 if (!dir.mkdir()) {
-                    throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for store %s", dir, name));
+                    throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));
                 }
             }
         } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
index d76e8a4..e36e567 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SerializedKeyValueIterator.java
@@ -65,6 +65,6 @@ class SerializedKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
 
     @Override
     public void remove() {
-        throw new UnsupportedOperationException("remove not supported by SerializedKeyValueIterator");
+        throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index f8f0e08..d47b636 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -38,10 +38,11 @@ import java.util.NoSuchElementException;
  */
 public class ThreadCache {
     private static final Logger log = LoggerFactory.getLogger(ThreadCache.class);
+
     private final String name;
     private final long maxCacheSizeBytes;
-    private final Map<String, NamedCache> caches = new HashMap<>();
     private final StreamsMetrics metrics;
+    private final Map<String, NamedCache> caches = new HashMap<>();
 
     // internal stats
     private long numPuts = 0;
@@ -49,8 +50,6 @@ public class ThreadCache {
     private long numEvicts = 0;
     private long numFlushes = 0;
 
-
-
     public interface DirtyEntryFlushListener {
         void apply(final List<DirtyEntry> dirty);
     }
@@ -101,7 +100,7 @@ public class ThreadCache {
                   name, puts(), gets(), evicts(), flushes());
     }
 
-    public LRUCacheEntry get(final String namespace, byte[] key) {
+    public LRUCacheEntry get(final String namespace, Bytes key) {
         numGets++;
 
         if (key == null) {
@@ -112,20 +111,21 @@ public class ThreadCache {
         if (cache == null) {
             return null;
         }
-        return cache.get(Bytes.wrap(key));
+        return cache.get(key);
     }
 
-    public void put(final String namespace, byte[] key, LRUCacheEntry value) {
+    public void put(final String namespace, Bytes key, LRUCacheEntry value) {
         numPuts++;
+
         final NamedCache cache = getOrCreateCache(namespace);
-        cache.put(Bytes.wrap(key), value);
+        cache.put(key, value);
         maybeEvict(namespace);
     }
 
-    public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) {
+    public LRUCacheEntry putIfAbsent(final String namespace, Bytes key, LRUCacheEntry value) {
         final NamedCache cache = getOrCreateCache(namespace);
 
-        final LRUCacheEntry result = cache.putIfAbsent(Bytes.wrap(key), value);
+        final LRUCacheEntry result = cache.putIfAbsent(key, value);
         maybeEvict(namespace);
 
         if (result == null) {
@@ -134,27 +134,27 @@ public class ThreadCache {
         return result;
     }
 
-    public void putAll(final String namespace, final List<KeyValue<byte[], LRUCacheEntry>> entries) {
-        for (KeyValue<byte[], LRUCacheEntry> entry : entries) {
+    public void putAll(final String namespace, final List<KeyValue<Bytes, LRUCacheEntry>> entries) {
+        for (KeyValue<Bytes, LRUCacheEntry> entry : entries) {
             put(namespace, entry.key, entry.value);
         }
     }
 
-    public LRUCacheEntry delete(final String namespace, final byte[] key) {
+    public LRUCacheEntry delete(final String namespace, final Bytes key) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
             return null;
         }
 
-        return cache.delete(Bytes.wrap(key));
+        return cache.delete(key);
     }
 
-    public MemoryLRUCacheBytesIterator range(final String namespace, final byte[] from, final byte[] to) {
+    public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
             return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
         }
-        return new MemoryLRUCacheBytesIterator(cache.keyRange(cacheKey(from), cacheKey(to)), cache);
+        return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
     }
 
     public MemoryLRUCacheBytesIterator all(final String namespace) {
@@ -229,11 +229,6 @@ public class ThreadCache {
         return cache;
     }
 
-    private Bytes cacheKey(final byte[] keyBytes) {
-        return Bytes.wrap(keyBytes);
-    }
-
-
     static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
         private final Iterator<Bytes> keys;
         private final NamedCache cache;
@@ -303,12 +298,12 @@ public class ThreadCache {
         }
     }
 
-    public static class DirtyEntry {
+    static class DirtyEntry {
         private final Bytes key;
         private final byte[] newValue;
         private final RecordContext recordContext;
 
-        public DirtyEntry(final Bytes key, final byte[] newValue, final RecordContext recordContext) {
+        DirtyEntry(final Bytes key, final byte[] newValue, final RecordContext recordContext) {
             this.key = key;
             this.newValue = newValue;
             this.recordContext = recordContext;
@@ -326,5 +321,4 @@ public class ThreadCache {
             return recordContext;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
new file mode 100644
index 0000000..ea607c0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.List;
+
+class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+    private static final HasNextCondition ITERATOR_HAS_NEXT = new HasNextCondition() {
+        @Override
+        public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+            return iterator.hasNext();
+        }
+    };
+    private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray());
+
+    @Override
+    public Bytes upperRange(final Bytes key, final long to) {
+        return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes);
+    }
+
+    @Override
+    public Bytes lowerRange(final Bytes key, final long from) {
+        return WindowStoreUtils.toBinaryKey(key, Math.max(0, from), 0, serdes);
+    }
+
+    @Override
+    public long segmentTimestamp(final Bytes key) {
+        return WindowStoreUtils.timestampFromBinaryKey(key.get());
+    }
+
+    @Override
+    public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
+        return ITERATOR_HAS_NEXT;
+    }
+
+    @Override
+    public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
+        return segments.segments(from, to);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
deleted file mode 100644
index 093161e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.List;
-
-class WindowStoreKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
-    private static final HasNextCondition ITERATOR_HAS_NEXT = new HasNextCondition() {
-        @Override
-        public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-            return iterator.hasNext();
-        }
-    };
-    private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray());
-
-    @Override
-    public Bytes upperRange(final Bytes key, final long to) {
-        return Bytes.wrap(WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, serdes));
-    }
-
-    @Override
-    public Bytes lowerRange(final Bytes key, final long from) {
-        return Bytes.wrap(WindowStoreUtils.toBinaryKey(key, Math.max(0, from), 0, serdes));
-    }
-
-    @Override
-    public long segmentTimestamp(final Bytes key) {
-        return WindowStoreUtils.timestampFromBinaryKey(key.get());
-    }
-
-    @Override
-    public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
-        return ITERATOR_HAS_NEXT;
-    }
-
-    @Override
-    public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
-        return segments.segments(from, to);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
index 9c93dcc..8d0d673 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
@@ -19,6 +19,13 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
+/**
+ * A windowed state store supplier that extends the {@link StateStoreSupplier} interface.
+ *
+ * @param <T> State store type
+ */
 public interface WindowStoreSupplier<T extends StateStore> extends StateStoreSupplier<T> {
+
+    // window retention period in milli-second
     long retentionPeriod();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index 074cf8a..d97b053 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -32,26 +32,25 @@ public class WindowStoreUtils {
     private static final int TIMESTAMP_SIZE = 8;
 
     /** Inner byte array serde used for segments */
-    public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
-    public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
-    public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE);
+    static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
+    static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
+    static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE);
 
-
-    public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
+    static <K> Bytes toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
         byte[] serializedKey = serdes.rawKey(key);
         return toBinaryKey(serializedKey, timestamp, seqnum);
     }
 
-    static byte[] toBinaryKey(byte[] serializedKey, final long timestamp, final int seqnum) {
+    static Bytes toBinaryKey(byte[] serializedKey, final long timestamp, final int seqnum) {
         ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
         buf.put(serializedKey);
         buf.putLong(timestamp);
         buf.putInt(seqnum);
 
-        return buf.array();
+        return Bytes.wrap(buf.array());
     }
 
-    public static <K> K keyFromBinaryKey(byte[] binaryKey, StateSerdes<K, ?> serdes) {
+    static <K> K keyFromBinaryKey(byte[] binaryKey, StateSerdes<K, ?> serdes) {
         byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
 
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
@@ -59,7 +58,7 @@ public class WindowStoreUtils {
         return serdes.keyFrom(bytes);
     }
 
-    public static Bytes bytesKeyFromBinaryKey(byte[] binaryKey) {
+    static Bytes bytesKeyFromBinaryKey(byte[] binaryKey) {
         byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
 
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
@@ -67,11 +66,11 @@ public class WindowStoreUtils {
         return Bytes.wrap(bytes);
     }
 
-    public static long timestampFromBinaryKey(byte[] binaryKey) {
+    static long timestampFromBinaryKey(byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
     }
 
-    public static int sequenceNumberFromBinaryKey(byte[] binaryKey) {
+    static int sequenceNumberFromBinaryKey(byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
new file mode 100644
index 0000000..b8db3a6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedSessionStoreIterator.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+
+class WrappedSessionStoreIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
+    final KeyValueIterator<Bytes, byte[]> bytesIterator;
+    private final StateSerdes<K, V> serdes;
+
+    // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs
+    private static class WrappedSessionStoreBytesIterator extends WrappedSessionStoreIterator<Bytes, byte[]> {
+        WrappedSessionStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
+                                         final StateSerdes<Bytes, byte[]> serdes) {
+            super(underlying, serdes);
+        }
+
+        @Override
+        public Windowed<Bytes> peekNextKey() {
+            final Bytes key = bytesIterator.peekNextKey();
+
+            return SessionKeySerde.fromBytes(key);
+        }
+
+        @Override
+        public KeyValue<Windowed<Bytes>, byte[]> next() {
+            final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+            return KeyValue.pair(SessionKeySerde.fromBytes(next.key), next.value);
+        }
+    }
+
+    static WrappedSessionStoreIterator<Bytes, byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
+                                                                    final StateSerdes<Bytes, byte[]> serdes) {
+        return new WrappedSessionStoreBytesIterator(underlying, serdes);
+    }
+
+    WrappedSessionStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<K, V> serdes) {
+        this.bytesIterator = bytesIterator;
+        this.serdes = serdes;
+    }
+
+    @Override
+    public void close() {
+        bytesIterator.close();
+    }
+
+    @Override
+    public Windowed<K> peekNextKey() {
+        final Bytes bytes = bytesIterator.peekNextKey();
+        return SessionKeySerde.from(bytes.get(), serdes.keyDeserializer());
+    }
+
+    @Override
+    public boolean hasNext() {
+        return bytesIterator.hasNext();
+    }
+
+    @Override
+    public KeyValue<Windowed<K>, V> next() {
+        final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+        return KeyValue.pair(SessionKeySerde.from(next.key.get(), serdes.keyDeserializer()), serdes.valueFrom(next.value));
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 3d80b98..cbde4e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -34,10 +34,10 @@ interface WrappedStateStore extends StateStore {
      */
     StateStore inner();
 
-    abstract class AbstractWrappedStateStore implements WrappedStateStore {
+    abstract class AbstractStateStore implements WrappedStateStore {
         final StateStore innerState;
 
-        AbstractWrappedStateStore(StateStore inner) {
+        AbstractStateStore(StateStore inner) {
             this.innerState = inner;
         }
 
@@ -85,6 +85,4 @@ interface WrappedStateStore extends StateStore {
             innerState.close();
         }
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java
new file mode 100644
index 0000000..1e48324
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedWindowStoreIterator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.NoSuchElementException;
+
+class WrappedWindowStoreIterator<V> implements WindowStoreIterator<V> {
+    final KeyValueIterator<Bytes, byte[]> bytesIterator;
+    private final StateSerdes<?, V> serdes;
+
+    // this is optimizing the case when underlying is already a bytes store iterator, in which we can avoid Bytes.wrap() costs
+    private static class WrappedWindowStoreBytesIterator extends WrappedWindowStoreIterator<byte[]> {
+        WrappedWindowStoreBytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
+                                        final StateSerdes<Bytes, byte[]> serdes) {
+            super(underlying, serdes);
+        }
+
+        @Override
+        public KeyValue<Long, byte[]> next() {
+            if (!bytesIterator.hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+            final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+            final byte[] value = next.value;
+            return KeyValue.pair(timestamp, value);
+        }
+    }
+
+    static WrappedWindowStoreIterator<byte[]> bytesIterator(final KeyValueIterator<Bytes, byte[]> underlying,
+                                                            final StateSerdes<Bytes, byte[]> serdes) {
+        return new WrappedWindowStoreBytesIterator(underlying, serdes);
+    }
+
+    WrappedWindowStoreIterator(final KeyValueIterator<Bytes, byte[]> bytesIterator, final StateSerdes<?, V> serdes) {
+        this.bytesIterator = bytesIterator;
+        this.serdes = serdes;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return bytesIterator.hasNext();
+    }
+
+    /**
+     * @throws NoSuchElementException if no next element exists
+     */
+    @Override
+    public KeyValue<Long, V> next() {
+        final KeyValue<Bytes, byte[]> next = bytesIterator.next();
+        final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.key.get());
+        final V value = serdes.valueFrom(next.value);
+        return KeyValue.pair(timestamp, value);
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
+    }
+
+    @Override
+    public void close() {
+        bytesIterator.close();
+    }
+
+    @Override
+    public Long peekNextKey() {
+        return WindowStoreUtils.timestampFromBinaryKey(bytesIterator.peekNextKey().get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
index 3a0f490..3be6373 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.junit.Test;
 
@@ -28,60 +30,79 @@ import static org.junit.Assert.assertNull;
 
 public class SessionKeySerdeTest {
 
+    final private String topic = "topic";
+    final private String key = "key";
+    final private long startTime = 50L;
+    final private long endTime = 100L;
+    final private Window window = new SessionWindow(startTime, endTime);
+    final private Windowed<String> windowedKey = new Windowed<>(key, window);
+    final private Serde<String> serde = Serdes.String();
+    final private SessionKeySerde<String> sessionKeySerde = new SessionKeySerde<>(serde);
+
     @Test
     public void shouldSerializeDeserialize() throws Exception {
-        final Windowed<Long> key = new Windowed<>(1L, new SessionWindow(10, 100));
-        final SessionKeySerde<Long> serde = new SessionKeySerde<>(Serdes.Long());
-        final byte[] bytes = serde.serializer().serialize("t", key);
-        final Windowed<Long> result = serde.deserializer().deserialize("t", bytes);
-        assertEquals(key, result);
+        final byte[] bytes = sessionKeySerde.serializer().serialize(topic, windowedKey);
+        final Windowed<String> result = sessionKeySerde.deserializer().deserialize(topic, bytes);
+        assertEquals(windowedKey, result);
     }
 
     @Test
     public void shouldSerializeNullToNull() throws Exception {
-        final SessionKeySerde<String> serde = new SessionKeySerde<>(Serdes.String());
-        assertNull(serde.serializer().serialize("t", null));
+        assertNull(sessionKeySerde.serializer().serialize(topic, null));
     }
 
     @Test
     public void shouldDeSerializeEmtpyByteArrayToNull() throws Exception {
-        final SessionKeySerde<String> serde = new SessionKeySerde<>(Serdes.String());
-        assertNull(serde.deserializer().deserialize("t", new byte[0]));
+        assertNull(sessionKeySerde.deserializer().deserialize(topic, new byte[0]));
     }
 
     @Test
     public void shouldDeSerializeNullToNull() throws Exception {
-        final SessionKeySerde<String> serde = new SessionKeySerde<>(Serdes.String());
-        assertNull(serde.deserializer().deserialize("t", null));
+        assertNull(sessionKeySerde.deserializer().deserialize(topic, null));
     }
 
     @Test
     public void shouldConvertToBinaryAndBack() throws Exception {
-        final Windowed<String> key = new Windowed<>("key", new SessionWindow(10, 20));
-        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
         final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer());
-        assertEquals(key, result);
+        assertEquals(windowedKey, result);
     }
 
     @Test
     public void shouldExtractEndTimeFromBinary() throws Exception {
-        final Windowed<String> key = new Windowed<>("key", new SessionWindow(10, 100));
-        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
-        assertEquals(100, SessionKeySerde.extractEnd(serialized.get()));
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
+        assertEquals(endTime, SessionKeySerde.extractEnd(serialized.get()));
     }
 
     @Test
     public void shouldExtractStartTimeFromBinary() throws Exception {
-        final Windowed<String> key = new Windowed<>("key", new SessionWindow(50, 100));
-        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
-        assertEquals(50, SessionKeySerde.extractStart(serialized.get()));
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
+        assertEquals(startTime, SessionKeySerde.extractStart(serialized.get()));
+    }
+
+    @Test
+    public void shouldExtractWindowFromBindary() throws Exception {
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
+        assertEquals(window, SessionKeySerde.extractWindow(serialized.get()));
     }
 
     @Test
     public void shouldExtractKeyBytesFromBinary() throws Exception {
-        final Windowed<String> key = new Windowed<>("blah", new SessionWindow(50, 100));
-        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
-        assertArrayEquals("blah".getBytes(), SessionKeySerde.extractKeyBytes(serialized.get()));
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
+        assertArrayEquals(key.getBytes(), SessionKeySerde.extractKeyBytes(serialized.get()));
     }
 
+    @Test
+    public void shouldExtractKeyFromBinary() throws Exception {
+        final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer());
+        assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer()));
+    }
+
+    @Test
+    public void shouldExtractBytesKeyFromBinary() throws Exception {
+        final Bytes bytesKey = Bytes.wrap(key.getBytes());
+        final Windowed<Bytes> windowedBytesKey = new Windowed<>(bytesKey, window);
+        final Bytes serialized = SessionKeySerde.bytesToBinary(windowedBytesKey);
+        assertEquals(windowedBytesKey, SessionKeySerde.fromBytes(serialized));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ba27c53..ea9e870 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -232,6 +232,7 @@ public class ProcessorStateManagerTest {
                 put(nonPersistentStoreName, nonPersistentStoreName);
             }
         });
+
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
         } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index a00526f..dc510c5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.InMemoryKeyValueStore;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,7 +57,7 @@ public class CachingKeyValueStoreTest {
     @Before
     public void setUp() throws Exception {
         final String storeName = "store";
-        underlyingStore = new InMemoryKeyValueStore<>(storeName);
+        underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray());
         cacheFlushListener = new CacheFlushListenerStub<>();
         store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
         store.setFlushListener(cacheFlushListener);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 2728aa0..39947b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -46,19 +46,19 @@ import static org.junit.Assert.assertNull;
 public class CachingWindowStoreTest {
 
     private static final int MAX_CACHE_SIZE_BYTES = 150;
+    private static final long DEFAULT_TIMESTAMP = 10L;
     private static final Long WINDOW_SIZE = 10000L;
     private RocksDBSegmentedBytesStore underlying;
     private CachingWindowStore<String, String> cachingStore;
     private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>> cacheListener;
     private ThreadCache cache;
     private String topic;
-    private static final long DEFAULT_TIMESTAMP = 10L;
-    private WindowStoreKeySchema keySchema;
+    private WindowKeySchema keySchema;
     private RocksDBWindowStore<Bytes, byte[]> windowStore;
 
     @Before
     public void setUp() throws Exception {
-        keySchema = new WindowStoreKeySchema();
+        keySchema = new WindowKeySchema();
         underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema);
         windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false);
         cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
@@ -152,7 +152,7 @@ public class CachingWindowStoreTest {
     @Test
     public void shouldIterateCacheAndStore() throws Exception {
         final Bytes key = Bytes.wrap("1" .getBytes());
-        underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.INNER_SERDES)), "a".getBytes());
+        underlying.put(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.INNER_SERDES), "a".getBytes());
         cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE);
         final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
         assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 58b42e3..123d8ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.InMemoryKeyValueStore;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -40,7 +39,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 public class ChangeLoggingKeyValueBytesStoreTest {
 
-    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv");
+    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
     private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner);
     private final Map sent = new HashMap<>();
     private final Bytes hi = Bytes.wrap("hi".getBytes());

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
index d45f82e..27503ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.InMemoryKeyValueStore;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -43,7 +42,7 @@ import static org.junit.Assert.assertFalse;
 
 public class ChangeLoggingKeyValueStoreTest {
 
-    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv");
+    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
     private final Serde<String> keySerde = Serdes.String();
     private final Serde<String> valueSerde = Serdes.String();
     private final ChangeLoggingKeyValueStore<String, String> store

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 0fd6001..763ca48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -98,7 +98,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
         stubOneUnderlying.put("b", "b");
         stubOneUnderlying.put("c", "c");
 
-        final List<KeyValue<String, String>> results = toList(theStore.range("a", "c"));
+        final List<KeyValue<String, String>> results = toList(theStore.range("a", "b"));
         assertTrue(results.contains(new KeyValue<>("a", "a")));
         assertTrue(results.contains(new KeyValue<>("b", "b")));
         assertEquals(2, results.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
index 0ebdd5c..3f5ea6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.test.InMemoryKeyValueStore;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -34,7 +34,7 @@ public class DelegatingPeekingKeyValueIteratorTest {
 
     @Before
     public void setUp() throws Exception {
-        store = new InMemoryKeyValueStore<>(name);
+        store = new InMemoryKeyValueStore<>(name, Serdes.String(), Serdes.String());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index fbcefab..161b791 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.InMemoryKeyValueStore;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,7 +39,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
 
     @Before
     public void setUp() throws Exception {
-        store = new InMemoryKeyValueStore<>(namespace);
+        store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(), Serdes.ByteArray());
         cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
     }
 
@@ -49,13 +48,13 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
         for (int i = 0; i < bytes.length; i += 2) {
             store.put(Bytes.wrap(bytes[i]), bytes[i]);
-            cache.put(namespace, bytes[i + 1], new LRUCacheEntry(bytes[i + 1]));
+            cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
         }
 
         final Bytes from = Bytes.wrap(new byte[]{2});
         final Bytes to = Bytes.wrap(new byte[]{9});
-        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store",  store.range(from, to));
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from.get(), to.get());
+        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.range(from, to));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to);
 
         final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
         byte[][] values = new byte[8][];
@@ -73,7 +72,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     public void shouldSkipLargerDeletedCacheValue() throws Exception {
         final byte[][] bytes = {{0}, {1}};
         store.put(Bytes.wrap(bytes[0]), bytes[0]);
-        cache.put(namespace, bytes[1], new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
         final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
         assertArrayEquals(bytes[0], iterator.next().key);
         assertFalse(iterator.hasNext());
@@ -82,7 +81,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     @Test
     public void shouldSkipSmallerDeletedCachedValue() throws Exception {
         final byte[][] bytes = {{0}, {1}};
-        cache.put(namespace, bytes[0], new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
         store.put(Bytes.wrap(bytes[1]), bytes[1]);
         final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
         assertArrayEquals(bytes[1], iterator.next().key);
@@ -92,7 +91,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     @Test
     public void shouldIgnoreIfDeletedInCacheButExistsInStore() throws Exception {
         final byte[][] bytes = {{0}};
-        cache.put(namespace, bytes[0], new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
         store.put(Bytes.wrap(bytes[0]), bytes[0]);
         final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
         assertFalse(iterator.hasNext());
@@ -102,8 +101,9 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     public void shouldNotHaveNextIfAllCachedItemsDeleted() throws Exception {
         final byte[][] bytes = {{0}, {1}, {2}};
         for (byte[] aByte : bytes) {
-            store.put(Bytes.wrap(aByte), aByte);
-            cache.put(namespace, aByte, new LRUCacheEntry(null));
+            Bytes aBytes = Bytes.wrap(aByte);
+            store.put(aBytes, aByte);
+            cache.put(namespace, aBytes, new LRUCacheEntry(null));
         }
         assertFalse(createIterator().hasNext());
     }
@@ -112,7 +112,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() throws Exception {
         final byte[][] bytes = {{0}, {1}, {2}};
         for (byte[] aByte : bytes) {
-            cache.put(namespace, aByte, new LRUCacheEntry(null));
+            cache.put(namespace, Bytes.wrap(aByte), new LRUCacheEntry(null));
         }
         assertFalse(createIterator().hasNext());
     }
@@ -121,14 +121,15 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     public void shouldSkipAllDeletedFromCache() throws Exception {
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
         for (byte[] aByte : bytes) {
-            store.put(Bytes.wrap(aByte), aByte);
-            cache.put(namespace, aByte, new LRUCacheEntry(aByte));
+            Bytes aBytes = Bytes.wrap(aByte);
+            store.put(aBytes, aByte);
+            cache.put(namespace, aBytes, new LRUCacheEntry(aByte));
         }
-        cache.put(namespace, bytes[1], new LRUCacheEntry(null));
-        cache.put(namespace, bytes[2], new LRUCacheEntry(null));
-        cache.put(namespace, bytes[3], new LRUCacheEntry(null));
-        cache.put(namespace, bytes[8], new LRUCacheEntry(null));
-        cache.put(namespace, bytes[11], new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[2]), new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[3]), new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[8]), new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[11]), new LRUCacheEntry(null));
 
         final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
         assertArrayEquals(bytes[0], iterator.next().key);
@@ -144,19 +145,19 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
 
     @Test
     public void shouldPeekNextKey() throws Exception {
-        final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one");
+        final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray());
         final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
         byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
         final String namespace = "one";
         for (int i = 0; i < bytes.length - 1; i += 2) {
             kv.put(Bytes.wrap(bytes[i]), bytes[i]);
-            cache.put(namespace, bytes[i + 1], new LRUCacheEntry(bytes[i + 1]));
+            cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
         }
 
         final Bytes from = Bytes.wrap(new byte[]{2});
         final Bytes to = Bytes.wrap(new byte[]{9});
         final KeyValueIterator<Bytes, byte[]> storeIterator = kv.range(from, to);
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from.get(), to.get());
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to);
 
         final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator =
                 new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator,
@@ -178,6 +179,4 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
         final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.all());
         return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
     }
-
-
 }
\ No newline at end of file


Mime
View raw message