kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [2/2] kafka git commit: KAFKA-5673; refactor KeyValueStore hierarchy to make MeteredKeyValueStore outermost
Date Mon, 14 Aug 2017 09:02:33 GMT
KAFKA-5673; refactor KeyValueStore hierarchy to make MeteredKeyValueStore outermost

Refactor StateStoreSuppliers so that a `MeteredKeyValueStore` is the outermost store.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska <eno.thereska@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3592 from dguy/key-value-store-refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e19c37e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e19c37e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e19c37e5

Branch: refs/heads/trunk
Commit: e19c37e5914b1fa64befdfdf24edbf2415f8cdb3
Parents: f4dc5ac
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Aug 14 10:02:32 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Mon Aug 14 10:02:32 2017 +0100

----------------------------------------------------------------------
 .../kstream/internals/TupleForwarder.java       |  21 +-
 .../state/internals/CachingKeyValueStore.java   |  59 ++--
 .../internals/InnerMeteredKeyValueStore.java    | 323 +++++++++++++++++++
 ...edSortedCacheKeyValueBytesStoreIterator.java |  59 ++++
 .../MergedSortedCacheKeyValueStoreIterator.java |  65 ----
 .../internals/MeteredKeyValueBytesStore.java    | 154 +++++++++
 .../state/internals/MeteredKeyValueStore.java   | 216 +++----------
 .../internals/RocksDBKeyValueStoreSupplier.java |  35 +-
 .../state/internals/WrappedStateStore.java      |  15 +-
 .../internals/CachingKeyValueStoreTest.java     | 115 +++----
 ...rtedCacheKeyValueBytesStoreIteratorTest.java | 182 +++++++++++
 ...gedSortedCacheKeyValueStoreIteratorTest.java | 182 -----------
 .../MeteredKeyValueBytesStoreTest.java          | 202 ++++++++++++
 .../RocksDBKeyValueStoreSupplierTest.java       |  10 +-
 14 files changed, 1094 insertions(+), 544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index 7eb9c56..f07d7bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 /**
  * This class is used to determine if a processor should forward values to child nodes.
@@ -28,7 +29,7 @@ import org.apache.kafka.streams.state.internals.CachedStateStore;
  * @param <V>
  */
 class TupleForwarder<K, V> {
-    private final boolean cached;
+    private final CachedStateStore cachedStateStore;
     private final ProcessorContext context;
     private final boolean sendOldValues;
 
@@ -37,18 +38,28 @@ class TupleForwarder<K, V> {
                    final ProcessorContext context,
                    final ForwardingCacheFlushListener flushListener,
                    final boolean sendOldValues) {
-        this.cached = store instanceof CachedStateStore;
+        this.cachedStateStore = cachedStateStore(store);
         this.context = context;
         this.sendOldValues = sendOldValues;
-        if (this.cached) {
-            ((CachedStateStore) store).setFlushListener(flushListener);
+        if (this.cachedStateStore != null) {
+            cachedStateStore.setFlushListener(flushListener);
         }
     }
 
+    private CachedStateStore cachedStateStore(final StateStore store) {
+        if (store instanceof CachedStateStore) {
+            return (CachedStateStore) store;
+        } else if (store instanceof WrappedStateStore
+                && ((WrappedStateStore) store).wrappedStore() instanceof CachedStateStore) {
+            return (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
+        }
+        return null;
+    }
+
     public void maybeForward(final K key,
                              final V newValue,
                              final V oldValue) {
-        if (!cached) {
+        if (cachedStateStore == null) {
             if (sendOldValues) {
                 context.forward(key, new Change<>(newValue, oldValue));
             } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index e147ea8..a89c741 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.util.List;
 import java.util.Objects;
 
-class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V>, CachedStateStore<K, V> {
+class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<Bytes, byte[]>, CachedStateStore<K, V> {
 
     private final KeyValueStore<Bytes, byte[]> underlying;
     private final Serde<K> keySerde;
@@ -126,16 +126,13 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public synchronized V get(final K key) {
+    public synchronized byte[] get(final Bytes key) {
         validateStoreOpen();
         Objects.requireNonNull(key);
-
-        final byte[] rawKey = serdes.rawKey(key);
-        return get(rawKey);
+        return getInternal(key);
     }
 
-    private V get(final byte[] rawKey) {
-        final Bytes key = Bytes.wrap(rawKey);
+    private byte[] getInternal(final Bytes key) {
         final LRUCacheEntry entry = cache.get(cacheName, key);
         if (entry == null) {
             final byte[] rawValue = underlying.get(key);
@@ -147,32 +144,30 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
             if (Thread.currentThread().equals(streamThread)) {
                 cache.put(cacheName, key, new LRUCacheEntry(rawValue));
             }
-            return serdes.valueFrom(rawValue);
+            return rawValue;
         }
 
         if (entry.value == null) {
             return null;
         }
 
-        return serdes.valueFrom(entry.value);
+        return entry.value;
     }
 
     @Override
-    public KeyValueIterator<K, V> range(final K from, final K to) {
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
         validateStoreOpen();
-        final Bytes origFrom = Bytes.wrap(serdes.rawKey(from));
-        final Bytes origTo = Bytes.wrap(serdes.rawKey(to));
-        final KeyValueIterator<Bytes, byte[]> storeIterator = underlying.range(origFrom, origTo);
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, origFrom, origTo);
-        return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
+        final KeyValueIterator<Bytes, byte[]> storeIterator = underlying.range(from, to);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, from, to);
+        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
     }
 
     @Override
-    public KeyValueIterator<K, V> all() {
+    public KeyValueIterator<Bytes, byte[]> all() {
         validateStoreOpen();
         final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(this.name(), underlying.all());
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(cacheName);
-        return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
+        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
     }
 
     @Override
@@ -182,47 +177,43 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public synchronized void put(final K key, final V value) {
+    public synchronized void put(final Bytes key, final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
-        put(serdes.rawKey(key), value);
+        putInternal(key, value);
     }
 
-    private synchronized void put(final byte[] rawKey, final V value) {
+    private synchronized void putInternal(final Bytes rawKey, final byte[] value) {
         Objects.requireNonNull(rawKey, "key cannot be null");
-        final byte[] rawValue = serdes.rawValue(value);
-        cache.put(cacheName, Bytes.wrap(rawKey), new LRUCacheEntry(rawValue, true, context.offset(),
+        cache.put(cacheName, rawKey, new LRUCacheEntry(value, true, context.offset(),
                   context.timestamp(), context.partition(), context.topic()));
     }
 
     @Override
-    public synchronized V putIfAbsent(final K key, final V value) {
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
-        final byte[] rawKey = serdes.rawKey(key);
-        final V v = get(rawKey);
+        final byte[] v = getInternal(key);
         if (v == null) {
-            put(rawKey, value);
+            putInternal(key, value);
         }
         return v;
     }
 
     @Override
-    public synchronized void putAll(final List<KeyValue<K, V>> entries) {
-        for (KeyValue<K, V> entry : entries) {
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        for (KeyValue<Bytes, byte[]> entry : entries) {
             put(entry.key, entry.value);
         }
     }
 
     @Override
-    public synchronized V delete(final K key) {
+    public synchronized byte[] delete(final Bytes key) {
         validateStoreOpen();
         Objects.requireNonNull(key);
-        final byte[] rawKey = serdes.rawKey(key);
-        final Bytes bytesKey = Bytes.wrap(rawKey);
-        final V v = get(rawKey);
-        cache.delete(cacheName, bytesKey);
-        underlying.delete(bytesKey);
+        final byte[] v = getInternal(key);
+        cache.delete(cacheName, key);
+        underlying.delete(key);
         return v;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
new file mode 100644
index 0000000..5ff8a26
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.List;
+
+/**
+ * Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
+ * inner KeyValueStore implementation does not need to provide its own metrics collecting functionality.
+ *
+ * @param <K>
+ * @param <V>
+ */
+class InnerMeteredKeyValueStore<K, IK, V, IV> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
+
+    private final KeyValueStore<IK, IV> inner;
+    private final String metricScope;
+    // convert types from outer store type to inner store type
+    private final TypeConverter<K, IK, V, IV> typeConverter;
+    protected final Time time;
+    private Sensor putTime;
+    private Sensor putIfAbsentTime;
+    private Sensor getTime;
+    private Sensor deleteTime;
+    private Sensor putAllTime;
+    private Sensor allTime;
+    private Sensor rangeTime;
+    private Sensor flushTime;
+    private StreamsMetrics metrics;
+    private ProcessorContext context;
+    private StateStore root;
+
+    /**
+     * For a period of time we will have two store hierarchies: one which is built by a
+     * {@link org.apache.kafka.streams.processor.StateStoreSupplier} where the outermost store will be of a user-defined
+     * type, e.g. &lt;String,Integer&gt;, and another where the outermost store will be of type &lt;Bytes,byte[]&gt;.
+     * This interface exists so that we don't need two complete implementations for collecting the metrics; rather,
+     * we just provide an instance of this to do the type conversions from the outer store types to the inner store types.
+     * @param <K>  key type of the outer store
+     * @param <IK> key type of the inner store
+     * @param <V>  value type of the outer store
+     * @param <IV> value type of the inner store
+     */
+    interface TypeConverter<K, IK, V, IV> {
+        IK innerKey(final K key);
+        IV innerValue(final V value);
+        List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
+        V outerValue(final IV value);
+        KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
+        K outerKey(final IK ik);
+    }
+
+    // always wrap the store with the metered store
+    InnerMeteredKeyValueStore(final KeyValueStore<IK, IV> inner,
+                                     final String metricScope,
+                                     final TypeConverter<K, IK, V, IV> typeConverter,
+                                     final Time time) {
+        super(inner);
+        this.inner = inner;
+        this.metricScope = metricScope;
+        this.typeConverter = typeConverter;
+        this.time = time != null ? time : Time.SYSTEM;
+    }
+
+    @Override
+    public void init(ProcessorContext context, StateStore root) {
+        final String name = name();
+        final String tagKey = "task-id";
+        final String tagValue = context.taskId().toString();
+        this.context = context;
+        this.root = root;
+        this.metrics = context.metrics();
+        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                  name,
+                                                                  "put",
+                                                                  Sensor.RecordingLevel.DEBUG,
+                                                                  tagKey, tagValue);
+        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                          name,
+                                                                          "put-if-absent",
+                                                                          Sensor.RecordingLevel.DEBUG,
+                                                                          tagKey,
+                                                                          tagValue);
+        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                  name,
+                                                                  "get",
+                                                                  Sensor.RecordingLevel.DEBUG,
+                                                                  tagKey,
+                                                                  tagValue);
+        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                     name,
+                                                                     "delete",
+                                                                     Sensor.RecordingLevel.DEBUG,
+                                                                     tagKey,
+                                                                     tagValue);
+        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                     name,
+                                                                     "put-all",
+                                                                     Sensor.RecordingLevel.DEBUG,
+                                                                     tagKey,
+                                                                     tagValue);
+        this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                  name,
+                                                                  "all",
+                                                                  Sensor.RecordingLevel.DEBUG,
+                                                                  tagKey,
+                                                                  tagValue);
+        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                    name,
+                                                                    "range",
+                                                                    Sensor.RecordingLevel.DEBUG,
+                                                                    tagKey,
+                                                                    tagValue);
+        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                    name,
+                                                                    "flush",
+                                                                    Sensor.RecordingLevel.DEBUG,
+                                                                    tagKey,
+                                                                    tagValue);
+        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope,
+                                                                              name,
+                                                                              "restore",
+                                                                              Sensor.RecordingLevel.DEBUG,
+                                                                              tagKey,
+                                                                              tagValue);
+
+        // register and possibly restore the state from the logs
+        if (restoreTime.shouldRecord()) {
+            measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    inner.init(InnerMeteredKeyValueStore.this.context, InnerMeteredKeyValueStore.this.root);
+                    return null;
+                }
+            }, restoreTime);
+        } else {
+            inner.init(InnerMeteredKeyValueStore.this.context, InnerMeteredKeyValueStore.this.root);
+        }
+
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return inner.approximateNumEntries();
+    }
+
+    private interface Action<V> {
+        V execute();
+    }
+
+    @Override
+    public V get(final K key) {
+        if (getTime.shouldRecord()) {
+            return measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+                }
+            }, getTime);
+        } else {
+            return typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+        }
+    }
+
+    @Override
+    public void put(final K key, final V value) {
+        if (putTime.shouldRecord()) {
+            measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+                    return null;
+                }
+            }, putTime);
+        } else {
+            inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value));
+        }
+    }
+
+    @Override
+    public V putIfAbsent(final K key, final V value) {
+        if (putIfAbsentTime.shouldRecord()) {
+            return measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    return typeConverter.outerValue(inner.putIfAbsent(typeConverter.innerKey(key), typeConverter.innerValue(value)));
+                }
+            }, putIfAbsentTime);
+        } else {
+            return typeConverter.outerValue(inner.putIfAbsent(typeConverter.innerKey(key), typeConverter.innerValue(value)));
+        }
+
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<K, V>> entries) {
+        if (putAllTime.shouldRecord()) {
+            measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    inner.putAll(typeConverter.innerEntries(entries));
+                    return null;
+                }
+            }, putAllTime);
+        } else {
+            inner.putAll(typeConverter.innerEntries(entries));
+        }
+    }
+
+    @Override
+    public V delete(final K key) {
+        if (deleteTime.shouldRecord()) {
+            return measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+                }
+            }, deleteTime);
+        } else {
+            return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+        }
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        return new MeteredKeyValueIterator(this.inner.range(typeConverter.innerKey(from), typeConverter.innerKey(to)), this.rangeTime);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return new MeteredKeyValueIterator(this.inner.all(), this.allTime);
+    }
+
+    @Override
+    public void flush() {
+        if (flushTime.shouldRecord()) {
+            measureLatency(new Action<V>() {
+                @Override
+                public V execute() {
+                    inner.flush();
+                    return null;
+                }
+            }, flushTime);
+        } else {
+            inner.flush();
+        }
+
+    }
+
+    private V measureLatency(final Action<V> action, final Sensor sensor) {
+        final long startNs = time.nanoseconds();
+        try {
+            return action.execute();
+        } finally {
+            metrics.recordLatency(sensor, startNs, time.nanoseconds());
+        }
+    }
+
+    private class MeteredKeyValueIterator implements KeyValueIterator<K, V> {
+
+        private final KeyValueIterator<IK, IV> iter;
+        private final Sensor sensor;
+        private final long startNs;
+
+        MeteredKeyValueIterator(KeyValueIterator<IK, IV> iter, Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            return typeConverter.outerKeyValue(iter.next());
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+            }
+        }
+
+        @Override
+        public K peekNextKey() {
+            return typeConverter.outerKey(iter.peekNextKey());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
new file mode 100644
index 0000000..8f9d152
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+/**
+ * Merges two iterators. Assumes each of them is sorted by key.
+ *
+ */
+class MergedSortedCacheKeyValueBytesStoreIterator extends AbstractMergedSortedCacheStoreIterator<Bytes, Bytes, byte[], byte[]> {
+
+
+    MergedSortedCacheKeyValueBytesStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
+                                                final KeyValueIterator<Bytes, byte[]> storeIterator) {
+        super(cacheIterator, storeIterator);
+    }
+
+    @Override
+    public KeyValue<Bytes, byte[]> deserializeStorePair(final KeyValue<Bytes, byte[]> pair) {
+        return pair;
+    }
+
+    @Override
+    Bytes deserializeCacheKey(final Bytes cacheKey) {
+        return cacheKey;
+    }
+
+    @Override
+    byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
+        return cacheEntry.value;
+    }
+
+    @Override
+    public Bytes deserializeStoreKey(final Bytes key) {
+        return key;
+    }
+
+    @Override
+    public int compare(final Bytes cacheKey, final Bytes storeKey) {
+        return cacheKey.compareTo(storeKey);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
deleted file mode 100644
index f7bb2ee..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-
-/**
- * Merges two iterators. Assumes each of them is sorted by key
- *
- * @param <K>
- * @param <V>
- */
-class MergedSortedCacheKeyValueStoreIterator<K, V> extends AbstractMergedSortedCacheStoreIterator<K, Bytes, V, byte[]> {
-
-    private final StateSerdes<K, V> serdes;
-
-    MergedSortedCacheKeyValueStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
-                                           final KeyValueIterator<Bytes, byte[]> storeIterator,
-                                           final StateSerdes<K, V> serdes) {
-        super(cacheIterator, storeIterator);
-        this.serdes = serdes;
-    }
-
-    @Override
-    public KeyValue<K, V> deserializeStorePair(final KeyValue<Bytes, byte[]> pair) {
-        return KeyValue.pair(serdes.keyFrom(pair.key.get()), serdes.valueFrom(pair.value));
-    }
-
-    @Override
-    K deserializeCacheKey(final Bytes cacheKey) {
-        return serdes.keyFrom(cacheKey.get());
-    }
-
-    @Override
-    V deserializeCacheValue(final LRUCacheEntry cacheEntry) {
-        return serdes.valueFrom(cacheEntry.value);
-    }
-
-    @Override
-    public K deserializeStoreKey(final Bytes key) {
-        return serdes.keyFrom(key.get());
-    }
-
-    @Override
-    public int compare(final Bytes cacheKey, final Bytes storeKey) {
-        return cacheKey.compareTo(storeKey);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
new file mode 100644
index 0000000..7ac8bab
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
+ * inner KeyValueStore implementation does not need to provide its own metrics collecting functionality.
+ * The inner {@link KeyValueStore} of this class is of type &lt;Bytes,byte[]&gt;, hence we use {@link Serde}s
+ * to convert from &lt;K,V&gt; to &lt;Bytes,byte[]&gt;
+ * @param <K>
+ * @param <V>
+ */
+public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
+
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private StateSerdes<K, V> serdes;
+    private final InnerMeteredKeyValueStore<K, Bytes, V, byte[]> innerMetered;
+
+    // always wrap the store with the metered store
+    public MeteredKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner,
+                                     final String metricScope,
+                                     final Time time,
+                                     final Serde<K> keySerde,
+                                     final Serde<V> valueSerde) {
+        super(inner);
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        innerMetered = new InnerMeteredKeyValueStore<>(inner, metricScope, new InnerMeteredKeyValueStore.TypeConverter<K, Bytes, V, byte[]>() {
+            @Override
+            public Bytes innerKey(final K key) {
+                return Bytes.wrap(serdes.rawKey(key));
+            }
+
+            @Override
+            public byte[] innerValue(final V value) {
+                if (value == null) {
+                    return null;
+                }
+                return serdes.rawValue(value);
+            }
+
+            @Override
+            public List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, V>> from) {
+                final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
+                for (KeyValue<K, V> entry : from) {
+                    byteEntries.add(KeyValue.pair(innerKey(entry.key), serdes.rawValue(entry.value)));
+
+                }
+                return byteEntries;
+            }
+
+            @Override
+            public V outerValue(final byte[] value) {
+                return serdes.valueFrom(value);
+            }
+
+            @Override
+            public KeyValue<K, V> outerKeyValue(final KeyValue<Bytes, byte[]> from) {
+                return KeyValue.pair(serdes.keyFrom(from.key.get()), serdes.valueFrom(from.value));
+            }
+
+            @Override
+            public K outerKey(final Bytes key) {
+                return serdes.keyFrom(key.get());
+            }
+        }, time);
+    }
+
+    @Override
+    public void init(ProcessorContext context, StateStore root) {
+        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        innerMetered.init(context, root);
+
+
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return innerMetered.approximateNumEntries();
+    }
+
+    @Override
+    public V get(final K key) {
+        return innerMetered.get(key);
+    }
+
+    @Override
+    public void put(final K key, final V value) {
+        innerMetered.put(key, value);
+    }
+
+    @Override
+    public V putIfAbsent(final K key, final V value) {
+        return innerMetered.putIfAbsent(key, value);
+    }
+
+
+    @Override
+    public void putAll(final List<KeyValue<K, V>> entries) {
+        innerMetered.putAll(entries);
+    }
+
+    @Override
+    public V delete(final K key) {
+        return innerMetered.delete(key);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        return innerMetered.range(from, to);
+    }
+
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return innerMetered.all();
+    }
+
+    @Override
+    public void flush() {
+        innerMetered.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 95cbedb..7d43165 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -16,10 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -36,230 +34,96 @@ import java.util.List;
  */
 public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
 
-    private final KeyValueStore<K, V> inner;
-    private final String metricScope;
     protected final Time time;
-    private Sensor putTime;
-    private Sensor putIfAbsentTime;
-    private Sensor getTime;
-    private Sensor deleteTime;
-    private Sensor putAllTime;
-    private Sensor allTime;
-    private Sensor rangeTime;
-    private Sensor flushTime;
-    private StreamsMetrics metrics;
-    private ProcessorContext context;
-    private StateStore root;
+    private final InnerMeteredKeyValueStore<K, K, V, V> innerMetered;
 
     // always wrap the store with the metered store
     public MeteredKeyValueStore(final KeyValueStore<K, V> inner,
                                 final String metricScope,
                                 final Time time) {
         super(inner);
-        this.inner = inner;
-        this.metricScope = metricScope;
         this.time = time != null ? time : Time.SYSTEM;
-    }
+        this.innerMetered = new InnerMeteredKeyValueStore<>(inner, metricScope, new InnerMeteredKeyValueStore.TypeConverter<K, K, V, V>() {
+            @Override
+            public K innerKey(final K key) {
+                return key;
+            }
 
-    @Override
-    public void init(ProcessorContext context, StateStore root) {
-        final String tagKey = "task-id";
-        final String tagValue = context.taskId().toString();
+            @Override
+            public V innerValue(final V value) {
+                return value;
+            }
 
-        this.context = context;
-        this.root = root;
+            @Override
+            public List<KeyValue<K, V>> innerEntries(final List<KeyValue<K, V>> from) {
+                return from;
+            }
 
-        this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
-                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put-if-absent",
-                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "get",
-                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "delete",
-                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put-all",
-                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "all",
-                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "range",
-                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
-                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
-                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            @Override
+            public V outerValue(final V value) {
+                return value;
+            }
 
-        // register and possibly restore the state from the logs
-        if (restoreTime.shouldRecord()) {
-            measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    inner.init(MeteredKeyValueStore.this.context, MeteredKeyValueStore.this.root);
-                    return null;
-                }
-            }, restoreTime);
-        } else {
-            inner.init(MeteredKeyValueStore.this.context, MeteredKeyValueStore.this.root);
-        }
+            @Override
+            public KeyValue<K, V> outerKeyValue(final KeyValue<K, V> from) {
+                return from;
+            }
 
+            @Override
+            public K outerKey(final K key) {
+                return key;
+            }
+        }, time);
     }
 
     @Override
