kafka-commits mailing list archives

From guozh...@apache.org
Subject [5/5] kafka git commit: KAFKA-3452 Follow-up: Optimize ByteStore Scenarios
Date Fri, 03 Feb 2017 19:12:58 GMT
KAFKA-3452 Follow-up: Optimize ByteStore Scenarios

This is a refactoring follow-up of https://github.com/apache/kafka/pull/2166. Main refactoring changes:

1. Extract `InMemoryKeyValueStore` out of `InMemoryKeyValueStoreSupplier` and remove its duplicate in the test package.

2. Add two abstract classes, `AbstractKeyValueIterator` and `AbstractKeyValueStore`, to consolidate common logic.

3. Add specialized `BytesXXStore` classes for cases where the key and value types are Bytes / byte[], so that we can skip calling the dummy serdes.

4. Change the key type in `ThreadCache` from byte[] to Bytes, since the serialized results of SessionStore / WindowStore are already in the form of Bytes; this saves unnecessary `Bytes.get()` and `Bytes.wrap(bytes)` calls.
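
As background for items 3 and 4, here is a minimal sketch of one general reason a wrapper type is convenient as a cache key in Java: a raw `byte[]` uses identity-based `equals`/`hashCode`, while a wrapper can compare by content. `BytesKey` and `BytesKeyDemo` are hypothetical names for illustration only; `BytesKey` is a stand-in, not Kafka's actual `Bytes` class, and the commit message itself only cites avoiding the wrap/unwrap calls.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class BytesKeyDemo {

    // Hypothetical stand-in for a byte-array wrapper; NOT Kafka's real Bytes class.
    static final class BytesKey {
        private final byte[] bytes;

        BytesKey(final byte[] bytes) {
            this.bytes = bytes;
        }

        byte[] get() {
            return bytes;
        }

        // Compare by array content rather than by object identity.
        @Override
        public boolean equals(final Object other) {
            return other instanceof BytesKey
                && Arrays.equals(bytes, ((BytesKey) other).bytes);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(bytes);
        }
    }

    // A raw byte[] key: a second array with equal content is a different key.
    static String lookupWithRawArray() {
        final Map<byte[], String> cache = new HashMap<>();
        cache.put("k".getBytes(StandardCharsets.UTF_8), "v");
        return cache.get("k".getBytes(StandardCharsets.UTF_8));
    }

    // A wrapped key: equal content means an equal key, so the lookup hits.
    static String lookupWithWrapper() {
        final Map<BytesKey, String> cache = new HashMap<>();
        cache.put(new BytesKey("k".getBytes(StandardCharsets.UTF_8)), "v");
        return cache.get(new BytesKey("k".getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(final String[] args) {
        System.out.println(lookupWithRawArray()); // prints "null"
        System.out.println(lookupWithWrapper());  // prints "v"
    }
}
```

If the cache and the underlying store both accept the wrapped key, a single wrapped instance can be passed to both, which is the kind of saving item 4 describes.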

Each of these should arguably be a separate PR, and I apologize for the mess. This branch was extracted from a rather large diff in which multiple refactorings were mingled together; dguy and I have already put a lot of effort into breaking it down into a few separate PRs, and this is the only left-over work. Such a PR won't happen in the future.

Ping dguy enothereska mjsax for reviews

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Jun Rao

Closes #2333 from guozhangwang/K3452-followup-state-store-refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7ebc5da6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7ebc5da6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7ebc5da6

Branch: refs/heads/trunk
Commit: 7ebc5da606fb806bde8b7176fed0e60243e0f7f3
Parents: d95f22c
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Feb 3 11:12:49 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Feb 3 11:12:49 2017 -0800

----------------------------------------------------------------------
 .../kstream/internals/AbstractStream.java       |   28 +-
 .../kstream/internals/SessionKeySerde.java      |   23 +-
 .../streams/processor/TopologyBuilder.java      |    4 +-
 .../processor/internals/AbstractTask.java       |    1 -
 .../internals/ProcessorStateManager.java        |    2 -
 .../kafka/streams/state/KeyValueStore.java      |    5 +-
 .../apache/kafka/streams/state/WindowStore.java |    1 -
 .../streams/state/WindowStoreIterator.java      |    2 +-
 .../AbstractMergedSortedCacheStoreIterator.java |    2 +-
 .../state/internals/AbstractStoreSupplier.java  |    9 +-
 .../state/internals/CachingKeyValueStore.java   |   38 +-
 .../state/internals/CachingSessionStore.java    |   76 +-
 .../state/internals/CachingWindowStore.java     |   38 +-
 .../ChangeLoggingKeyValueBytesStore.java        |   13 +-
 .../internals/ChangeLoggingKeyValueStore.java   |   16 +-
 .../ChangeLoggingSegmentedBytesStore.java       |    3 +-
 .../internals/CompositeReadOnlyWindowStore.java |    1 +
 .../DelegatingPeekingKeyValueIterator.java      |   10 +-
 .../internals/InMemoryKeyValueLoggedStore.java  |   40 +-
 .../state/internals/InMemoryKeyValueStore.java  |  187 +++
 .../InMemoryKeyValueStoreSupplier.java          |  165 +--
 .../streams/state/internals/LRUCacheEntry.java  |   17 +-
 .../streams/state/internals/MemoryLRUCache.java |    2 +-
 .../MergedSortedCacheKeyValueStoreIterator.java |    6 +-
 .../MergedSortedCacheSessionStoreIterator.java  |    2 +-
 .../state/internals/MeteredKeyValueStore.java   |   20 +-
 .../internals/MeteredSegmentedBytesStore.java   |    4 +-
 .../streams/state/internals/NamedCache.java     |    6 +-
 .../internals/RocksDBSegmentedBytesStore.java   |   14 +-
 .../state/internals/RocksDBSessionStore.java    |  122 +-
 .../internals/RocksDBSessionStoreSupplier.java  |   11 +-
 .../streams/state/internals/RocksDBStore.java   |   33 +-
 .../state/internals/RocksDBWindowStore.java     |  124 +-
 .../internals/RocksDBWindowStoreSupplier.java   |    2 +-
 .../kafka/streams/state/internals/Segment.java  |    8 +-
 .../state/internals/SegmentIterator.java        |   12 +-
 .../state/internals/SegmentedBytesStore.java    |    7 +-
 .../kafka/streams/state/internals/Segments.java |    5 +-
 .../internals/SerializedKeyValueIterator.java   |    2 +-
 .../streams/state/internals/ThreadCache.java    |   40 +-
 .../state/internals/WindowKeySchema.java        |   59 +
 .../state/internals/WindowStoreKeySchema.java   |   59 -
 .../state/internals/WindowStoreSupplier.java    |    7 +
 .../state/internals/WindowStoreUtils.java       |   21 +-
 .../internals/WrappedSessionStoreIterator.java  |   89 ++
 .../state/internals/WrappedStateStore.java      |    6 +-
 .../internals/WrappedWindowStoreIterator.java   |   93 ++
 .../kstream/internals/SessionKeySerdeTest.java  |   67 +-
 .../internals/ProcessorStateManagerTest.java    |    1 +
 .../internals/CachingKeyValueStoreTest.java     |    3 +-
 .../state/internals/CachingWindowStoreTest.java |    8 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java    |    3 +-
 .../ChangeLoggingKeyValueStoreTest.java         |    3 +-
 .../CompositeReadOnlyKeyValueStoreTest.java     |    2 +-
 .../DelegatingPeekingKeyValueIteratorTest.java  |    4 +-
 ...gedSortedCacheKeyValueStoreIteratorTest.java |   45 +-
 ...rgedSortedCacheSessionStoreIteratorTest.java |  113 --
 ...ergedSortedCacheWindowStoreIteratorTest.java |   88 --
 ...tedCacheWrappedSessionStoreIteratorTest.java |  113 ++
 ...rtedCacheWrappedWindowStoreIteratorTest.java |   89 ++
 .../internals/ReadOnlyWindowStoreStub.java      |    2 +-
 .../state/internals/RocksDBWindowStoreTest.java | 1191 +++++++-----------
 .../state/internals/SegmentIteratorTest.java    |   16 +-
 .../state/internals/ThreadCacheTest.java        |  143 ++-
 .../state/internals/WindowStoreUtilsTest.java   |   10 +-
 .../kafka/test/InMemoryKeyValueStore.java       |  152 ---
 66 files changed, 1559 insertions(+), 1929 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index bcffce2..90a450e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -37,7 +37,7 @@ public abstract class AbstractStream<K> {
     protected final String name;
     protected final Set<String> sourceNodes;
 
-    public AbstractStream(KStreamBuilder topology, String name, Set<String> sourceNodes) {
+    AbstractStream(final KStreamBuilder topology, String name, final Set<String> sourceNodes) {
         if (sourceNodes == null || sourceNodes.isEmpty()) {
             throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
         }
@@ -47,7 +47,7 @@ public abstract class AbstractStream<K> {
         this.sourceNodes = sourceNodes;
     }
 
-    protected Set<String> ensureJoinableWith(AbstractStream<K> other) {
+    Set<String> ensureJoinableWith(final AbstractStream<K> other) {
         Set<String> allSourceNodes = new HashSet<>();
         allSourceNodes.addAll(sourceNodes);
         allSourceNodes.addAll(other.sourceNodes);
@@ -57,7 +57,7 @@ public abstract class AbstractStream<K> {
         return allSourceNodes;
     }
 
-    public static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
+    static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
         return new ValueJoiner<T2, T1, R>() {
             @Override
             public R apply(T2 value2, T1 value1) {
@@ -67,27 +67,27 @@ public abstract class AbstractStream<K> {
     }
 
     @SuppressWarnings("unchecked")
-    public static <T, K>  StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
-                                                                  final Serde<T> aggValueSerde,
-                                                                  final String storeName) {
+    static <T, K>  StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
+                                                                   final Serde<T> aggValueSerde,
+                                                                   final String storeName) {
         Objects.requireNonNull(storeName, "storeName can't be null");
         return storeFactory(keySerde, aggValueSerde, storeName).build();
     }
 
     @SuppressWarnings("unchecked")
-    public static  <W extends Window, T, K> StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
-                                                                                     final Serde<T> aggValSerde,
-                                                                                     final Windows<W> windows,
-                                                                                     final String storeName) {
+    static  <W extends Window, T, K> StateStoreSupplier<WindowStore> windowedStore(final Serde<K> keySerde,
+                                                                                   final Serde<T> aggValSerde,
+                                                                                   final Windows<W> windows,
+                                                                                   final String storeName) {
         Objects.requireNonNull(storeName, "storeName can't be null");
         return storeFactory(keySerde, aggValSerde, storeName)
                 .windowed(windows.size(), windows.maintainMs(), windows.segments, false)
                 .build();
     }
-    @SuppressWarnings("unchecked")
-    public static  <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
-                                                                         final Serde<T> aggValueSerde,
-                                                                         final String storeName) {
+
+    static  <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(final Serde<K> keySerde,
+                                                                       final Serde<T> aggValueSerde,
+                                                                       final String storeName) {
         return Stores.create(storeName)
                 .withKeys(keySerde)
                 .withValues(aggValueSerde)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index d9a3528..16c7cdf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -112,7 +112,6 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         }
     }
 
-
     public static long extractEnd(final byte [] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
     }
@@ -121,6 +120,13 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
     }
 
+    public static Window extractWindow(final byte [] binaryKey) {
+        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
+        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
+        final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
+        return new SessionWindow(start, end);
+    }
+
     public static byte[] extractKeyBytes(final byte[] binaryKey) {
         final byte[] bytes = new byte[binaryKey.length - 2 * TIMESTAMP_SIZE];
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
@@ -129,10 +135,16 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
 
     public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K> keyDeserializer) {
         final K key = extractKey(binaryKey, keyDeserializer);
+        final Window window = extractWindow(binaryKey);
+        return new Windowed<>(key, window);
+    }
+
+    public static Windowed<Bytes> fromBytes(Bytes bytesKey) {
+        final byte[] binaryKey = bytesKey.get();
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
         final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
-        return new Windowed<>(key, new SessionWindow(start, end));
+        return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), new SessionWindow(start, end));
     }
 
     private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer) {
@@ -156,11 +168,4 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         buf.putLong(sessionKey.window().start());
         return new Bytes(buf.array());
     }
-
-    public static Window extractWindow(final byte [] binaryKey) {
-        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
-        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
-        final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
-        return new TimeWindow(start, end);
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f4302..b4dbf65 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -63,6 +63,8 @@ public class TopologyBuilder {
 
     private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
 
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
     // node factories in a topological order
     private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
 
@@ -113,8 +115,6 @@ public class TopologyBuilder {
 
     private final Set<Pattern> latestResetPatterns = new HashSet<>();
 
-    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
-
     private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
 
     private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 55418d5..2a040ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -71,7 +71,6 @@ public abstract class AbstractTask {
         // create the processor state manager
         try {
             this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
-
         } catch (IOException e) {
             throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index ad16c77..1c786e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -140,7 +140,6 @@ public class ProcessorStateManager implements StateManager {
 
         // check that the underlying change log topic exist or not
         String topic = storeToChangelogTopic.get(store.name());
-
         if (topic == null) {
             this.stores.put(store.name(), store);
             return;
@@ -355,7 +354,6 @@ public class ProcessorStateManager implements StateManager {
                         if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
                             String changelogTopic = storeToChangelogTopic.get(storeName);
                             TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
-
                             Long offset = ackedOffsets.get(topicPartition);
 
                             if (offset != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index 2a86049..cac6b75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -37,7 +37,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
     /**
      * Update the value associated with this key
      *
-     * @param key They key to associate the value to
+     * @param key The key to associate the value to
      * @param value The value
      * @throws NullPointerException If null is used for key or value.
      */
@@ -47,7 +47,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
      * Update the value associated with this key, unless a value
      * is already associated with the key
      *
-     * @param key They key to associate the value to
+     * @param key The key to associate the value to
      * @param value The value
      * @return The old value or null if there is no such key.
      * @throws NullPointerException If null is used for key or value.
@@ -70,5 +70,4 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
      * @throws NullPointerException If null is used for key.
      */
     V delete(K key);
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index dee3e9b..fb70f42 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -41,5 +41,4 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * Put a key-value pair with the given timestamp into the corresponding window
      */
     void put(K key, V value, long timestamp);
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index 958b778..86aae77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -29,7 +29,7 @@ import java.io.Closeable;
  * Users need to call its {@code close} method explicitly upon completeness to release resources,
  * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
  *
- * @param <E> Type of values
+ * @param <V> Type of values
  */
 public interface WindowStoreIterator<V> extends KeyValueIterator<Long, V>, Closeable {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index 009dad0..bf0f33f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -154,7 +154,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa
 
     @Override
     public void remove() {
-        throw new UnsupportedOperationException("remove() is not supported");
+        throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
index 3ad44ac..cde8b36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import java.util.Map;
 
 
-public abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements StateStoreSupplier<T> {
+abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements StateStoreSupplier<T> {
     protected final String name;
     protected final Serde<K> keySerde;
     protected final Serde<V> valueSerde;
@@ -32,7 +32,12 @@ public abstract class AbstractStoreSupplier<K, V, T extends StateStore> implemen
     protected final boolean logged;
     protected final Map<String, String> logConfig;
 
-    public AbstractStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) {
+    AbstractStoreSupplier(final String name,
+                          final Serde<K> keySerde,
+                          final Serde<V> valueSerde,
+                          final Time time,
+                          final boolean logged,
+                          final Map<String, String> logConfig) {
         this.time = time;
         this.name = name;
         this.valueSerde = valueSerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 1e91b47..4f4f37b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -31,7 +30,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
 
-class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K, V>, CachedStateStore<K, V> {
+class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V>, CachedStateStore<K, V> {
 
     private final KeyValueStore<Bytes, byte[]> underlying;
     private final Serde<K> keySerde;
@@ -46,6 +45,7 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K,
     CachingKeyValueStore(final KeyValueStore<Bytes, byte[]> underlying,
                          final Serde<K> keySerde,
                          final Serde<V> valueSerde) {
+        super(underlying);
         this.underlying = underlying;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
@@ -67,7 +67,7 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K,
     }
 
     @SuppressWarnings("unchecked")
-    void initInternal(final ProcessorContext context) {
+    private void initInternal(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
         this.serdes = new StateSerdes<>(underlying.name(),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
@@ -83,7 +83,6 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K,
                 }
             }
         });
-
     }
 
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
@@ -93,7 +92,8 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K,
             if (flushListener != null) {
 
                 flushListener.apply(serdes.keyFrom(entry.key().get()),
-                                    serdes.valueFrom(entry.newValue()), serdes.valueFrom(underlying.get(entry.key())));
+                                    serdes.valueFrom(entry.newValue()),
+                                    serdes.valueFrom(underlying.get(entry.key())));
 
             }
             underlying.put(entry.key(), entry.newValue());
@@ -139,23 +139,18 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K,
         return get(rawKey);
     }
 
-    private void validateStoreOpen() {
-        if (!isOpen()) {
-            throw new InvalidStateStoreException("Store " + this.name() + " is currently closed");
-        }
-    }
-
     private V get(final byte[] rawKey) {
-        final LRUCacheEntry entry = cache.get(cacheName, rawKey);
+        final Bytes key = Bytes.wrap(rawKey);
+        final LRUCacheEntry entry = cache.get(cacheName, key);
         if (entry == null) {
-            final byte[] rawValue = underlying.get(Bytes.wrap(rawKey));
+            final byte[] rawValue = underlying.get(key);
             if (rawValue == null) {
                 return null;
             }
             // only update the cache if this call is on the streamThread
             // as we don't want other threads to trigger an eviction/flush
             if (Thread.currentThread().equals(streamThread)) {
-                cache.put(cacheName, rawKey, new LRUCacheEntry(rawValue));
+                cache.put(cacheName, key, new LRUCacheEntry(rawValue));
             }
             return serdes.valueFrom(rawValue);
         }
@@ -170,9 +165,9 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K,
     @Override
     public KeyValueIterator<K, V> range(final K from, final K to) {
         validateStoreOpen();
-        final byte[] origFrom = serdes.rawKey(from);
-        final byte[] origTo = serdes.rawKey(to);
-        final KeyValueIterator<Bytes, byte[]> storeIterator = underlying.range(Bytes.wrap(origFrom), Bytes.wrap(origTo));
+        final Bytes origFrom = Bytes.wrap(serdes.rawKey(from));
+        final Bytes origTo = Bytes.wrap(serdes.rawKey(to));
+        final KeyValueIterator<Bytes, byte[]> storeIterator = underlying.range(origFrom, origTo);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, origFrom, origTo);
         return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
     }
@@ -199,8 +194,8 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K,
 
     private synchronized void put(final byte[] rawKey, final V value) {
         final byte[] rawValue = serdes.rawValue(value);
-        cache.put(cacheName, rawKey, new LRUCacheEntry(rawValue, true, context.offset(),
-                                                  context.timestamp(), context.partition(), context.topic()));
+        cache.put(cacheName, Bytes.wrap(rawKey), new LRUCacheEntry(rawValue, true, context.offset(),
+                  context.timestamp(), context.partition(), context.topic()));
     }
 
     @Override
@@ -225,9 +220,10 @@ class CachingKeyValueStore<K, V> implements WrappedStateStore, KeyValueStore<K,
     public synchronized V delete(final K key) {
         validateStoreOpen();
         final byte[] rawKey = serdes.rawKey(key);
+        final Bytes bytesKey = Bytes.wrap(rawKey);
         final V v = get(rawKey);
-        cache.delete(cacheName, serdes.rawKey(key));
-        underlying.delete(Bytes.wrap(rawKey));
+        cache.delete(cacheName, bytesKey);
+        underlying.delete(bytesKey);
         return v;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 2cea915..c33e87c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -34,16 +34,16 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 
-class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedStateStore implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> {
+class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> {
 
     private final SessionStore<Bytes, byte[]> bytesStore;
     private final SessionKeySchema keySchema;
-    private Serde<K> keySerde;
+    private final Serde<K> keySerde;
     private final Serde<AGG> aggSerde;
-    private InternalProcessorContext context;
     private String cacheName;
-    private StateSerdes<K, AGG> serdes;
     private ThreadCache cache;
+    private StateSerdes<K, AGG> serdes;
+    private InternalProcessorContext context;
     private CacheFlushListener<Windowed<K>, AGG> flushListener;
 
     CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
@@ -56,15 +56,41 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
         this.keySchema = new SessionKeySchema();
     }
 
+    @SuppressWarnings("unchecked")
+    public void init(final ProcessorContext context, final StateStore root) {
+        bytesStore.init(context, root);
+        initInternal((InternalProcessorContext) context);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void initInternal(final InternalProcessorContext context) {
+        this.context = context;
+
+        this.serdes = new StateSerdes<>(bytesStore.name(),
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
+
+
+        this.cacheName = context.taskId() + "-" + bytesStore.name();
+        this.cache = this.context.getCache();
+        cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> entries) {
+                for (ThreadCache.DirtyEntry entry : entries) {
+                    putAndMaybeForward(entry, context);
+                }
+            }
+        });
+    }
+
     public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime) {
         validateStoreOpen();
         final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(this.name(), key));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName,
-                                                                                  keySchema.lowerRange(binarySessionId,
-                                                                                                       earliestSessionEndTime).get(),
-                                                                                  keySchema.upperRange(binarySessionId, latestSessionStartTime).get());
+                                                                                  keySchema.lowerRange(binarySessionId, earliestSessionEndTime),
+                                                                                  keySchema.upperRange(binarySessionId, latestSessionStartTime));
         final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions(binarySessionId, earliestSessionEndTime, latestSessionStartTime);
         final HasNextCondition hasNextCondition = keySchema.hasNextCondition(binarySessionId,
                                                                              earliestSessionEndTime,
@@ -73,18 +99,19 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
         return new MergedSortedCacheSessionStoreIterator<>(filteredCacheIterator, storeIterator, serdes);
     }
 
-
+    @Override
     public void remove(final Windowed<K> sessionKey) {
         validateStoreOpen();
         put(sessionKey, null);
     }
 
+    @Override
     public void put(final Windowed<K> key, AGG value) {
         validateStoreOpen();
         final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer());
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                       key.window().end(), context.partition(), context.topic());
-        cache.put(cacheName, binaryKey.get(), entry);
+        cache.put(cacheName, binaryKey, entry);
     }
 
     @Override
@@ -92,34 +119,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
         return findSessions(key, 0, Long.MAX_VALUE);
     }
 
-    @SuppressWarnings("unchecked")
-    public void init(final ProcessorContext context, final StateStore root) {
-        bytesStore.init(context, root);
-        initInternal((InternalProcessorContext) context);
-    }
-
-    @SuppressWarnings("unchecked")
-    private void initInternal(final InternalProcessorContext context) {
-        this.context = context;
-
-        this.serdes = new StateSerdes<>(bytesStore.name(),
-                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                        aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
-
-
-        this.cacheName = context.taskId() + "-" + bytesStore.name();
-        this.cache = this.context.getCache();
-        cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> entries) {
-                for (ThreadCache.DirtyEntry entry : entries) {
-                    putAndMaybeForward(entry, context);
-                }
-            }
-        });
-
-    }
-
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         final Bytes binaryKey = entry.key();
         final RecordContext current = context.recordContext();
@@ -148,7 +147,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
         }
     }
 
-
     public void flush() {
         cache.flush(cacheName);
         bytesStore.flush();
@@ -156,8 +154,8 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
 
     public void close() {
         flush();
-        bytesStore.close();
         cache.close(cacheName);
+        bytesStore.close();
     }
 
     public void setFlushListener(CacheFlushListener<Windowed<K>, AGG> flushListener) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index d471761..3cc8478 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -19,30 +19,31 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.List;
 
-class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateStore implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> {
+class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> {
 
     private final WindowStore<Bytes, byte[]> underlying;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
-    private CacheFlushListener<Windowed<K>, V> flushListener;
     private final long windowSize;
+
     private String name;
     private ThreadCache cache;
     private InternalProcessorContext context;
     private StateSerdes<K, V> serdes;
+    private CacheFlushListener<Windowed<K>, V> flushListener;
 
     CachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
                        final Serde<K> keySerde,
@@ -63,7 +64,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
     }
 
     @SuppressWarnings("unchecked")
-    void initInternal(final ProcessorContext context) {
+    private void initInternal(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
         this.serdes = new StateSerdes<>(underlying.name(),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
@@ -71,6 +72,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
 
         this.name = context.taskId() + "-" + underlying.name();
         this.cache = this.context.getCache();
+
         cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
@@ -79,14 +81,13 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
                     final long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryWindowKey);
 
                     final Windowed<K> windowedKey = new Windowed<>(WindowStoreUtils.keyFromBinaryKey(binaryWindowKey, serdes),
-                                                                   new TimeWindow(timestamp, timestamp + windowSize));
+                            new TimeWindow(timestamp, timestamp + windowSize));
                     final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(binaryWindowKey);
                     maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
                     underlying.put(key, entry.newValue(), timestamp);
                 }
             }
         });
-
     }
 
     private void maybeForward(final ThreadCache.DirtyEntry entry,
@@ -102,7 +103,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
             } finally {
                 context.setRecordContext(current);
             }
-
         }
     }
 
@@ -119,8 +119,8 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
     @Override
     public void close() {
         flush();
-        underlying.close();
         cache.close(name);
+        underlying.close();
     }
 
     @Override
@@ -128,30 +128,34 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
         put(key, value, context.timestamp());
     }
 
-
     @Override
     public synchronized void put(final K key, final V value, final long timestamp) {
+        // since this function may not access the underlying inner store, we need to validate
+        // here as well that the store is open.
         validateStoreOpen();
-        final byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
+
+        final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                       timestamp, context.partition(), context.topic());
-        cache.put(name, binaryKey, entry);
+        cache.put(name, keyBytes, entry);
     }
 
     @Override
     public synchronized WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+        // since this function may not access the underlying inner store, we need to validate
+        // here as well that the store is open.
         validateStoreOpen();
-        byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
-        byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
+
+        Bytes fromBytes = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
+        Bytes toBytes = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
 
         final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, binaryFrom, binaryTo);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, fromBytes, toBytes);
         return new MergedSortedCacheWindowStoreIterator<>(cacheIterator,
                                                           underlyingIterator,
                                                           new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
     }
 
-
     private V fetchPrevious(final Bytes key, final long timestamp) {
         try (final WindowStoreIterator<byte[]> iter = underlying.fetch(key, timestamp, timestamp)) {
             if (!iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index e31d04b..4f3bf48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -25,11 +25,11 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.List;
 
-public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore<Bytes, byte[]> {
+public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractStateStore implements KeyValueStore<Bytes, byte[]> {
     private final KeyValueStore<Bytes, byte[]> inner;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
 
-    public ChangeLoggingKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) {
+    ChangeLoggingKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) {
         super(inner);
         this.inner = inner;
     }
@@ -40,6 +40,10 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractW
         this.changeLogger = new StoreChangeLogger<>(inner.name(), context, WindowStoreUtils.INNER_SERDES);
     }
 
+    @Override
+    public long approximateNumEntries() {
+        return inner.approximateNumEntries();
+    }
 
     @Override
     public void put(final Bytes key, final byte[] value) {
@@ -85,9 +89,4 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractW
     public KeyValueIterator<Bytes, byte[]> all() {
         return inner.all();
     }
-
-    @Override
-    public long approximateNumEntries() {
-        return inner.approximateNumEntries();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
index cd63d1a..022803a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
@@ -28,12 +28,13 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.util.ArrayList;
 import java.util.List;
 
-class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore<K, V> {
+class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
     private final ChangeLoggingKeyValueBytesStore innerBytes;
     private final Serde keySerde;
     private final Serde valueSerde;
     private StateSerdes<K, V> serdes;
 
+
     ChangeLoggingKeyValueStore(final KeyValueStore<Bytes, byte[]> bytesStore,
                                final Serde keySerde,
                                final Serde valueSerde) {
@@ -60,6 +61,11 @@ class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractWrapped
     }
 
     @Override
+    public long approximateNumEntries() {
+        return innerBytes.approximateNumEntries();
+    }
+
+    @Override
     public void put(final K key, final V value) {
         final Bytes bytesKey = Bytes.wrap(serdes.rawKey(key));
         final byte[] bytesValue = serdes.rawValue(value);
@@ -106,17 +112,11 @@ class ChangeLoggingKeyValueStore<K, V> extends WrappedStateStore.AbstractWrapped
     public KeyValueIterator<K, V> range(final K from, final K to) {
         return new SerializedKeyValueIterator<>(innerBytes.range(Bytes.wrap(serdes.rawKey(from)),
                                                                  Bytes.wrap(serdes.rawKey(to))),
-                                                serdes);
+                                                                 serdes);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
         return new SerializedKeyValueIterator<>(innerBytes.all(), serdes);
     }
-
-    @Override
-    public long approximateNumEntries() {
-        return innerBytes.approximateNumEntries();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
index 21c2866..0849378 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -25,12 +25,11 @@ import org.apache.kafka.streams.state.KeyValueIterator;
  * Simple wrapper around a {@link SegmentedBytesStore} to support writing
  * updates to a changelog
  */
-class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractWrappedStateStore implements SegmentedBytesStore {
+class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractStateStore implements SegmentedBytesStore {
 
     private final SegmentedBytesStore bytesStore;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
 
-
     ChangeLoggingSegmentedBytesStore(final SegmentedBytesStore bytesStore) {
         super(bytesStore);
         this.bytesStore = bytesStore;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index e0f1ec8..3ffbbb6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -56,6 +56,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
                 throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
             }
         }
+
         return new WindowStoreIterator<V>() {
             @Override
             public void close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
index f3101b1..af1fcdd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
@@ -22,10 +22,14 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.util.NoSuchElementException;
 
-public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
-    private final String storeName;
+/**
+ * Optimized {@link KeyValueIterator} used when the same element could be peeked multiple times.
+ */
+class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
     private final KeyValueIterator<K, V> underlying;
+    private final String storeName;
     private KeyValue<K, V> next;
+
     private volatile boolean open = true;
 
     public DelegatingPeekingKeyValueIterator(final String storeName, final KeyValueIterator<K, V> underlying) {
@@ -76,7 +80,7 @@ public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator
 
     @Override
     public void remove() {
-        throw new UnsupportedOperationException("remove not supported");
+        throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index d81f6fb..c92a1b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -27,40 +27,34 @@ import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
 
-public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
+public class InMemoryKeyValueLoggedStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
 
     private final KeyValueStore<K, V> inner;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
-    private final String storeName;
 
     private StoreChangeLogger<K, V> changeLogger;
     private ProcessorContext context;
 
-    public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) {
-        this.storeName = storeName;
+    InMemoryKeyValueLoggedStore(final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) {
+        super(inner);
         this.inner = inner;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
     }
 
     @Override
-    public String name() {
-        return this.storeName;
-    }
-
-    @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
         this.context = context;
         inner.init(context, root);
 
         // construct the serde
-        StateSerdes<K, V>  serdes = new StateSerdes<>(storeName,
+        StateSerdes<K, V>  serdes = new StateSerdes<>(inner.name(),
                 keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                 valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
-        this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
+        this.changeLogger = new StoreChangeLogger<>(inner.name(), context, serdes);
 
 
         // if the inner store is an LRU cache, add the eviction listener to log removed record
@@ -75,13 +69,8 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public boolean persistent() {
-        return inner.persistent();
-    }
-
-    @Override
-    public boolean isOpen() {
-        return inner.isOpen();
+    public long approximateNumEntries() {
+        return inner.approximateNumEntries();
     }
 
     @Override
@@ -143,19 +132,4 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     public KeyValueIterator<K, V> all() {
         return this.inner.all();
     }
-
-    @Override
-    public long approximateNumEntries() {
-        return this.inner.approximateNumEntries();
-    }
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-
-    @Override
-    public void flush() {
-        this.inner.flush();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
new file mode 100644
index 0000000..fe50152
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+
+public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
+    private final String name;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private final NavigableMap<K, V> map;
+    private volatile boolean open = false;
+
+    private StateSerdes<K, V> serdes;
+
+    InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) {
+        this.name = name;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+
+        // TODO: when we have serde associated with class types, we can
+        // improve this situation by passing the comparator here.
+        this.map = new TreeMap<>();
+    }
+
+    public KeyValueStore<K, V> enableLogging() {
+        return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
+    }
+
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void init(ProcessorContext context, StateStore root) {
+        // construct the serde
+        this.serdes = new StateSerdes<>(name,
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+        if (root != null) {
+            // register the store
+            context.register(root, true, new StateRestoreCallback() {
+                @Override
+                public void restore(byte[] key, byte[] value) {
+                    // check value for null, to avoid a deserialization error.
+                    if (value == null) {
+                        put(serdes.keyFrom(key), null);
+                    } else {
+                        put(serdes.keyFrom(key), serdes.valueFrom(value));
+                    }
+                }
+            });
+        }
+
+        this.open = true;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.open;
+    }
+
+    @Override
+    public synchronized V get(K key) {
+        return this.map.get(key);
+    }
+
+    @Override
+    public synchronized void put(K key, V value) {
+        this.map.put(key, value);
+    }
+
+    @Override
+    public synchronized V putIfAbsent(K key, V value) {
+        V originalValue = get(key);
+        if (originalValue == null) {
+            put(key, value);
+        }
+        return originalValue;
+    }
+
+    @Override
+    public synchronized void putAll(List<KeyValue<K, V>> entries) {
+        for (KeyValue<K, V> entry : entries)
+            put(entry.key, entry.value);
+    }
+
+    @Override
+    public synchronized V delete(K key) {
+        return this.map.remove(key);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<K, V> range(K from, K to) {
+        return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<K, V> all() {
+        final TreeMap<K, V> copy = new TreeMap<>(this.map);
+        return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator()));
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return this.map.size();
+    }
+
+    @Override
+    public void flush() {
+        // do-nothing since it is in-memory
+    }
+
+    @Override
+    public void close() {
+        this.map.clear();
+        this.open = false;
+    }
+
+    private static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
+        private final Iterator<Map.Entry<K, V>> iter;
+
+        private InMemoryKeyValueIterator(Iterator<Map.Entry<K, V>> iter) {
+            this.iter = iter;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            Map.Entry<K, V> entry = iter.next();
+            return new KeyValue<>(entry.getKey(), entry.getValue());
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+
+        @Override
+        public K peekNextKey() {
+            throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
+        }
+    }
+}
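
Note on the diff above: the added `InMemoryKeyValueStore.range()` calls `subMap(from, true, to, true)`, while the removed `MemoryStore.range()` further down calls `subMap(from, true, to, false)`, so the upper bound changes from exclusive to inclusive. The following is a minimal standalone sketch of that difference using a plain `TreeMap`. It is not part of the commit; the class and method names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration class; not part of the Kafka code base.
public class RangeBoundsSketch {

    // Keys in [from, to): the bounds used by the removed MemoryStore.range().
    static List<String> halfOpenRange(NavigableMap<String, String> map, String from, String to) {
        return new ArrayList<>(map.subMap(from, true, to, false).keySet());
    }

    // Keys in [from, to]: the bounds used by the added InMemoryKeyValueStore.range().
    static List<String> closedRange(NavigableMap<String, String> map, String from, String to) {
        return new ArrayList<>(map.subMap(from, true, to, true).keySet());
    }

    // Copying into a fresh TreeMap first, as all() does, gives a snapshot
    // that later writes to the original map do not affect.
    static List<String> snapshotKeys(NavigableMap<String, String> map) {
        return new ArrayList<>(new TreeMap<>(map).keySet());
    }

    public static void main(String[] args) {
        NavigableMap<String, String> map = new TreeMap<>();
        map.put("a", "1");
        map.put("b", "2");
        map.put("c", "3");

        System.out.println(halfOpenRange(map, "a", "c")); // prints [a, b]
        System.out.println(closedRange(map, "a", "c"));   // prints [a, b, c]
    }
}
```

Callers that relied on the old half-open behaviour would presumably see one extra entry when a key equal to `to` exists.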

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index e00f8ab..4be2c4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -19,19 +19,9 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 
 /**
  * An in-memory key-value store based on a TreeMap.
@@ -47,7 +37,6 @@ import java.util.TreeMap;
  */
 public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
-
     public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {
         this(name, keySerde, valueSerde, null, logged, logConfig);
     }
@@ -57,160 +46,8 @@ public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K
     }
 
     public KeyValueStore get() {
-        MemoryStore<K, V> store = new MemoryStore<>(name, keySerde, valueSerde);
+        InMemoryKeyValueStore<K, V> store = new InMemoryKeyValueStore<>(name, keySerde, valueSerde);
 
         return new MeteredKeyValueStore<>(logged ? store.enableLogging() : store, "in-memory-state", time);
     }
-
-    private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-        private final String name;
-        private final Serde<K> keySerde;
-        private final Serde<V> valueSerde;
-        private final NavigableMap<K, V> map;
-        private volatile boolean open = false;
-
-        private StateSerdes<K, V> serdes;
-
-        public MemoryStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
-            this.name = name;
-            this.keySerde = keySerde;
-            this.valueSerde = valueSerde;
-
-            // TODO: when we have serde associated with class types, we can
-            // improve this situation by passing the comparator here.
-            this.map = new TreeMap<>();
-        }
-
-        public KeyValueStore<K, V> enableLogging() {
-            return new InMemoryKeyValueLoggedStore<>(this.name, this, keySerde, valueSerde);
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void init(ProcessorContext context, StateStore root) {
-            // construct the serde
-            this.serdes = new StateSerdes<>(name,
-                    keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                    valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
-            // register the store
-            context.register(root, true, new StateRestoreCallback() {
-                @Override
-                public void restore(byte[] key, byte[] value) {
-                    // check value for null, to avoid  deserialization error.
-                    if (value == null) {
-                        put(serdes.keyFrom(key), null);
-                    } else {
-                        put(serdes.keyFrom(key), serdes.valueFrom(value));
-                    }
-                }
-            });
-            this.open = true;
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public boolean isOpen() {
-            return this.open;
-        }
-
-        @Override
-        public synchronized V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public synchronized void put(K key, V value) {
-            this.map.put(key, value);
-        }
-
-        @Override
-        public synchronized V putIfAbsent(K key, V value) {
-            V originalValue = get(key);
-            if (originalValue == null) {
-                put(key, value);
-            }
-            return originalValue;
-        }
-
-        @Override
-        public synchronized void putAll(List<KeyValue<K, V>> entries) {
-            for (KeyValue<K, V> entry : entries)
-                put(entry.key, entry.value);
-        }
-
-        @Override
-        public synchronized V delete(K key) {
-            return this.map.remove(key);
-        }
-
-        @Override
-        public synchronized KeyValueIterator<K, V> range(K from, K to) {
-            return new DelegatingPeekingKeyValueIterator<>(name, new MemoryStoreIterator<>(this.map.subMap(from, true, to, false).entrySet().iterator()));
-        }
-
-        @Override
-        public synchronized KeyValueIterator<K, V> all() {
-            final TreeMap<K, V> copy = new TreeMap<>(this.map);
-            return new DelegatingPeekingKeyValueIterator<>(name, new MemoryStoreIterator<>(copy.entrySet().iterator()));
-        }
-
-        @Override
-        public long approximateNumEntries() {
-            return this.map.size();
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            this.open = false;
-        }
-
-        private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<Map.Entry<K, V>> iter;
-
-            public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
-                this.iter = iter;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.hasNext();
-            }
-
-            @Override
-            public KeyValue<K, V> next() {
-                Map.Entry<K, V> entry = iter.next();
-                return new KeyValue<>(entry.getKey(), entry.getValue());
-            }
-
-            @Override
-            public void remove() {
-                iter.remove();
-            }
-
-            @Override
-            public void close() {
-            }
-
-            @Override
-            public K peekNextKey() {
-                throw new UnsupportedOperationException("peekNextKey not supported on MemoryStoreIterator");
-            }
-
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index b7e47e2..e20f409 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -25,12 +25,12 @@ class LRUCacheEntry implements RecordContext {
 
     public final byte[] value;
     private final long offset;
-    private final long timestamp;
     private final String topic;
-    boolean isDirty;
     private final int partition;
-    private long sizeBytes = 0;
+    private final long timestamp;
 
+    private long sizeBytes;
+    private boolean isDirty;
 
     LRUCacheEntry(final byte[] value) {
         this(value, false, -1, -1, -1, "");
@@ -51,13 +51,6 @@ class LRUCacheEntry implements RecordContext {
                 8 + // offset
                 4 + // partition
                 (topic == null ? 0 : topic.length());
-
-    }
-
-
-
-    void markClean() {
-        isDirty = false;
     }
 
     @Override
@@ -80,6 +73,10 @@ class LRUCacheEntry implements RecordContext {
         return partition;
     }
 
+    void markClean() {
+        isDirty = false;
+    }
+
     boolean isDirty() {
         return isDirty;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 4e1f40e..c1eb689 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -85,7 +85,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     public KeyValueStore<K, V> enableLogging() {
-        return new InMemoryKeyValueLoggedStore<>(this.name, this, keySerde, valueSerde);
+        return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
     }
 
     public MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
index b860e16..91673da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
@@ -36,7 +36,7 @@ class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedC
     }
 
     @Override
-    public KeyValue<K, V> deserializeStorePair(KeyValue<Bytes, byte[]> pair) {
+    public KeyValue<K, V> deserializeStorePair(final KeyValue<Bytes, byte[]> pair) {
         return KeyValue.pair(serdes.keyFrom(pair.key.get()), serdes.valueFrom(pair.value));
     }
 
@@ -46,12 +46,12 @@ class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedC
     }
 
     @Override
-    public K deserializeStoreKey(Bytes key) {
+    public K deserializeStoreKey(final Bytes key) {
         return serdes.keyFrom(key.get());
     }
 
     @Override
-    public int compare(Bytes cacheKey, Bytes storeKey) {
+    public int compare(final Bytes cacheKey, final Bytes storeKey) {
         return cacheKey.compareTo(storeKey);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
index db64621..db7e898 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
@@ -59,7 +59,7 @@ class MergedSortedCacheSessionStoreIterator<K, AGG> extends AbstractMergedSorted
     @Override
     public Windowed<K> deserializeStoreKey(Windowed<Bytes> key) {
         final K originalKey = rawSerdes.keyFrom(key.key().get());
-        return new Windowed<K>(originalKey, key.window());
+        return new Windowed<>(originalKey, key.window());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index dc6b7a5..0a72759 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -29,16 +29,16 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import java.util.List;
 
 /**
- * Metered KeyValueStore wrapper is used for recording operation metrics, and hence its
+ * Metered {@link KeyValueStore} wrapper is used for recording operation metrics, and hence its
  * inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
  *
  * @param <K>
  * @param <V>
  */
-public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractWrappedStateStore implements KeyValueStore<K, V> {
+public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
 
-    protected final KeyValueStore<K, V> inner;
-    protected final String metricScope;
+    private final KeyValueStore<K, V> inner;
+    private final String metricScope;
     protected final Time time;
 
     private Sensor putTime;
@@ -132,6 +132,11 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractWrappe
     }
 
     @Override
+    public long approximateNumEntries() {
+        return inner.approximateNumEntries();
+    }
+
+    @Override
     public V get(K key) {
         this.key = key;
         metrics.measureLatencyNs(time, getDelegate, this.getTime);
@@ -177,11 +182,6 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractWrappe
     }
 
     @Override
-    public long approximateNumEntries() {
-        return this.inner.approximateNumEntries();
-    }
-
-    @Override
     public void flush() {
         metrics.measureLatencyNs(time, flushDelegate, this.flushTime);
     }
@@ -192,7 +192,7 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractWrappe
         private final Sensor sensor;
         private final long startNs;
 
-        public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
+        MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
             this.iter = iter;
             this.sensor = sensor;
             this.startNs = time.nanoseconds();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
index 4d3771e..3bc1bf6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
-class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractWrappedStateStore implements SegmentedBytesStore {
+class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractStateStore implements SegmentedBytesStore {
 
     private final SegmentedBytesStore inner;
     private final String metricScope;
@@ -154,7 +154,5 @@ class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractWrappedStateS
         public Bytes peekNextKey() {
             return iter.peekNextKey();
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 229ccec..931ab17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -152,7 +152,7 @@ class NamedCache {
 
 
     synchronized void put(final Bytes key, final LRUCacheEntry value) {
-        if (!value.isDirty && dirtyKeys.contains(key)) {
+        if (!value.isDirty() && dirtyKeys.contains(key)) {
             throw new IllegalStateException(String.format("Attempting to put a clean entry for key [%s] " +
                                                                   "into NamedCache [%s] when it already contains " +
                                                                   "a dirty entry for the same key",
@@ -308,10 +308,6 @@ class NamedCache {
         return tail;
     }
 
-    synchronized long dirtySize() {
-        return dirtyKeys.size();
-    }
-
     synchronized void close() {
         head = tail = null;
         listener = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 31956ba..ecd4088 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -32,7 +32,6 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     private ProcessorContext context;
     private volatile boolean open;
 
-
     RocksDBSegmentedBytesStore(final String name,
                                final long retention,
                                final int numSegments,
@@ -42,19 +41,16 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
         this.segments = new Segments(name, retention, numSegments);
     }
 
-
     @Override
-    @SuppressWarnings("unchecked")
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
         final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
 
         final Bytes binaryFrom = keySchema.lowerRange(key, from);
         final Bytes binaryTo = keySchema.upperRange(key, to);
 
-        return new SegmentIterator(
-                searchSpace.iterator(),
-                keySchema.hasNextCondition(key, from, to),
-                binaryFrom, binaryTo);
+        return new SegmentIterator(searchSpace.iterator(),
+                                   keySchema.hasNextCondition(key, from, to),
+                                   binaryFrom, binaryTo);
     }
 
     @Override
@@ -75,7 +71,6 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
         }
     }
 
-
     @Override
     public byte[] get(final Bytes key) {
         final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
@@ -91,7 +86,6 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
         this.context = context;
 
@@ -129,6 +123,4 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     public boolean isOpen() {
         return open;
     }
-
-
 }

