kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [2/5] kafka git commit: KAFKA-3452: Support session windows
Date: Fri, 06 Jan 2017 18:12:35 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
new file mode 100644
index 0000000..c107c3e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Merger;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("unchecked")
+public class KStreamSessionWindowAggregateProcessorTest {
+
+    private static final long GAP_MS = 5 * 60 * 1000L;
+    private static final String STORE_NAME = "session-store";
+
+    private final Initializer<Long> initializer = new Initializer<Long>() {
+        @Override
+        public Long apply() {
+            return 0L;
+        }
+    };
+    private final Aggregator<String, String, Long> aggregator = new Aggregator<String, String, Long>() {
+        @Override
+        public Long apply(final String aggKey, final String value, final Long aggregate) {
+            return aggregate + 1;
+        }
+    };
+    private final Merger<String, Long> sessionMerger = new Merger<String, Long>() {
+        @Override
+        public Long apply(final String aggKey, final Long aggOne, final Long aggTwo) {
+            return aggOne + aggTwo;
+        }
+    };
+    private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator =
+            new KStreamSessionWindowAggregate<>(SessionWindows.with(GAP_MS).until(3 * GAP_MS),
+                                                STORE_NAME,
+                                                initializer,
+                                                aggregator,
+                                                sessionMerger);
+
+    private final List<KeyValue> results = new ArrayList<>();
+    private Processor<String, String> processor = sessionAggregator.get();
+    private SessionStore<String, Long> sessionStore;
+    private MockProcessorContext context;
+
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void initializeStore() {
+        final File stateDir = TestUtils.tempDirectory();
+        context = new MockProcessorContext(new KStreamTestDriver(new KStreamBuilder(), stateDir), stateDir, Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(100000)) {
+            @Override
+            public <K, V> void forward(final K key, final V value) {
+                results.add(KeyValue.pair(key, value));
+            }
+        };
+
+        initStore(true);
+        processor.init(context);
+    }
+
+    private void initStore(final boolean enableCaching) {
+        final RocksDBSessionStoreSupplier<String, Long> supplier =
+                new RocksDBSessionStoreSupplier<>(STORE_NAME,
+                                                  GAP_MS * 3,
+                                                  Serdes.String(),
+                                                  Serdes.Long(),
+                                                  false,
+                                                  Collections.<String, String>emptyMap(),
+                                                  enableCaching);
+        sessionStore = (SessionStore<String, Long>) supplier.get();
+        sessionStore.init(context, sessionStore);
+    }
+
+    @After
+    public void closeStore() {
+        sessionStore.close();
+    }
+
+    @Test
+    public void shouldCreateSingleSessionWhenWithinGap() throws Exception {
+        context.setTime(0);
+        processor.process("john", "first");
+        context.setTime(500);
+        processor.process("john", "second");
+
+        final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessionsToMerge("john", 0, 2000);
+        assertTrue(values.hasNext());
+        assertEquals(Long.valueOf(2), values.next().value);
+    }
+
+
+    @Test
+    public void shouldMergeSessions() throws Exception {
+        context.setTime(0);
+        final String sessionId = "mel";
+        processor.process(sessionId, "first");
+        assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext());
+
+        // move time beyond gap
+        context.setTime(GAP_MS + 1);
+        processor.process(sessionId, "second");
+        assertTrue(sessionStore.findSessionsToMerge(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext());
+        // should still exist as not within gap
+        assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext());
+        // move time back
+        context.setTime(GAP_MS / 2);
+        processor.process(sessionId, "third");
+
+        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessionsToMerge(sessionId, 0, GAP_MS + 1);
+        final KeyValue<Windowed<String>, Long> kv = iterator.next();
+
+        assertEquals(Long.valueOf(3), kv.value);
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldUpdateSessionIfTheSameTime() throws Exception {
+        context.setTime(0);
+        processor.process("mel", "first");
+        processor.process("mel", "second");
+        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessionsToMerge("mel", 0, 0);
+        assertEquals(Long.valueOf(2L), iterator.next().value);
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() throws Exception {
+        final String sessionId = "mel";
+        long time = 0;
+        context.setTime(time);
+        processor.process(sessionId, "first");
+        context.setTime(time += GAP_MS + 1);
+        processor.process(sessionId, "second");
+        processor.process(sessionId, "second");
+        context.setTime(time += GAP_MS + 1);
+        processor.process(sessionId, "third");
+        processor.process(sessionId, "third");
+        processor.process(sessionId, "third");
+
+        sessionStore.flush();
+        assertEquals(Arrays.asList(
+                KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(0, 0)), new Change<>(1L, null)),
+                KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)),
+                KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(time, time)), new Change<>(3L, null))
+
+        ), results);
+
+    }
+
+
+    @Test
+    public void shouldRemoveMergedSessionsFromStateStore() throws Exception {
+        context.setTime(0);
+        processor.process("a", "1");
+
+        // first ensure it is in the store
+        final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessionsToMerge("a", 0, 0);
+        assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a1.next());
+
+        context.setTime(100);
+        processor.process("a", "2");
+        // a1 from above should have been removed
+        // should have merged session in store
+        final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessionsToMerge("a", 0, 100);
+        assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 100)), 2L), a2.next());
+        assertFalse(a2.hasNext());
+    }
+
+    @Test
+    public void shouldHandleMultipleSessionsAndMerging() throws Exception {
+        context.setTime(0);
+        processor.process("a", "1");
+        processor.process("b", "1");
+        processor.process("c", "1");
+        processor.process("d", "1");
+        context.setTime(GAP_MS / 2);
+        processor.process("d", "2");
+        context.setTime(GAP_MS + 1);
+        processor.process("a", "2");
+        processor.process("b", "2");
+        context.setTime(GAP_MS + 1 + GAP_MS / 2);
+        processor.process("a", "3");
+        processor.process("c", "3");
+
+        sessionStore.flush();
+
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("c", new TimeWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("d", new TimeWindow(0, GAP_MS / 2)), new Change<>(2L, null)),
+                                   KeyValue.pair(new Windowed<>("b", new TimeWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("a", new TimeWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)),
+                                   KeyValue.pair(new Windowed<>("c", new TimeWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null))
+                     ),
+                     results);
+    }
+
+
+    @Test
+    public void shouldGetAggregatedValuesFromValueGetter() throws Exception {
+        final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
+        getter.init(context);
+        context.setTime(0);
+        processor.process("a", "1");
+        context.setTime(GAP_MS + 1);
+        processor.process("a", "1");
+        processor.process("a", "2");
+        final long t0 = getter.get(new Windowed<>("a", new TimeWindow(0, 0)));
+        final long t1 = getter.get(new Windowed<>("a", new TimeWindow(GAP_MS + 1, GAP_MS + 1)));
+        assertEquals(1L, t0);
+        assertEquals(2L, t1);
+    }
+
+    @Test
+    public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() throws Exception {
+        initStore(false);
+        processor.init(context);
+
+        context.setTime(0);
+        processor.process("a", "1");
+        processor.process("b", "1");
+        processor.process("c", "1");
+
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("c", new TimeWindow(0, 0)), new Change<>(1L, null))), results);
+    }
+
+    @Test
+    public void shouldImmediatelyForwardRemovedSessionsWhenMerging() throws Exception {
+        initStore(false);
+        processor.init(context);
+
+        context.setTime(0);
+        processor.process("a", "1");
+        context.setTime(5);
+        processor.process("a", "1");
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)),
+                                   KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(null, null)),
+                                   KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 5)), new Change<>(2L, null))), results);
+
+    }
+
+}
\ No newline at end of file
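
A minimal standalone sketch (not part of the commit) of the three callbacks the aggregate processor above wires together: the Initializer seeds a new session, the Aggregator folds one record into it, and the Merger combines two sessions that the processor finds within the inactivity gap. Class and variable names here are illustrative only.

import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;

public class SessionCallbacksSketch {
    public static void main(final String[] args) {
        // Seed value for a brand-new session.
        final Initializer<Long> initializer = new Initializer<Long>() {
            @Override
            public Long apply() {
                return 0L;
            }
        };
        // Fold one record into the current session aggregate (here: a simple count).
        final Aggregator<String, String, Long> aggregator = new Aggregator<String, String, Long>() {
            @Override
            public Long apply(final String key, final String value, final Long aggregate) {
                return aggregate + 1;
            }
        };
        // Combine two sessions whose windows fall within the inactivity gap.
        final Merger<String, Long> merger = new Merger<String, Long>() {
            @Override
            public Long apply(final String key, final Long left, final Long right) {
                return left + right;
            }
        };

        final Long early = aggregator.apply("user", "click", initializer.apply()); // 1
        final Long late = aggregator.apply("user", "click", initializer.apply());  // 1
        // A record that bridges the gap merges the two sessions and is then
        // aggregated into the merged result: 1 + 1 + 1 = 3, the count asserted
        // in shouldMergeSessions above.
        final Long merged = aggregator.apply("user", "click", merger.apply("user", early, late));
        System.out.println(merged); // 3
    }
}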

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
new file mode 100644
index 0000000..2f5972c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class SessionKeySerdeTest {
+
+    @Test
+    public void shouldSerializeDeserialize() throws Exception {
+        final Windowed<Long> key = new Windowed<>(1L, new TimeWindow(10, 100));
+        final SessionKeySerde<Long> serde = new SessionKeySerde<>(Serdes.Long());
+        final byte[] bytes = serde.serializer().serialize("t", key);
+        final Windowed<Long> result = serde.deserializer().deserialize("t", bytes);
+        assertEquals(key, result);
+    }
+
+    @Test
+    public void shouldSerializeNullToNull() throws Exception {
+        final SessionKeySerde<String> serde = new SessionKeySerde<>(Serdes.String());
+        assertNull(serde.serializer().serialize("t", null));
+    }
+
+    @Test
+    public void shouldDeSerializeEmptyByteArrayToNull() throws Exception {
+        final SessionKeySerde<String> serde = new SessionKeySerde<>(Serdes.String());
+        assertNull(serde.deserializer().deserialize("t", new byte[0]));
+    }
+
+    @Test
+    public void shouldDeSerializeNullToNull() throws Exception {
+        final SessionKeySerde<String> serde = new SessionKeySerde<>(Serdes.String());
+        assertNull(serde.deserializer().deserialize("t", null));
+    }
+
+    @Test
+    public void shouldConvertToBinaryAndBack() throws Exception {
+        final Windowed<String> key = new Windowed<>("key", new TimeWindow(10, 20));
+        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer());
+        assertEquals(key, result);
+    }
+
+    @Test
+    public void shouldExtractEndTimeFromBinary() throws Exception {
+        final Windowed<String> key = new Windowed<>("key", new TimeWindow(10, 100));
+        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        assertEquals(100, SessionKeySerde.extractEnd(serialized.get()));
+    }
+
+    @Test
+    public void shouldExtractStartTimeFromBinary() throws Exception {
+        final Windowed<String> key = new Windowed<>("key", new TimeWindow(50, 100));
+        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        assertEquals(50, SessionKeySerde.extractStart(serialized.get()));
+    }
+
+    @Test
+    public void shouldExtractKeyBytesFromBinary() throws Exception {
+        final Windowed<String> key = new Windowed<>("blah", new TimeWindow(50, 100));
+        final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer());
+        assertArrayEquals("blah".getBytes(), SessionKeySerde.extractKeyBytes(serialized.get()));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
new file mode 100644
index 0000000..cb6f87e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class CachingSessionStoreTest {
+
+    private static final int MAX_CACHE_SIZE_BYTES = 600;
+    private final StateSerdes<String, Long> serdes =
+            new StateSerdes<>("name", Serdes.String(), Serdes.Long());
+    private RocksDBSegmentedBytesStore underlying;
+    private CachingSessionStore<String, Long> cachingStore;
+    private ThreadCache cache;
+    private static final Long DEFAULT_TIMESTAMP = 10L;
+
+    @Before
+    public void setUp() throws Exception {
+        underlying = new RocksDBSegmentedBytesStore("test", 60000, 3, new SessionKeySchema());
+        cachingStore = new CachingSessionStore<>(underlying,
+                                                 Serdes.String(),
+                                                 Serdes.Long());
+        cache = new ThreadCache(MAX_CACHE_SIZE_BYTES);
+        final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
+        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
+        cachingStore.init(context, cachingStore);
+    }
+
+    @Test
+    public void shouldPutFetchFromCache() throws Exception {
+        cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 1L);
+        cachingStore.put(new Windowed<>("b", new TimeWindow(0, 0)), 1L);
+
+        final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessionsToMerge("a", 0, 0);
+        final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessionsToMerge("b", 0, 0);
+
+        assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a.next());
+        assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), 1L), b.next());
+        assertFalse(a.hasNext());
+        assertFalse(b.hasNext());
+        assertEquals(3, cache.size());
+    }
+
+    @Test
+    public void shouldFetchAllSessionsWithSameRecordKey() throws Exception {
+
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L),
+                                                                                    KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L),
+                                                                                    KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L),
+                                                                                    KeyValue.pair(new Windowed<>("a", new TimeWindow(1000, 1000)), 4L));
+        for (KeyValue<Windowed<String>, Long> kv : expected) {
+            cachingStore.put(kv.key, kv.value);
+        }
+
+        // add one that shouldn't appear in the results
+        cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L);
+
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(cachingStore.fetch("a"));
+        assertEquals(expected, results);
+
+    }
+
+    @Test
+    public void shouldFlushItemsToStoreOnEviction() throws Exception {
+        final List<KeyValue<Windowed<String>, Long>> added = addSessionsUntilOverflow("a", "b", "c", "d");
+        assertEquals(added.size() - 1, cache.size());
+        final KeyValueIterator<Bytes, byte[]> iterator = underlying.fetch(Bytes.wrap(added.get(0).key.key().getBytes()), 0, 0);
+        final KeyValue<Bytes, byte[]> next = iterator.next();
+        assertEquals(added.get(0).key, SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()));
+        assertArrayEquals(serdes.rawValue(added.get(0).value), next.value);
+    }
+
+    @Test
+    public void shouldQueryItemsInCacheAndStore() throws Exception {
+        final List<KeyValue<Windowed<String>, Long>> added = addSessionsUntilOverflow("a");
+        final KeyValueIterator<Windowed<String>, Long> iterator = cachingStore.findSessionsToMerge("a", 0, added.size() * 10);
+        final List<KeyValue<Windowed<String>, Long>> actual = toList(iterator);
+        assertEquals(added, actual);
+    }
+
+    @Test
+    public void shouldRemove() throws Exception {
+        final Windowed<String> a = new Windowed<>("a", new TimeWindow(0, 0));
+        final Windowed<String> b = new Windowed<>("b", new TimeWindow(0, 0));
+        cachingStore.put(a, 2L);
+        cachingStore.put(b, 2L);
+        cachingStore.flush();
+        cachingStore.remove(a);
+        cachingStore.flush();
+        final KeyValueIterator<Windowed<String>, Long> rangeIter = cachingStore.findSessionsToMerge("a", 0, 0);
+        assertFalse(rangeIter.hasNext());
+    }
+
+    @Test
+    public void shouldFetchCorrectlyAcrossSegments() throws Exception {
+        final Windowed<String> a1 = new Windowed<>("a", new TimeWindow(0, 0));
+        final Windowed<String> a2 = new Windowed<>("a", new TimeWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL));
+        final Windowed<String> a3 = new Windowed<>("a", new TimeWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
+        cachingStore.put(a1, 1L);
+        cachingStore.put(a2, 2L);
+        cachingStore.put(a3, 3L);
+        cachingStore.flush();
+        final KeyValueIterator<Windowed<String>, Long> results = cachingStore.findSessionsToMerge("a", 0, Segments.MIN_SEGMENT_INTERVAL * 2);
+        assertEquals(a1, results.next().key);
+        assertEquals(a2, results.next().key);
+        assertEquals(a3, results.next().key);
+        assertFalse(results.hasNext());
+    }
+
+    @Test
+    public void shouldClearNamespaceCacheOnClose() throws Exception {
+        final Windowed<String> a1 = new Windowed<>("a", new TimeWindow(0, 0));
+        cachingStore.put(a1, 1L);
+        assertEquals(1, cache.size());
+        cachingStore.close();
+        assertEquals(0, cache.size());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToFetchFromClosedCachingStore() throws Exception {
+        cachingStore.close();
+        cachingStore.fetch("a");
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() throws Exception {
+        cachingStore.close();
+        cachingStore.findSessionsToMerge("a", 0, Long.MAX_VALUE);
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToRemoveFromClosedCachingStore() throws Exception {
+        cachingStore.close();
+        cachingStore.remove(new Windowed<>("a", new TimeWindow(0, 0)));
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToPutIntoClosedCachingStore() throws Exception {
+        cachingStore.close();
+        cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L);
+    }
+
+    private List<KeyValue<Windowed<String>, Long>> addSessionsUntilOverflow(final String... sessionIds) {
+        final Random random = new Random();
+        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
+        while (cache.size() == results.size()) {
+            final String sessionId = sessionIds[random.nextInt(sessionIds.length)];
+            addSingleSession(sessionId, results);
+        }
+        return results;
+    }
+
+    private void addSingleSession(final String sessionId, final List<KeyValue<Windowed<String>, Long>> allSessions) {
+        final int timestamp = allSessions.size() * 10;
+        final Windowed<String> key = new Windowed<>(sessionId, new TimeWindow(timestamp, timestamp));
+        final Long value = 1L;
+        cachingStore.put(key, value);
+        allSessions.add(KeyValue.pair(key, value));
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 023fea6..427798d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.TestUtils;
@@ -44,16 +45,18 @@ public class CachingWindowStoreTest {
 
     private static final int MAX_CACHE_SIZE_BYTES = 150;
     private static final Long WINDOW_SIZE = 10000L;
-    private RocksDBWindowStore<Bytes, byte[]> underlying;
+    private RocksDBSegmentedBytesStore underlying;
     private CachingWindowStore<String, String> cachingStore;
     private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>> cacheListener;
     private ThreadCache cache;
     private String topic;
-    private static final Long DEFAULT_TIMESTAMP = 10L;
+    private static final long DEFAULT_TIMESTAMP = 10L;
+    private WindowStoreKeySchema keySchema;
 
     @Before
     public void setUp() throws Exception {
-        underlying = new RocksDBWindowStore<>("test", 30000, 3, false, Serdes.Bytes(), Serdes.ByteArray());
+        keySchema = new WindowStoreKeySchema();
+        underlying = new RocksDBSegmentedBytesStore("test", 30000, 3, keySchema);
         cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
         cachingStore = new CachingWindowStore<>(underlying,
                                                 Serdes.String(),
@@ -85,9 +88,9 @@ public class CachingWindowStoreTest {
     public void shouldFlushEvictedItemsIntoUnderlyingStore() throws Exception {
         int added = addItemsToCache();
         // all dirty entries should have been flushed
-        final WindowStoreIterator<byte[]> iter = underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
-        final KeyValue<Long, byte[]> next = iter.next();
-        assertEquals(DEFAULT_TIMESTAMP, next.key);
+        final KeyValueIterator<Bytes, byte[]> iter = underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
+        final KeyValue<Bytes, byte[]> next = iter.next();
+        assertEquals(DEFAULT_TIMESTAMP, keySchema.segmentTimestamp(next.key));
         assertArrayEquals("0".getBytes(), next.value);
         assertFalse(iter.hasNext());
         assertEquals(added - 1, cache.size());
@@ -143,7 +146,8 @@ public class CachingWindowStoreTest {
 
     @Test
     public void shouldIterateCacheAndStore() throws Exception {
-        underlying.put(Bytes.wrap("1".getBytes()), "a".getBytes());
+        final Bytes key = Bytes.wrap("1".getBytes());
+        underlying.put(Bytes.wrap(WindowStoreUtils.toBinaryKey(key, DEFAULT_TIMESTAMP, 0, WindowStoreUtils.INNER_SERDES)), "a".getBytes());
         cachingStore.put("1", "b", DEFAULT_TIMESTAMP + WINDOW_SIZE);
         final WindowStoreIterator<String> fetch = cachingStore.fetch("1", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
         assertEquals(KeyValue.pair(DEFAULT_TIMESTAMP, "a"), fetch.next());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
new file mode 100644
index 0000000..d4f9e47
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.SegmentedBytesStoreStub;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ChangeLoggingSegmentedBytesStoreTest {
+
+    private final SegmentedBytesStoreStub bytesStore = new SegmentedBytesStoreStub();
+    private final ChangeLoggingSegmentedBytesStore store = new ChangeLoggingSegmentedBytesStore(bytesStore);
+    private final Map sent = new HashMap<>();
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() throws Exception {
+        final NoOpRecordCollector collector = new NoOpRecordCollector() {
+            @Override
+            public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+                sent.put(record.key(), record.value());
+            }
+        };
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      TestUtils.tempDirectory(),
+                                                                      Serdes.String(),
+                                                                      Serdes.Long(),
+                                                                      collector,
+                                                                      new ThreadCache(0));
+        context.setTime(0);
+        store.init(context, store);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldLogPuts() throws Exception {
+        final byte[] value1 = {0};
+        final byte[] value2 = {1};
+        final Bytes key1 = Bytes.wrap(value1);
+        final Bytes key2 = Bytes.wrap(value2);
+        store.put(key1, value1);
+        store.put(key2, value2);
+        store.flush();
+        assertArrayEquals(value1, (byte[]) sent.get(key1));
+        assertArrayEquals(value2, (byte[]) sent.get(key2));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldLogRemoves() throws Exception {
+        final Bytes key1 = Bytes.wrap(new byte[]{0});
+        final Bytes key2 = Bytes.wrap(new byte[]{1});
+        store.remove(key1);
+        store.remove(key2);
+        store.flush();
+        assertTrue(sent.containsKey(key1));
+        assertTrue(sent.containsKey(key2));
+        assertNull(sent.get(key1));
+        assertNull(sent.get(key2));
+    }
+
+    @Test
+    public void shouldDelegateToUnderlyingStoreWhenFetching() throws Exception {
+        store.fetch(Bytes.wrap(new byte[0]), 1, 1);
+        assertTrue(bytesStore.fetchCalled);
+    }
+
+    @Test
+    public void shouldFlushUnderlyingStore() throws Exception {
+        store.flush();
+        assertTrue(bytesStore.flushed);
+    }
+
+    @Test
+    public void shouldCloseUnderlyingStore() throws Exception {
+        store.close();
+        assertTrue(bytesStore.closed);
+    }
+
+    @Test
+    public void shouldInitUnderlyingStore() throws Exception {
+        assertTrue(bytesStore.initialized);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
new file mode 100644
index 0000000..fc4a4c5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.test.ReadOnlySessionStoreStub;
+import org.apache.kafka.test.StateStoreProviderStub;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest.toList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class CompositeReadOnlySessionStoreTest {
+
+    private final String storeName = "session-store";
+    private final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false);
+    private final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false);
+    private final ReadOnlySessionStoreStub<String, Long> underlyingSessionStore = new ReadOnlySessionStoreStub<>();
+    private final ReadOnlySessionStoreStub<String, Long> otherUnderlyingStore = new ReadOnlySessionStoreStub<>();
+    private CompositeReadOnlySessionStore<String, Long> sessionStore;
+
+    @Before
+    public void before() {
+        stubProviderOne.addStore(storeName, underlyingSessionStore);
+        stubProviderOne.addStore("other-session-store", otherUnderlyingStore);
+
+
+        sessionStore = new CompositeReadOnlySessionStore<>(
+                new WrappingStoreProvider(Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo)),
+                QueryableStoreTypes.<String, Long>sessionStore(), storeName);
+    }
+
+    @Test
+    public void shouldFetchResultsFromUnderlyingSessionStore() throws Exception {
+        underlyingSessionStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L);
+        underlyingSessionStore.put(new Windowed<>("a", new TimeWindow(10, 10)), 2L);
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(sessionStore.fetch("a"));
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L),
+                                   KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L)),
+                     results);
+    }
+
+    @Test
+    public void shouldReturnEmptyIteratorIfNoData() throws Exception {
+        final KeyValueIterator<Windowed<String>, Long> result = sessionStore.fetch("b");
+        assertFalse(result.hasNext());
+    }
+
+    @Test
+    public void shouldFindValueForKeyWhenMultiStores() throws Exception {
+        final ReadOnlySessionStoreStub<String, Long> secondUnderlying = new
+                ReadOnlySessionStoreStub<>();
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+
+        final Windowed<String> keyOne = new Windowed<>("key-one", new TimeWindow(0, 0));
+        final Windowed<String> keyTwo = new Windowed<>("key-two", new TimeWindow(0, 0));
+        underlyingSessionStore.put(keyOne, 0L);
+        secondUnderlying.put(keyTwo, 10L);
+
+        final List<KeyValue<Windowed<String>, Long>> keyOneResults = toList(sessionStore.fetch("key-one"));
+        final List<KeyValue<Windowed<String>, Long>> keyTwoResults = toList(sessionStore.fetch("key-two"));
+
+        assertEquals(Collections.singletonList(KeyValue.pair(keyOne, 0L)), keyOneResults);
+        assertEquals(Collections.singletonList(KeyValue.pair(keyTwo, 10L)), keyTwoResults);
+    }
+
+    @Test
+    public void shouldNotGetValueFromOtherStores() throws Exception {
+        final Windowed<String> expectedKey = new Windowed<>("foo", new TimeWindow(0, 0));
+        otherUnderlyingStore.put(new Windowed<>("foo", new TimeWindow(10, 10)), 10L);
+        underlyingSessionStore.put(expectedKey, 1L);
+
+        final KeyValueIterator<Windowed<String>, Long> result = sessionStore.fetch("foo");
+        assertEquals(KeyValue.pair(expectedKey, 1L), result.next());
+        assertFalse(result.hasNext());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStateStoreExceptionOnRebalance() throws Exception {
+        final CompositeReadOnlySessionStore<String, String> store
+                = new CompositeReadOnlySessionStore<>(new StateStoreProviderStub(true),
+                                                      QueryableStoreTypes.<String, String>sessionStore(),
+                                                      "whateva");
+
+        store.fetch("a");
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStateStoreExceptionIfFetchThrows() throws Exception {
+        underlyingSessionStore.setOpen(false);
+        underlyingSessionStore.fetch("key");
+    }
+
+}
\ No newline at end of file
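
For context, a hedged sketch of how the composite read-only session store tested above is meant to be reached from application code via interactive queries. The already-started KafkaStreams instance, the store name "session-store", and the ReadOnlySessionStore return type are assumptions for illustration, not guaranteed by this commit.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;

public final class SessionStoreQuerySketch {
    // "streams" must be an already-started KafkaStreams instance (assumption);
    // "session-store" must match the store name used when building the topology.
    public static void printSessions(final KafkaStreams streams, final String key) {
        final ReadOnlySessionStore<String, Long> store =
                streams.store("session-store", QueryableStoreTypes.<String, Long>sessionStore());
        final KeyValueIterator<Windowed<String>, Long> sessions = store.fetch(key);
        while (sessions.hasNext()) {
            final KeyValue<Windowed<String>, Long> session = sessions.next();
            // Print each session window for the key along with its aggregate.
            System.out.println(session.key.window().start() + ".." + session.key.window().end()
                    + " -> " + session.value);
        }
    }
}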

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
index 4a32187..bca4837 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -27,17 +27,18 @@ import static org.junit.Assert.assertTrue;
 
 public class DelegatingPeekingKeyValueIteratorTest {
 
+    private final String name = "name";
     private InMemoryKeyValueStore<String, String> store;
 
     @Before
     public void setUp() throws Exception {
-        store = new InMemoryKeyValueStore<>("name");
+        store = new InMemoryKeyValueStore<>(name);
     }
 
     @Test
     public void shouldPeekNext() throws Exception {
         store.put("A", "A");
-        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
         assertEquals("A", peekingIterator.peekNextKey());
         assertEquals("A", peekingIterator.peekNextKey());
         assertTrue(peekingIterator.hasNext());
@@ -50,7 +51,7 @@ public class DelegatingPeekingKeyValueIteratorTest {
             store.put(kv, kv);
         }
 
-        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
         int index = 0;
         while (peekingIterator.hasNext()) {
             final String peekNext = peekingIterator.peekNextKey();
@@ -64,13 +65,13 @@ public class DelegatingPeekingKeyValueIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception {
-        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
         peekingIterator.next();
     }
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception {
-        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
         peekingIterator.peekNextKey();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java
deleted file mode 100644
index 3f251b3..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingWindowIteratorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.MockProcessorContext;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class DelegatingPeekingWindowIteratorTest {
-
-    private static final long DEFAULT_TIMESTAMP = 0L;
-    private WindowStore<String, String> store;
-
-    @Before
-    public void setUp() throws Exception {
-        store = new RocksDBWindowStore<>("test", 30000, 3, false, Serdes.String(), Serdes.String());
-        final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, null);
-        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
-        store.init(context, store);
-    }
-
-    @Test
-    public void shouldPeekNext() throws Exception {
-        final KeyValue<Long, String> expected = KeyValue.pair(DEFAULT_TIMESTAMP, "A");
-        store.put("A", "A");
-        final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("A", DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP));
-        assertEquals(expected, peekingIterator.peekNext());
-        assertEquals(expected, peekingIterator.peekNext());
-        assertTrue(peekingIterator.hasNext());
-    }
-
-    @Test
-    public void shouldPeekAndIterate() throws Exception {
-        final List<KeyValue<Long, String>> expected = new ArrayList<>();
-        for (long t = 0; t < 50; t += 10) {
-            store.put("a", String.valueOf(t), t);
-            expected.add(KeyValue.pair(t, String.valueOf(t)));
-        }
-        final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("a", 0, 50));
-        int index = 0;
-        while (peekingIterator.hasNext()) {
-            final KeyValue<Long, String> peekNext = peekingIterator.peekNext();
-            final KeyValue<Long, String> key = peekingIterator.next();
-            assertEquals(expected.get(index), peekNext);
-            assertEquals(expected.get(index), key);
-            index++;
-        }
-        assertEquals(expected.size(), index);
-    }
-
-    @Test(expected = NoSuchElementException.class)
-    public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception {
-        final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("b", 10, 10));
-        peekingIterator.next();
-    }
-
-    @Test(expected = NoSuchElementException.class)
-    public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception {
-        final DelegatingPeekingWindowIterator<String> peekingIterator = new DelegatingPeekingWindowIterator<>(store.fetch("b", 10, 10));
-        peekingIterator.peekNext();
-    }
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index c7a1a2c..6bb27b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -103,12 +103,12 @@ class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public KeyValueIterator<K, V> range(final K from, final K to) {
-        return new TheIterator(this.map.subMap(from, true, to, false).entrySet().iterator());
+        return new DelegatingPeekingKeyValueIterator<>(name, new TheIterator(this.map.subMap(from, true, to, false).entrySet().iterator()));
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new TheIterator(map.entrySet().iterator());
+        return new DelegatingPeekingKeyValueIterator<>(name, new TheIterator(map.entrySet().iterator()));
     }
 
     private class TheIterator implements KeyValueIterator<K, V> {
@@ -125,6 +125,11 @@ class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
         }
 
         @Override
+        public K peekNextKey() {
+            throw new UnsupportedOperationException("peekNextKey not supported");
+        }
+
+        @Override
         public boolean hasNext() {
             return underlying.hasNext();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index dee2593..db391ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.junit.Before;
@@ -50,7 +51,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
 
         final Bytes from = Bytes.wrap(new byte[]{2});
         final Bytes to = Bytes.wrap(new byte[]{9});
-        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(store.range(from, to));
+        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.range(from, to));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from.get(), to.get());
 
         final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
@@ -138,9 +139,40 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
 
     }
 
+    @Test
+    public void shouldPeekNextKey() throws Exception {
+        final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one");
+        final ThreadCache cache = new ThreadCache(1000000L);
+        byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
+        final String namespace = "one";
+        for (int i = 0; i < bytes.length - 1; i += 2) {
+            kv.put(Bytes.wrap(bytes[i]), bytes[i]);
+            cache.put(namespace, bytes[i + 1], new LRUCacheEntry(bytes[i + 1]));
+        }
+
+        final Bytes from = Bytes.wrap(new byte[]{2});
+        final Bytes to = Bytes.wrap(new byte[]{9});
+        final KeyValueIterator<Bytes, byte[]> storeIterator = kv.range(from, to);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from.get(), to.get());
+
+        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator =
+                new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator,
+                                                             storeIterator,
+                                                             serdes);
+        final byte[][] values = new byte[8][];
+        int index = 0;
+        int bytesIndex = 2;
+        while (iterator.hasNext()) {
+            final byte[] keys = iterator.peekNextKey();
+            values[index++] = keys;
+            assertArrayEquals(bytes[bytesIndex++], keys);
+            iterator.next();
+        }
+    }
+
     private MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> createIterator() {
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(namespace);
-        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.all());
         return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
     }
 

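The test above exercises two behaviours of the merged cache/store iterator: interleaving two key-ordered sources and inspecting the next key without consuming it via peekNextKey(). A minimal, self-contained sketch of that merge-with-peek pattern follows; it is illustrative only, not the Kafka Streams implementation, and the class name, the tie-breaking rule (cache entry wins over the store entry) and the byte-wise comparator are assumptions made for the sketch.

import java.util.Iterator;
import java.util.NoSuchElementException;

final class MergedSortedIteratorSketch implements Iterator<byte[]> {

    private final Iterator<byte[]> cache;
    private final Iterator<byte[]> store;
    private byte[] nextFromCache;
    private byte[] nextFromStore;

    MergedSortedIteratorSketch(final Iterator<byte[]> cache, final Iterator<byte[]> store) {
        this.cache = cache;
        this.store = store;
        nextFromCache = cache.hasNext() ? cache.next() : null;
        nextFromStore = store.hasNext() ? store.next() : null;
    }

    // look at the smallest pending key without consuming it
    public byte[] peekNextKey() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        if (nextFromStore == null) {
            return nextFromCache;
        }
        if (nextFromCache == null) {
            return nextFromStore;
        }
        return compare(nextFromCache, nextFromStore) <= 0 ? nextFromCache : nextFromStore;
    }

    @Override
    public boolean hasNext() {
        return nextFromCache != null || nextFromStore != null;
    }

    @Override
    public byte[] next() {
        final byte[] result = peekNextKey();
        if (result == nextFromCache) {
            // assumption for this sketch: on equal keys the cache entry wins and the
            // duplicate store entry is skipped
            if (nextFromStore != null && compare(nextFromCache, nextFromStore) == 0) {
                nextFromStore = store.hasNext() ? store.next() : null;
            }
            nextFromCache = cache.hasNext() ? cache.next() : null;
        } else {
            nextFromStore = store.hasNext() ? store.next() : null;
        }
        return result;
    }

    // unsigned lexicographic byte comparison, shorter array first on a shared prefix
    private static int compare(final byte[] a, final byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            final int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }
}

The point of peekNextKey() is that a caller can decide which source to drain next without losing an element, which is exactly what shouldPeekNextKey asserts.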
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
index 9ee8b29..c33f174 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
@@ -18,13 +18,14 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.KeyValueIteratorStub;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -34,16 +35,17 @@ public class MergedSortedCacheWindowStoreIteratorTest {
 
     @Test
     public void shouldIterateOverValueFromBothIterators() throws Exception {
-        final List<KeyValue<Long, byte[]>> storeValues = new ArrayList<>();
+        final List<KeyValue<Bytes, byte[]>> storeValues = new ArrayList<>();
         final ThreadCache cache = new ThreadCache(1000000L);
         final String namespace = "one";
         final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String());
         final List<KeyValue<Long, byte[]>> expectedKvPairs = new ArrayList<>();
 
         for (long t = 0; t < 100; t += 20) {
-            final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, String.valueOf(t).getBytes());
+            final byte[] v1Bytes = String.valueOf(t).getBytes();
+            final KeyValue<Bytes, byte[]> v1 = KeyValue.pair(Bytes.wrap(WindowStoreUtils.toBinaryKey("a", t, 0, stateSerdes)), v1Bytes);
             storeValues.add(v1);
-            expectedKvPairs.add(v1);
+            expectedKvPairs.add(KeyValue.pair(t, v1Bytes));
             final byte[] keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes);
             final byte[] valBytes = String.valueOf(t + 10).getBytes();
             expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
@@ -52,11 +54,11 @@ public class MergedSortedCacheWindowStoreIteratorTest {
 
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
         byte[] binaryTo = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
-        final PeekingWindowIterator<byte[]> storeIterator = new DelegatingPeekingWindowIterator<>(new WindowStoreIteratorStub(storeValues.iterator()));
+        final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("name", new KeyValueIteratorStub<>(storeValues.iterator()));
 
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, binaryFrom, binaryTo);
 
-        final MergedSortedCachedWindowStoreIterator<byte[], byte[]> iterator = new MergedSortedCachedWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.ByteArray(), Serdes.ByteArray()));
+        final MergedSortedCachedWindowStoreIterator<Bytes, byte[]> iterator = new MergedSortedCachedWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Bytes(), Serdes.ByteArray()));
         int index = 0;
         while (iterator.hasNext()) {
             final KeyValue<Long, byte[]> next = iterator.next();
@@ -66,32 +68,4 @@ public class MergedSortedCacheWindowStoreIteratorTest {
         }
     }
 
-    private static class WindowStoreIteratorStub implements WindowStoreIterator<byte[]> {
-
-        private final Iterator<KeyValue<Long, byte[]>> iterator;
-
-        public WindowStoreIteratorStub(final Iterator<KeyValue<Long, byte[]>> iterator) {
-            this.iterator = iterator;
-        }
-
-        @Override
-        public void close() {
-            //no-op
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        @Override
-        public KeyValue<Long, byte[]> next() {
-            return iterator.next();
-        }
-
-        @Override
-        public void remove() {
-
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
new file mode 100644
index 0000000..1587f13
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.SegmentedBytesStoreStub;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+public class MeteredSegmentedBytesStoreTest {
+    private final SegmentedBytesStoreStub bytesStore = new SegmentedBytesStoreStub();
+    private final MeteredSegmentedBytesStore store = new MeteredSegmentedBytesStore(bytesStore, "scope", new MockTime());
+    private final Set<String> latencyRecorded = new HashSet<>();
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() throws Exception {
+        final Metrics metrics = new Metrics();
+        final StreamsMetrics streamsMetrics = new StreamsMetrics() {
+
+            @Override
+            public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) {
+                return metrics.sensor(operationName);
+            }
+
+            @Override
+            public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
+                latencyRecorded.add(sensor.name());
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      TestUtils.tempDirectory(),
+                                                                      Serdes.String(),
+                                                                      Serdes.Long(),
+                                                                      new NoOpRecordCollector(),
+                                                                      new ThreadCache(0)) {
+            @Override
+            public StreamsMetrics metrics() {
+                return streamsMetrics;
+            }
+        };
+        store.init(context, store);
+    }
+
+    @Test
+    public void shouldRecordRestoreLatencyOnInit() throws Exception {
+        assertTrue(latencyRecorded.contains("restore"));
+        assertTrue(bytesStore.initialized);
+    }
+
+    @Test
+    public void shouldRecordPutLatency() throws Exception {
+        store.put(Bytes.wrap(new byte[0]), new byte[0]);
+        assertTrue(latencyRecorded.contains("put"));
+        assertTrue(bytesStore.putCalled);
+    }
+
+    @Test
+    public void shouldRecordFetchLatency() throws Exception {
+        store.fetch(Bytes.wrap(new byte[0]), 1, 1).close(); // fetch latency is recorded when the returned iterator is closed
+        assertTrue(latencyRecorded.contains("fetch"));
+        assertTrue(bytesStore.fetchCalled);
+    }
+
+    @Test
+    public void shouldRecordRemoveLatency() throws Exception {
+        store.remove(null);
+        assertTrue(latencyRecorded.contains("remove"));
+        assertTrue(bytesStore.removeCalled);
+    }
+
+    @Test
+    public void shouldRecordFlushLatency() throws Exception {
+        store.flush();
+        assertTrue(latencyRecorded.contains("flush"));
+        assertTrue(bytesStore.flushed);
+    }
+
+    @Test
+    public void shouldRecordGetLatency() throws Exception {
+        store.get(null);
+        assertTrue(latencyRecorded.contains("get"));
+        assertTrue(bytesStore.getCalled);
+    }
+
+    @Test
+    public void shouldCloseUnderlyingStore() throws Exception {
+        store.close();
+        assertTrue(bytesStore.closed);
+    }
+
+
+}
\ No newline at end of file

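MeteredSegmentedBytesStoreTest checks that the metered wrapper records one latency measurement per delegated operation (restore on init, then put, fetch, remove, flush and get) while still forwarding each call to the underlying store. A minimal sketch of that decorator pattern, using hypothetical names and a plain map in place of Kafka's StreamsMetrics:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class MeteredStoreSketch {

    private final Map<String, Long> lastLatencyNs = new HashMap<>();

    // time a delegated store operation and record the latency under its operation name
    <T> T measure(final String operation, final Supplier<T> delegate) {
        final long start = System.nanoTime();
        try {
            return delegate.get();
        } finally {
            lastLatencyNs.put(operation, System.nanoTime() - start);
        }
    }

    Long latencyFor(final String operation) {
        return lastLatencyNs.get(operation);
    }
}

// illustrative usage: metered.measure("put", () -> { underlying.put(key, value); return null; });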
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 0a782d5..99deb50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 
@@ -200,6 +201,21 @@ public class NamedCacheTest {
     }
 
     @Test
+    public void shouldRemoveDeletedValuesOnFlush() throws Exception {
+        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                // no-op
+            }
+        });
+        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, true, 0, 0, 0, ""));
+        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, true, 0, 0, 0, ""));
+        cache.flush();
+        assertEquals(1, cache.size());
+        assertNotNull(cache.get(Bytes.wrap(new byte[]{1})));
+    }
+
+    @Test
     public void shouldBeReentrantAndNotBreakLRU() throws Exception {
         final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, "");
         final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
@@ -259,4 +275,5 @@ public class NamedCacheTest {
         cache.put(key, dirty);
         cache.evict();
     }
+
 }
\ No newline at end of file

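shouldRemoveDeletedValuesOnFlush relies on the cache treating an entry whose value is null as a tombstone: on flush it is forwarded to the flush listener and then dropped rather than retained, which is why the cache size ends up as 1. A minimal sketch of that rule, using a plain map instead of the LRU cache (names are illustrative only):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class TombstoneFlushSketch {

    private final Map<String, byte[]> entries = new HashMap<>();

    void put(final String key, final byte[] valueOrNull) {
        // a null value acts as a deletion marker (tombstone), kept until the next flush
        entries.put(key, valueOrNull);
    }

    int flush() {
        // forward all dirty entries to the flush listener (omitted here), then purge tombstones
        final Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() == null) {
                it.remove();
            }
        }
        return entries.size(); // only non-deleted entries remain, as the test asserts
    }
}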
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
new file mode 100644
index 0000000..d4c81c3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class RocksDBSegmentedBytesStoreTest {
+
+    private final long retention = 60000L;
+    private final int numSegments = 3;
+    private final String storeName = "bytes-store";
+    private RocksDBSegmentedBytesStore bytesStore;
+    private File stateDir;
+
+    @Before
+    public void before() {
+
+        bytesStore = new RocksDBSegmentedBytesStore(storeName,
+                                                    retention,
+                                                    numSegments,
+                                                    new SessionKeySchema());
+
+        stateDir = TestUtils.tempDirectory();
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      stateDir,
+                                                                      Serdes.String(),
+                                                                      Serdes.Long(),
+                                                                      new NoOpRecordCollector(),
+                                                                      new ThreadCache(0));
+        bytesStore.init(context, bytesStore);
+    }
+
+    @After
+    public void close() {
+        bytesStore.close();
+    }
+
+    @Test
+    public void shouldPutAndFetch() throws Exception {
+        final String key = "a";
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(10, 10L))), serializeValue(10L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(500L, 1000L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1500L, 2000L))), serializeValue(100L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(2500L, 3000L))), serializeValue(200L));
+
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, new TimeWindow(10, 10)), 10L),
+                                                                                    KeyValue.pair(new Windowed<>(key, new TimeWindow(500, 1000)), 50L));
+
+        final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1000L);
+        assertEquals(expected, toList(values));
+    }
+
+    @Test
+    public void shouldFindValuesWithinRange() throws Exception {
+        final String key = "a";
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 0L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1000L, 1000L))), serializeValue(10L));
+        final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1L, 1999L);
+        assertEquals(Collections.singletonList(KeyValue.pair(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 10L)), toList(results));
+    }
+
+    @Test
+    public void shouldRemove() throws Exception {
+        bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(0, 1000))), serializeValue(30L));
+        bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(1500, 2500))), serializeValue(50L));
+
+        bytesStore.remove(serializeKey(new Windowed<>("a", new TimeWindow(0, 1000))));
+        final KeyValueIterator<Bytes, byte[]> value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 1000L);
+        assertFalse(value.hasNext());
+    }
+
+    @Test
+    public void shouldRollSegments() throws Exception {
+        // just to validate directories
+        final Segments segments = new Segments(storeName, retention, numSegments);
+        final String key = "a";
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 0L))), serializeValue(50L));
+        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
+
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(30000L, 60000L))), serializeValue(100L));
+        assertEquals(Utils.mkSet(segments.segmentName(0),
+                                 segments.segmentName(1)), segmentDirs());
+
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(61000L, 120000L))), serializeValue(200L));
+        assertEquals(Utils.mkSet(segments.segmentName(0),
+                                 segments.segmentName(1),
+                                 segments.segmentName(2)), segmentDirs());
+
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(121000L, 180000L))), serializeValue(300L));
+        assertEquals(Utils.mkSet(segments.segmentName(1),
+                                 segments.segmentName(2),
+                                 segments.segmentName(3)), segmentDirs());
+
+        bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(181000L, 240000L))), serializeValue(400L));
+        assertEquals(Utils.mkSet(segments.segmentName(2),
+                                 segments.segmentName(3),
+                                 segments.segmentName(4)), segmentDirs());
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 240000));
+        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new TimeWindow(61000L, 120000L)), 200L),
+                                   KeyValue.pair(new Windowed<>(key, new TimeWindow(121000L, 180000L)), 300L),
+                                   KeyValue.pair(new Windowed<>(key, new TimeWindow(181000L, 240000L)), 400L)
+                                                 ), results);
+
+    }
+
+    private Set<String> segmentDirs() {
+        File windowDir = new File(stateDir, storeName);
+
+        return new HashSet<>(Arrays.asList(windowDir.list()));
+    }
+
+    private byte[] serializeValue(final long value) {
+        return Serdes.Long().serializer().serialize("", value);
+    }
+
+    private Bytes serializeKey(final Windowed<String> key) {
+        return SessionKeySerde.toBinary(key, Serdes.String().serializer());
+    }
+
+    private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
+        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = iterator.next();
+            final KeyValue<Windowed<String>, Long> deserialized
+                    = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer()), Serdes.Long().deserializer().deserialize("", next.value));
+            results.add(deserialized);
+        }
+        return results;
+    }
+
+}
\ No newline at end of file

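shouldRollSegments drives the store across several segment intervals and asserts that only the newest numSegments segment directories survive, the oldest being dropped as later windows arrive. A minimal sketch of count-based segment rolling follows; it is illustrative only, not the Segments implementation, and the timestamp-to-segment mapping (integer division by a fixed interval) is an assumption made for the sketch.

import java.util.Set;
import java.util.TreeMap;

final class SegmentRollSketch {

    private final long segmentIntervalMs;
    private final int maxSegments;
    private final TreeMap<Long, String> segments = new TreeMap<>(); // segment id -> name

    SegmentRollSketch(final long segmentIntervalMs, final int maxSegments) {
        this.segmentIntervalMs = segmentIntervalMs;
        this.maxSegments = maxSegments;
    }

    // assumption for this sketch: a timestamp maps to a segment by integer division
    long segmentId(final long timestampMs) {
        return timestampMs / segmentIntervalMs;
    }

    // called on every put: materialise the segment for this timestamp, dropping the oldest
    // segments once more than maxSegments exist
    void getOrCreateSegment(final long timestampMs) {
        segments.putIfAbsent(segmentId(timestampMs), "segment-" + segmentId(timestampMs));
        while (segments.size() > maxSegments) {
            segments.pollFirstEntry();
        }
    }

    Set<Long> liveSegments() {
        return segments.keySet();
    }
}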
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
new file mode 100644
index 0000000..11766c7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class RocksDBSessionStoreTest {
+
+    private SessionStore<String, Long> sessionStore;
+
+    @Before
+    public void before() {
+        final RocksDBSegmentedBytesStore bytesStore =
+                new RocksDBSegmentedBytesStore("session-store", 10000L, 3, new SessionKeySchema());
+
+        sessionStore = new RocksDBSessionStore<>(bytesStore,
+                                                 Serdes.String(),
+                                                 Serdes.Long());
+
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      TestUtils.tempDirectory(),
+                                                                      Serdes.String(),
+                                                                      Serdes.Long(),
+                                                                      new NoOpRecordCollector(),
+                                                                      new ThreadCache(0));
+        sessionStore.init(context, sessionStore);
+    }
+
+    @After
+    public void close() {
+        sessionStore.close();
+    }
+
+    @Test
+    public void shouldPutAndFindSessionsInRange() throws Exception {
+        final String key = "a";
+        final Windowed<String> a1 = new Windowed<>(key, new TimeWindow(10, 10L));
+        final Windowed<String> a2 = new Windowed<>(key, new TimeWindow(500L, 1000L));
+        sessionStore.put(a1, 1L);
+        sessionStore.put(a2, 2L);
+        sessionStore.put(new Windowed<>(key, new TimeWindow(1500L, 2000L)), 1L);
+        sessionStore.put(new Windowed<>(key, new TimeWindow(2500L, 3000L)), 2L);
+
+        final List<KeyValue<Windowed<String>, Long>> expected
+                = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
+
+        final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessionsToMerge(key, 0, 1000L);
+        assertEquals(expected, toList(values));
+    }
+
+    @Test
+    public void shouldFetchAllSessionsWithSameRecordKey() throws Exception {
+
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L),
+                                                                                    KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L),
+                                                                                    KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L),
+                                                                                    KeyValue.pair(new Windowed<>("a", new TimeWindow(1000, 1000)), 4L));
+        for (KeyValue<Windowed<String>, Long> kv : expected) {
+            sessionStore.put(kv.key, kv.value);
+        }
+
+        // add one that shouldn't appear in the results
+        sessionStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L);
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(sessionStore.fetch("a"));
+        assertEquals(expected, results);
+
+    }
+
+
+    @Test
+    public void shouldFindValuesWithinMergingSessionWindowRange() throws Exception {
+        final String key = "a";
+        sessionStore.put(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L);
+        sessionStore.put(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L);
+        final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessionsToMerge(key, -1, 1000L);
+
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+                KeyValue.pair(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L),
+                KeyValue.pair(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L));
+        assertEquals(expected, toList(results));
+    }
+
+    @Test
+    public void shouldRemove() throws Exception {
+        sessionStore.put(new Windowed<>("a", new TimeWindow(0, 1000)), 1L);
+        sessionStore.put(new Windowed<>("a", new TimeWindow(1500, 2500)), 2L);
+
+        sessionStore.remove(new Windowed<>("a", new TimeWindow(0, 1000)));
+        assertFalse(sessionStore.findSessionsToMerge("a", 0, 1000L).hasNext());
+
+        assertTrue(sessionStore.findSessionsToMerge("a", 1500, 2500).hasNext());
+    }
+
+    @Test
+    public void shouldFindSessionsToMerge() throws Exception {
+        final Windowed<String> session1 = new Windowed<>("a", new TimeWindow(0, 100));
+        final Windowed<String> session2 = new Windowed<>("a", new TimeWindow(101, 200));
+        final Windowed<String> session3 = new Windowed<>("a", new TimeWindow(201, 300));
+        final Windowed<String> session4 = new Windowed<>("a", new TimeWindow(301, 400));
+        final Windowed<String> session5 = new Windowed<>("a", new TimeWindow(401, 500));
+        sessionStore.put(session1, 1L);
+        sessionStore.put(session2, 2L);
+        sessionStore.put(session3, 3L);
+        sessionStore.put(session4, 4L);
+        sessionStore.put(session5, 5L);
+        final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessionsToMerge("a", 150, 300);
+        assertEquals(session2, results.next().key);
+        assertEquals(session3, results.next().key);
+        assertFalse(results.hasNext());
+    }
+
+    static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) {
+        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            results.add(iterator.next());
+        }
+        return results;
+    }
+
+
+}
\ No newline at end of file

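shouldFindSessionsToMerge expects findSessionsToMerge(key, 150, 300) to return exactly the stored sessions whose window overlaps the queried range, i.e. sessions ending at or after 150 and starting at or before 300. A minimal sketch of that overlap predicate (illustrative names only; run with -ea to enable the asserts):

final class SessionOverlapSketch {

    // is the stored session [sessionStart, sessionEnd] a merge candidate for a query that
    // asks for sessions ending no earlier than earliestSessionEnd and starting no later
    // than latestSessionStart?
    static boolean overlaps(final long sessionStart, final long sessionEnd,
                            final long earliestSessionEnd, final long latestSessionStart) {
        return sessionEnd >= earliestSessionEnd && sessionStart <= latestSessionStart;
    }

    public static void main(final String[] args) {
        // mirrors the expectations in shouldFindSessionsToMerge
        assert !overlaps(0, 100, 150, 300);   // ends before the queried range
        assert overlaps(101, 200, 150, 300);  // overlaps the start of the range
        assert overlaps(201, 300, 150, 300);  // ends exactly at the range boundary
        assert !overlaps(301, 400, 150, 300); // starts after the range
    }
}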