-    public long approximateNumEntries() {
-        return inner.approximateNumEntries();
+    public void init(ProcessorContext context, StateStore root) {
+        innerMetered.init(context, root);
     }
 
-    interface Action<V> {
-        V execute();
+    @Override
+    public long approximateNumEntries() {
+        return innerMetered.approximateNumEntries();
     }
 
     @Override
     public V get(final K key) {
-        if (getTime.shouldRecord()) {
-            return measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    return inner.get(key);
-                }
-            }, getTime);
-        } else {
-            return inner.get(key);
-        }
+        return innerMetered.get(key);
     }
 
     @Override
     public void put(final K key, final V value) {
-        if (putTime.shouldRecord()) {
-            measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    inner.put(key, value);
-                    return null;
-                }
-            }, putTime);
-        } else {
-            inner.put(key, value);
-        }
+        innerMetered.put(key, value);
     }
 
     @Override
     public V putIfAbsent(final K key, final V value) {
-        if (putIfAbsentTime.shouldRecord()) {
-            return measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    return inner.putIfAbsent(key, value);
-                }
-            }, putIfAbsentTime);
-        } else {
-            return inner.putIfAbsent(key, value);
-        }
-
+        return innerMetered.putIfAbsent(key, value);
     }
 
     @Override
     public void putAll(final List<KeyValue<K, V>> entries) {
-        if (putAllTime.shouldRecord()) {
-            measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    inner.putAll(entries);
-                    return null;
-                }
-            }, putAllTime);
-        } else {
-            inner.putAll(entries);
-        }
+        innerMetered.putAll(entries);
     }
 
     @Override
     public V delete(final K key) {
-        if (deleteTime.shouldRecord()) {
-            return measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    return inner.delete(key);
-                }
-            }, deleteTime);
-        } else {
-            return inner.delete(key);
-        }
+        return innerMetered.delete(key);
     }
 
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
-        return new MeteredKeyValueIterator<>(this.inner.range(from, to), this.rangeTime);
+        return innerMetered.range(from, to);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueIterator<>(this.inner.all(), this.allTime);
+        return innerMetered.all();
     }
 
     @Override
     public void flush() {
-        if (flushTime.shouldRecord()) {
-            measureLatency(new Action<V>() {
-                @Override
-                public V execute() {
-                    inner.flush();
-                    return null;
-                }
-            }, flushTime);
-        } else {
-            inner.flush();
-        }
-
-    }
-
-    private V measureLatency(final Action<V> action, final Sensor sensor) {
-        final long startNs = time.nanoseconds();
-        try {
-            return action.execute();
-        } finally {
-            metrics.recordLatency(sensor, startNs, time.nanoseconds());
-        }
+        innerMetered.flush();
     }
 
-    private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
-
-        private final KeyValueIterator<K1, V1> iter;
-        private final Sensor sensor;
-        private final long startNs;
-
-        MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
-            this.iter = iter;
-            this.sensor = sensor;
-            this.startNs = time.nanoseconds();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iter.hasNext();
-        }
-
-        @Override
-        public KeyValue<K1, V1> next() {
-            return iter.next();
-        }
-
-        @Override
-        public void remove() {
-            iter.remove();
-        }
-
-        @Override
-        public void close() {
-            try {
-                iter.close();
-            } finally {
-                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
-            }
-        }
-
-        @Override
-        public K1 peekNextKey() {
-            return iter.peekNextKey();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 82cf96d..60a506b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -47,38 +47,33 @@ public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K,
     }
 
     public KeyValueStore get() {
-        if (!cached && !logged) {
-            return new MeteredKeyValueStore<>(
-                    new RocksDBStore<>(name, keySerde, valueSerde), METRICS_SCOPE, time);
-        }
-
-        // when cached, logged, or both we use a bytes store as the inner most store
         final RocksDBStore<Bytes, byte[]> rocks = new RocksDBStore<>(name,
                                                                      Serdes.Bytes(),
                                                                      Serdes.ByteArray());
 
+        if (!cached && !logged) {
+            return new MeteredKeyValueBytesStore<>(
+                    rocks, METRICS_SCOPE, time, keySerde, valueSerde);
+        }
+
+
         if (cached && logged) {
-            return new CachingKeyValueStore<>(
-                    new MeteredKeyValueStore<>(
-                            new ChangeLoggingKeyValueBytesStore(rocks),
-                            METRICS_SCOPE,
-                            time),
-                    keySerde,
-                    valueSerde);
+            final KeyValueStore<Bytes, byte[]> caching = new CachingKeyValueStore<>(new ChangeLoggingKeyValueBytesStore(rocks), keySerde, valueSerde);
+            return new MeteredKeyValueBytesStore<>(caching, METRICS_SCOPE, time, keySerde, valueSerde);
         }
 
         if (cached) {
-            return new CachingKeyValueStore<>(
-                    new MeteredKeyValueStore<>(rocks, METRICS_SCOPE, time),
-                    keySerde,
-                    valueSerde);
+            final KeyValueStore<Bytes, byte[]> caching = new CachingKeyValueStore<>(rocks, keySerde, valueSerde);
+            return new MeteredKeyValueBytesStore<>(caching, METRICS_SCOPE, time, keySerde, valueSerde);
 
         } else {
             // logged
-            return new MeteredKeyValueStore<>(
-                    new ChangeLoggingKeyValueStore<>(rocks, keySerde, valueSerde),
+            return new MeteredKeyValueBytesStore<>(
+                    new ChangeLoggingKeyValueBytesStore(rocks),
                     METRICS_SCOPE,
-                    time);
+                    time,
+                    keySerde,
+                    valueSerde);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index b0721b1..66d2899 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -23,15 +23,21 @@ import org.apache.kafka.streams.processor.StateStore;
 /**
  * A storage engine wrapper for utilities like logging, caching, and metering.
  */
-interface WrappedStateStore extends StateStore {
+public interface WrappedStateStore extends StateStore {
 
     /**
-     * Return the inner storage engine
+     * Return the innermost storage engine
      *
      * @return wrapped inner storage engine
      */
     StateStore inner();
 
+    /**
+     * Return the state store this store directly wraps
+     * @return the state store this store directly wraps
+     */
+    StateStore wrappedStore();
+
     abstract class AbstractStateStore implements WrappedStateStore {
         final StateStore innerState;
 
@@ -82,5 +88,10 @@ interface WrappedStateStore extends StateStore {
         public void close() {
             innerState.close();
         }
+
+        @Override
+        public StateStore wrappedStore() {
+            return innerState;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 9110144..0831471 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -44,10 +43,12 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.assertTrue;
 
@@ -88,39 +89,43 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
                                                              final boolean useContextSerdes) {
         final String storeName = "cache-store";
 
-        final Stores.PersistentKeyValueFactory<?, ?> factory = Stores
+
+        final Stores.PersistentKeyValueFactory<K, V> factory = Stores
                 .create(storeName)
-                .withKeys(Serdes.Bytes())
-                .withValues(Serdes.ByteArray())
-                .persistent();
+                .withKeys(Serdes.serdeFrom(keyClass))
+                .withValues(Serdes.serdeFrom(valueClass))
+                .persistent()
+                .enableCaching();
 
 
-        final KeyValueStore<Bytes, byte[]> underlyingStore = (KeyValueStore<Bytes, byte[]>) factory.build().get();
+        final KeyValueStore<K, V> store = (KeyValueStore<K, V>) factory.build().get();
         final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>();
-        final CachingKeyValueStore<K, V> store;
-        if (useContextSerdes) {
-            store = new CachingKeyValueStore<>(underlyingStore,
-                (Serde<K>) context.keySerde(), (Serde<V>) context.valueSerde());
-        } else {
-            store = new CachingKeyValueStore<>(underlyingStore,
-                Serdes.serdeFrom(keyClass), Serdes.serdeFrom(valueClass));
-        }
-        store.setFlushListener(cacheFlushListener);
+
+        final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
+        inner.setFlushListener(cacheFlushListener);
         store.init(context, store);
         return store;
     }
 
     @Test
     public void shouldPutGetToFromCache() throws Exception {
-        store.put("key", "value");
-        store.put("key2", "value2");
-        assertEquals("value", store.get("key"));
-        assertEquals("value2", store.get("key2"));
+        store.put(bytesKey("key"), bytesValue("value"));
+        store.put(bytesKey("key2"), bytesValue("value2"));
+        assertThat(store.get(bytesKey("key")), equalTo(bytesValue("value")));
+        assertThat(store.get(bytesKey("key2")), equalTo(bytesValue("value2")));
         // nothing evicted so underlying store should be empty
         assertEquals(2, cache.size());
         assertEquals(0, underlyingStore.approximateNumEntries());
     }
 
+    private byte[] bytesValue(final String value) {
+        return value.getBytes();
+    }
+
+    private Bytes bytesKey(final String key) {
+        return Bytes.wrap(key.getBytes());
+    }
+
     @Test
     public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
         int added = addItemsToCache();
@@ -138,7 +143,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @Test
     public void shouldForwardDirtyItemsWhenFlushCalled() throws Exception {
-        store.put("1", "a");
+        store.put(bytesKey("1"), bytesValue("a"));
         store.flush();
         assertEquals("a", cacheFlushListener.forwarded.get("1").newValue);
         assertNull(cacheFlushListener.forwarded.get("1").oldValue);
@@ -146,9 +151,9 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @Test
     public void shouldForwardOldValuesWhenEnabled() throws Exception {
-        store.put("1", "a");
+        store.put(bytesKey("1"), bytesValue("a"));
         store.flush();
-        store.put("1", "b");
+        store.put(bytesKey("1"), bytesValue("b"));
         store.flush();
         assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
         assertEquals("a", cacheFlushListener.forwarded.get("1").oldValue);
@@ -157,8 +162,8 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test
     public void shouldIterateAllStoredItems() throws Exception {
         int items = addItemsToCache();
-        final KeyValueIterator<String, String> all = store.all();
-        final List<String> results = new ArrayList<>();
+        final KeyValueIterator<Bytes, byte[]> all = store.all();
+        final List<Bytes> results = new ArrayList<>();
         while (all.hasNext()) {
             results.add(all.next().key);
         }
@@ -168,8 +173,8 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test
     public void shouldIterateOverRange() throws Exception {
         int items = addItemsToCache();
-        final KeyValueIterator<String, String> range = store.range(String.valueOf(0), String.valueOf(items));
-        final List<String> results = new ArrayList<>();
+        final KeyValueIterator<Bytes, byte[]> range = store.range(bytesKey(String.valueOf(0)), bytesKey(String.valueOf(items)));
+        final List<Bytes> results = new ArrayList<>();
         while (range.hasNext()) {
             results.add(range.next().key);
         }
@@ -178,26 +183,26 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @Test
     public void shouldDeleteItemsFromCache() throws Exception {
-        store.put("a", "a");
-        store.delete("a");
-        assertNull(store.get("a"));
-        assertFalse(store.range("a", "b").hasNext());
+        store.put(bytesKey("a"), bytesValue("a"));
+        store.delete(bytesKey("a"));
+        assertNull(store.get(bytesKey("a")));
+        assertFalse(store.range(bytesKey("a"), bytesKey("b")).hasNext());
         assertFalse(store.all().hasNext());
     }
 
     @Test
     public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() throws Exception {
-        store.put("a", "a");
+        store.put(bytesKey("a"), bytesValue("a"));
         store.flush();
-        store.delete("a");
-        assertNull(store.get("a"));
-        assertFalse(store.range("a", "b").hasNext());
+        store.delete(bytesKey("a"));
+        assertNull(store.get(bytesKey("a")));
+        assertFalse(store.range(bytesKey("a"), bytesKey("b")).hasNext());
         assertFalse(store.all().hasNext());
     }
 
     @Test
     public void shouldClearNamespaceCacheOnClose() throws Exception {
-        store.put("a", "a");
+        store.put(bytesKey("a"), bytesValue("a"));
         assertEquals(1, cache.size());
         store.close();
         assertEquals(0, cache.size());
@@ -206,19 +211,19 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToGetFromClosedCachingStore() throws Exception {
         store.close();
-        store.get("a");
+        store.get(bytesKey("a"));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception {
         store.close();
-        store.put("a", "a");
+        store.put(bytesKey("a"), bytesValue("a"));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() throws Exception {
         store.close();
-        store.range("a", "b");
+        store.range(bytesKey("a"), bytesKey("b"));
     }
 
     @Test(expected = InvalidStateStoreException.class)
@@ -236,29 +241,29 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToDoPutAllClosedCachingStore() throws Exception {
         store.close();
-        store.putAll(Collections.singletonList(KeyValue.pair("a", "a")));
+        store.putAll(Collections.singletonList(KeyValue.pair(bytesKey("a"), bytesValue("a"))));
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToDoPutIfAbsentClosedCachingStore() throws Exception {
         store.close();
-        store.putIfAbsent("b", "c");
+        store.putIfAbsent(bytesKey("b"), bytesValue("c"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnPutWithNullKey() {
-        store.put(null, "c");
+        store.put(null, bytesValue("c"));
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnPutIfAbsentWithNullKey() {
-        store.putIfAbsent(null, "c");
+        store.putIfAbsent(null, bytesValue("c"));
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnPutAllWithNullKey() {
-        List<KeyValue<String, String>> entries = new ArrayList<>();
-        entries.add(new KeyValue<String, String>(null, "a"));
+        List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<Bytes, byte[]>(null, bytesValue("a")));
         try {
             store.putAll(entries);
             fail("Should have thrown NullPointerException while putAll null key");
@@ -267,21 +272,21 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @Test
     public void shouldPutIfAbsent() {
-        store.putIfAbsent("b", "2");
-        assertTrue(store.get("b").equals("2"));
+        store.putIfAbsent(bytesKey("b"), bytesValue("2"));
+        assertThat(store.get(bytesKey("b")), equalTo(bytesValue("2")));
 
-        store.putIfAbsent("b", "3");
-        assertTrue(store.get("b").equals("2"));
+        store.putIfAbsent(bytesKey("b"), bytesValue("3"));
+        assertThat(store.get(bytesKey("b")), equalTo(bytesValue("2")));
     }
 
     @Test
     public void shouldPutAll() {
-        List<KeyValue<String, String>> entries = new ArrayList<>();
-        entries.add(new KeyValue<>("a", "1"));
-        entries.add(new KeyValue<>("b", "2"));
+        List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(bytesKey("a"), bytesValue("1")));
+        entries.add(new KeyValue<>(bytesKey("b"), bytesValue("2")));
         store.putAll(entries);
-        assertEquals(store.get("a"), "1");
-        assertEquals(store.get("b"), "2");
+        assertThat(store.get(bytesKey("a")), equalTo(bytesValue("1")));
+        assertThat(store.get(bytesKey("b")), equalTo(bytesValue("2")));
     }
 
     @Test
@@ -292,7 +297,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToDeleteFromClosedCachingStore() throws Exception {
         store.close();
-        store.delete("key");
+        store.delete(bytesKey("key"));
     }
 
     private int addItemsToCache() throws IOException {
@@ -300,7 +305,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         int i = 0;
         while (cachedSize < maxCacheSizeBytes) {
             final String kv = String.valueOf(i++);
-            store.put(kv, kv);
+            store.put(bytesKey(kv), bytesValue(kv));
             cachedSize += memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), topic);
         }
         return i;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
new file mode 100644
index 0000000..3e579bc
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+
+public class MergedSortedCacheKeyValueBytesStoreIteratorTest {
+
+    private final String namespace = "0.0-one";
+    private final StateSerdes<byte[], byte[]> serdes =  new StateSerdes<>("dummy", Serdes.ByteArray(), Serdes.ByteArray());
+    private KeyValueStore<Bytes, byte[]> store;
+    private ThreadCache cache;
+
+    @Before
+    public void setUp() throws Exception {
+        store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(), Serdes.ByteArray());
+        cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
+    }
+
+    @Test
+    public void shouldIterateOverRange() throws Exception {
+        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
+        for (int i = 0; i < bytes.length; i += 2) {
+            store.put(Bytes.wrap(bytes[i]), bytes[i]);
+            cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
+        }
+
+        final Bytes from = Bytes.wrap(new byte[]{2});
+        final Bytes to = Bytes.wrap(new byte[]{9});
+        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.range(from, to));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to);
+
+        final MergedSortedCacheKeyValueBytesStoreIterator iterator = new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
+        byte[][] values = new byte[8][];
+        int index = 0;
+        int bytesIndex = 2;
+        while (iterator.hasNext()) {
+            final byte[] value = iterator.next().value;
+            values[index++] = value;
+            assertArrayEquals(bytes[bytesIndex++], value);
+        }
+        iterator.close();
+    }
+
+
+    @Test
+    public void shouldSkipLargerDeletedCacheValue() throws Exception {
+        final byte[][] bytes = {{0}, {1}};
+        store.put(Bytes.wrap(bytes[0]), bytes[0]);
+        cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
+        final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator();
+        assertArrayEquals(bytes[0], iterator.next().key.get());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldSkipSmallerDeletedCachedValue() throws Exception {
+        final byte[][] bytes = {{0}, {1}};
+        cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
+        store.put(Bytes.wrap(bytes[1]), bytes[1]);
+        final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator();
+        assertArrayEquals(bytes[1], iterator.next().key.get());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldIgnoreIfDeletedInCacheButExistsInStore() throws Exception {
+        final byte[][] bytes = {{0}};
+        cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
+        store.put(Bytes.wrap(bytes[0]), bytes[0]);
+        final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator();
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldNotHaveNextIfAllCachedItemsDeleted() throws Exception {
+        final byte[][] bytes = {{0}, {1}, {2}};
+        for (byte[] aByte : bytes) {
+            Bytes aBytes = Bytes.wrap(aByte);
+            store.put(aBytes, aByte);
+            cache.put(namespace, aBytes, new LRUCacheEntry(null));
+        }
+        assertFalse(createIterator().hasNext());
+    }
+
+    @Test
+    public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() throws Exception {
+        final byte[][] bytes = {{0}, {1}, {2}};
+        for (byte[] aByte : bytes) {
+            cache.put(namespace, Bytes.wrap(aByte), new LRUCacheEntry(null));
+        }
+        assertFalse(createIterator().hasNext());
+    }
+
+    @Test
+    public void shouldSkipAllDeletedFromCache() throws Exception {
+        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
+        for (byte[] aByte : bytes) {
+            Bytes aBytes = Bytes.wrap(aByte);
+            store.put(aBytes, aByte);
+            cache.put(namespace, aBytes, new LRUCacheEntry(aByte));
+        }
+        cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[2]), new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[3]), new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[8]), new LRUCacheEntry(null));
+        cache.put(namespace, Bytes.wrap(bytes[11]), new LRUCacheEntry(null));
+
+        final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator();
+        assertArrayEquals(bytes[0], iterator.next().key.get());
+        assertArrayEquals(bytes[4], iterator.next().key.get());
+        assertArrayEquals(bytes[5], iterator.next().key.get());
+        assertArrayEquals(bytes[6], iterator.next().key.get());
+        assertArrayEquals(bytes[7], iterator.next().key.get());
+        assertArrayEquals(bytes[9], iterator.next().key.get());
+        assertArrayEquals(bytes[10], iterator.next().key.get());
+        assertFalse(iterator.hasNext());
+
+    }
+
+    @Test
+    public void shouldPeekNextKey() throws Exception {
+        final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray());
+        final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
+        byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
+        for (int i = 0; i < bytes.length - 1; i += 2) {
+            kv.put(Bytes.wrap(bytes[i]), bytes[i]);
+            cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
+        }
+
+        final Bytes from = Bytes.wrap(new byte[]{2});
+        final Bytes to = Bytes.wrap(new byte[]{9});
+        final KeyValueIterator<Bytes, byte[]> storeIterator = kv.range(from, to);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to);
+
+        final MergedSortedCacheKeyValueBytesStoreIterator iterator =
+                new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator,
+                                                                storeIterator
+                );
+        final byte[][] values = new byte[8][];
+        int index = 0;
+        int bytesIndex = 2;
+        while (iterator.hasNext()) {
+            final byte[] keys = iterator.peekNextKey().get();
+            values[index++] = keys;
+            assertArrayEquals(bytes[bytesIndex++], keys);
+            iterator.next();
+        }
+        iterator.close();
+    }
+
+    private MergedSortedCacheKeyValueBytesStoreIterator createIterator() {
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(namespace);
+        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.all());
+        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e19c37e5/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
deleted file mode 100644
index 5181c8d..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
-
-public class MergedSortedCacheKeyValueStoreIteratorTest {
-
-    private final String namespace = "0.0-one";
-    private final StateSerdes<byte[], byte[]> serdes =  new StateSerdes<>("dummy", Serdes.ByteArray(), Serdes.ByteArray());
-    private KeyValueStore<Bytes, byte[]> store;
-    private ThreadCache cache;
-
-    @Before
-    public void setUp() throws Exception {
-        store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(), Serdes.ByteArray());
-        cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
-    }
-
-    @Test
-    public void shouldIterateOverRange() throws Exception {
-        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
-        for (int i = 0; i < bytes.length; i += 2) {
-            store.put(Bytes.wrap(bytes[i]), bytes[i]);
-            cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
-        }
-
-        final Bytes from = Bytes.wrap(new byte[]{2});
-        final Bytes to = Bytes.wrap(new byte[]{9});
-        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.range(from, to));
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to);
-
-        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
-        byte[][] values = new byte[8][];
-        int index = 0;
-        int bytesIndex = 2;
-        while (iterator.hasNext()) {
-            final byte[] value = iterator.next().value;
-            values[index++] = value;
-            assertArrayEquals(bytes[bytesIndex++], value);
-        }
-        iterator.close();
-    }
-
-
-    @Test
-    public void shouldSkipLargerDeletedCacheValue() throws Exception {
-        final byte[][] bytes = {{0}, {1}};
-        store.put(Bytes.wrap(bytes[0]), bytes[0]);
-        cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
-        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
-        assertArrayEquals(bytes[0], iterator.next().key);
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
-    public void shouldSkipSmallerDeletedCachedValue() throws Exception {
-        final byte[][] bytes = {{0}, {1}};
-        cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
-        store.put(Bytes.wrap(bytes[1]), bytes[1]);
-        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
-        assertArrayEquals(bytes[1], iterator.next().key);
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
-    public void shouldIgnoreIfDeletedInCacheButExistsInStore() throws Exception {
-        final byte[][] bytes = {{0}};
-        cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
-        store.put(Bytes.wrap(bytes[0]), bytes[0]);
-        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
-    public void shouldNotHaveNextIfAllCachedItemsDeleted() throws Exception {
-        final byte[][] bytes = {{0}, {1}, {2}};
-        for (byte[] aByte : bytes) {
-            Bytes aBytes = Bytes.wrap(aByte);
-            store.put(aBytes, aByte);
-            cache.put(namespace, aBytes, new LRUCacheEntry(null));
-        }
-        assertFalse(createIterator().hasNext());
-    }
-
-    @Test
-    public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() throws Exception {
-        final byte[][] bytes = {{0}, {1}, {2}};
-        for (byte[] aByte : bytes) {
-            cache.put(namespace, Bytes.wrap(aByte), new LRUCacheEntry(null));
-        }
-        assertFalse(createIterator().hasNext());
-    }
-
-    @Test
-    public void shouldSkipAllDeletedFromCache() throws Exception {
-        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
-        for (byte[] aByte : bytes) {
-            Bytes aBytes = Bytes.wrap(aByte);
-            store.put(aBytes, aByte);
-            cache.put(namespace, aBytes, new LRUCacheEntry(aByte));
-        }
-        cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
-        cache.put(namespace, Bytes.wrap(bytes[2]), new LRUCacheEntry(null));
-        cache.put(namespace, Bytes.wrap(bytes[3]), new LRUCacheEntry(null));
-        cache.put(namespace, Bytes.wrap(bytes[8]), new LRUCacheEntry(null));
-        cache.put(namespace, Bytes.wrap(bytes[11]), new LRUCacheEntry(null));
-
-        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
-        assertArrayEquals(bytes[0], iterator.next().key);
-        assertArrayEquals(bytes[4], iterator.next().key);
-        assertArrayEquals(bytes[5], iterator.next().key);
-        assertArrayEquals(bytes[6], iterator.next().key);
-        assertArrayEquals(bytes[7], iterator.next().key);
-        assertArrayEquals(bytes[9], iterator.next().key);
-        assertArrayEquals(bytes[10], iterator.next().key);
-        assertFalse(iterator.hasNext());
-
-    }
-
-    @Test
-    public void shouldPeekNextKey() throws Exception {
-        final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray());
-        final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
-        byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
-        for (int i = 0; i < bytes.length - 1; i += 2) {
-            kv.put(Bytes.wrap(bytes[i]), bytes[i]);
-            cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
-        }
-
-        final Bytes from = Bytes.wrap(new byte[]{2});
-        final Bytes to = Bytes.wrap(new byte[]{9});
-        final KeyValueIterator<Bytes, byte[]> storeIterator = kv.range(from, to);
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to);
-
-        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator =
-                new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator,
-                                                             storeIterator,
-                                                             serdes);
-        final byte[][] values = new byte[8][];
-        int index = 0;
-        int bytesIndex = 2;
-        while (iterator.hasNext()) {
-            final byte[] keys = iterator.peekNextKey();
-            values[index++] = keys;
-            assertArrayEquals(bytes[bytesIndex++], keys);
-            iterator.next();
-        }
-        iterator.close();
-    }
-
-    private MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> createIterator() {
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(namespace);
-        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.all());
-        return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
-    }
-}
\ No newline at end of file


Mime
View raw message