kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [1/2] kafka git commit: KAFKA-3912: Query local state stores
Date: Tue, 19 Jul 2016 21:02:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b418922a3 -> f1dd0d272
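
For context, the interactive-query ("queryable state") API that the new tests exercise works roughly as follows. This is a minimal sketch based on the calls used in QueryableStateIntegrationTest below (imports as in that test file); the topic name, store name, and configuration object are illustrative, not part of the patch itself.

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> input = builder.stream("stream-one");
    // Materialize a per-key count into a local state store named "my-count".
    input.groupByKey().count("my-count");

    KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();

    // Once the instance is running, the local store can be queried read-only.
    ReadOnlyKeyValueStore<String, Long> counts =
        streams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
    Long count = counts.get("some-key");                            // point lookup
    KeyValueIterator<String, Long> range = counts.range("a", "z");  // range scan
    KeyValueIterator<String, Long> all = counts.all();              // full scan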


http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
new file mode 100644
index 0000000..974f109
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+public class QueryableStateIntegrationTest {
+
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
+        new EmbeddedSingleNodeKafkaCluster();
+    private static final String STREAM_ONE = "stream-one";
+    private static final String STREAM_TWO = "stream-two";
+    private static final String OUTPUT_TOPIC = "output";
+
+    private Properties streamsConfiguration;
+    private KStreamBuilder builder;
+    private KafkaStreams kafkaStreams;
+    private Comparator<KeyValue<String, String>> stringComparator;
+    private Comparator<KeyValue<String, Long>> stringLongComparator;
+
+    @BeforeClass
+    public static void createTopics() {
+        CLUSTER.createTopic(STREAM_ONE);
+        CLUSTER.createTopic(STREAM_TWO);
+        CLUSTER.createTopic(OUTPUT_TOPIC);
+    }
+
+    @Before
+    public void before() throws IOException {
+        builder = new KStreamBuilder();
+        streamsConfiguration = new Properties();
+        final String applicationId = "queryable-state";
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration
+            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
+                                 TestUtils.tempDirectory("qs-test")
+                                     .getPath());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration
+            .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        stringComparator = new Comparator<KeyValue<String, String>>() {
+
+            @Override
+            public int compare(final KeyValue<String, String> o1,
+                               final KeyValue<String, String> o2) {
+                return o1.key.compareTo(o2.key);
+            }
+        };
+        stringLongComparator = new Comparator<KeyValue<String, Long>>() {
+
+            @Override
+            public int compare(final KeyValue<String, Long> o1,
+                               final KeyValue<String, Long> o2) {
+                return o1.key.compareTo(o2.key);
+            }
+        };
+    }
+
+    @After
+    public void shutdown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Test
+    public void shouldBeAbleToQueryState() throws Exception {
+        final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+
+        final Set<KeyValue<String, String>> batch1 = new TreeSet<>(stringComparator);
+        batch1.addAll(Arrays.asList(
+            new KeyValue<>(keys[0], "hello"),
+            new KeyValue<>(keys[1], "goodbye"),
+            new KeyValue<>(keys[2], "welcome"),
+            new KeyValue<>(keys[3], "go"),
+            new KeyValue<>(keys[4], "kafka")));
+
+
+        final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator);
+        for (String key : keys) {
+            expectedCount.add(new KeyValue<>(key, 1L));
+        }
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            STREAM_ONE,
+            batch1,
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()));
+
+        final KStream<String, String> s1 = builder.stream(STREAM_ONE);
+
+        // Non Windowed
+        s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), OUTPUT_TOPIC);
+
+        s1.groupByKey().count(TimeWindows.of(60000L), "windowed-count");
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+
+        waitUntilAtLeastOneRecordProcessed();
+
+        final ReadOnlyKeyValueStore<String, Long>
+            myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
+
+        final ReadOnlyWindowStore<String, Long> windowStore =
+                kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());
+        verifyCanGetByKey(keys,
+                          expectedCount,
+                          expectedCount,
+                          windowStore,
+                          myCount);
+
+        verifyRangeAndAll(expectedCount, myCount);
+
+    }
+
+    private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
+                                   final ReadOnlyKeyValueStore<String, Long> myCount) {
+        final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
+        final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(stringLongComparator);
+        final Set<KeyValue<String, Long>>
+            expectedRangeResults =
+            new TreeSet<>(stringLongComparator);
+
+        expectedRangeResults.addAll(Arrays.asList(
+            new KeyValue<>("hello", 1L),
+            new KeyValue<>("go", 1L),
+            new KeyValue<>("goodbye", 1L),
+            new KeyValue<>("kafka", 1L)
+        ));
+
+        try (final KeyValueIterator<String, Long> range = myCount.range("go", "kafka")) {
+            while (range.hasNext()) {
+                countRangeResults.add(range.next());
+            }
+        }
+
+        try (final KeyValueIterator<String, Long> all = myCount.all()) {
+            while (all.hasNext()) {
+                countAllResults.add(all.next());
+            }
+        }
+
+        assertThat(countRangeResults, equalTo(expectedRangeResults));
+        assertThat(countAllResults, equalTo(expectedCount));
+    }
+
+    private void verifyCanGetByKey(final String[] keys,
+                                   final Set<KeyValue<String, Long>> expectedWindowState,
+                                   final Set<KeyValue<String, Long>> expectedCount,
+                                   final ReadOnlyWindowStore<String, Long> windowStore,
+                                   final ReadOnlyKeyValueStore<String, Long> myCount)
+        throws InterruptedException {
+        final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
+        final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);
+
+        final long timeout = System.currentTimeMillis() + 30000;
+        while (windowState.size() < 5 &&
+               countState.size() < 5 &&
+               System.currentTimeMillis() < timeout) {
+            Thread.sleep(10);
+            for (String key : keys) {
+                windowState.addAll(fetch(windowStore, key));
+                final Long value = myCount.get(key);
+                if (value != null) {
+                    countState.add(new KeyValue<>(key, value));
+                }
+            }
+        }
+        assertThat(windowState, equalTo(expectedWindowState));
+        assertThat(countState, equalTo(expectedCount));
+    }
+
+    private void waitUntilAtLeastOneRecordProcessed() throws InterruptedException {
+        final Properties config = new Properties();
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
+        config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                           StringDeserializer.class.getName());
+        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                           LongDeserializer.class.getName());
+        IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config,
+                                                               OUTPUT_TOPIC,
+                                                               1,
+                                                               60 *
+                                                               1000);
+    }
+
+    private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
+                                                final String key) {
+
+        final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
+        if (fetch.hasNext()) {
+            KeyValue<Long, Long> next = fetch.next();
+            return Collections.singleton(KeyValue.pair(key, next.value));
+        }
+        return Collections.emptySet();
+    }
+
+
+}
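
The windowed counterpart, exercised in the test above through the "windowed-count" store, follows the same pattern; a sketch under the same assumptions (the key and the time bounds are illustrative):

    ReadOnlyWindowStore<String, Long> windowed =
        streams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());

    // Fetch all windows for a key whose start timestamps fall within [0, now].
    WindowStoreIterator<Long> iterator = windowed.fetch("some-key", 0L, System.currentTimeMillis());
    while (iterator.hasNext()) {
        KeyValue<Long, Long> entry = iterator.next(); // entry.key is the window start timestamp
        // entry.value is the count for that window
    }
    iterator.close();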

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
new file mode 100644
index 0000000..2e47626
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+
+public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
+
+    @Override
+    public String name() {
+        return "";
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+
+    }
+
+    @Override
+    public void flush() {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return false;
+    }
+
+    @Override
+    public WindowStoreIterator fetch(final Object key, final long timeFrom, final long timeTo) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
new file mode 100644
index 0000000..14af192
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest.toList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class CompositeReadOnlyKeyValueStoreTest {
+
+    private final String storeName = "my-store";
+    private StateStoreProviderStub stubProviderTwo;
+    private KeyValueStore<String, String> stubOneUnderlying;
+    private CompositeReadOnlyKeyValueStore<String, String> theStore;
+    private KeyValueStore<String, String>
+        otherUnderlyingStore;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void before() {
+        final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub();
+        stubProviderTwo = new StateStoreProviderStub();
+
+        stubOneUnderlying = newStoreInstance();
+        stubProviderOne.addStore(storeName, stubOneUnderlying);
+        otherUnderlyingStore = newStoreInstance();
+        stubProviderOne.addStore("other-store", otherUnderlyingStore);
+
+        theStore = new CompositeReadOnlyKeyValueStore<>(
+            new WrappingStoreProvider(Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo)),
+                                        QueryableStoreTypes.<String, String>keyValueStore(),
+                                        storeName);
+    }
+
+    private KeyValueStore<String, String> newStoreInstance() {
+        return StateStoreTestUtils.newKeyValueStore(storeName, String.class, String.class);
+    }
+
+    @Test
+    public void shouldReturnNullIfKeyDoesntExist() throws Exception {
+        assertNull(theStore.get("whatever"));
+    }
+
+    @Test
+    public void shouldReturnValueIfExists() throws Exception {
+        stubOneUnderlying.put("key", "value");
+        assertEquals("value", theStore.get("key"));
+    }
+
+    @Test
+    public void shouldNotGetValuesFromOtherStores() throws Exception {
+        otherUnderlyingStore.put("otherKey", "otherValue");
+        assertNull(theStore.get("otherKey"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFindValueForKeyWhenMultiStores() throws Exception {
+        final KeyValueStore<String, String> cache = newStoreInstance();
+        stubProviderTwo.addStore(storeName, cache);
+
+        cache.put("key-two", "key-two-value");
+        stubOneUnderlying.put("key-one", "key-one-value");
+
+        assertEquals("key-two-value", theStore.get("key-two"));
+        assertEquals("key-one-value", theStore.get("key-one"));
+    }
+
+    @Test
+    public void shouldSupportRange() throws Exception {
+        stubOneUnderlying.put("a", "a");
+        stubOneUnderlying.put("b", "b");
+        stubOneUnderlying.put("c", "c");
+
+        final List<KeyValue<String, String>> results = toList(theStore.range("a", "c"));
+        assertTrue(results.contains(new KeyValue<>("a", "a")));
+        assertTrue(results.contains(new KeyValue<>("b", "b")));
+        assertEquals(2, results.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldSupportRangeAcrossMultipleKVStores() throws Exception {
+        final KeyValueStore<String, String> cache = newStoreInstance();
+        stubProviderTwo.addStore(storeName, cache);
+
+        stubOneUnderlying.put("a", "a");
+        stubOneUnderlying.put("b", "b");
+        stubOneUnderlying.put("z", "z");
+
+        cache.put("c", "c");
+        cache.put("d", "d");
+        cache.put("x", "x");
+
+        final List<KeyValue<String, String>> results = toList(theStore.range("a", "e"));
+        assertTrue(results.contains(new KeyValue<>("a", "a")));
+        assertTrue(results.contains(new KeyValue<>("b", "b")));
+        assertTrue(results.contains(new KeyValue<>("c", "c")));
+        assertTrue(results.contains(new KeyValue<>("d", "d")));
+        assertEquals(4, results.size());
+    }
+
+    @Test
+    public void shouldSupportAllAcrossMultipleStores() throws Exception {
+        final KeyValueStore<String, String> cache = newStoreInstance();
+        stubProviderTwo.addStore(storeName, cache);
+
+        stubOneUnderlying.put("a", "a");
+        stubOneUnderlying.put("b", "b");
+        stubOneUnderlying.put("z", "z");
+
+        cache.put("c", "c");
+        cache.put("d", "d");
+        cache.put("x", "x");
+
+        final List<KeyValue<String, String>> results = toList(theStore.all());
+        assertTrue(results.contains(new KeyValue<>("a", "a")));
+        assertTrue(results.contains(new KeyValue<>("b", "b")));
+        assertTrue(results.contains(new KeyValue<>("c", "c")));
+        assertTrue(results.contains(new KeyValue<>("d", "d")));
+        assertTrue(results.contains(new KeyValue<>("x", "x")));
+        assertTrue(results.contains(new KeyValue<>("z", "z")));
+        assertEquals(6, results.size());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnGet() throws Exception {
+        noStores().get("anything");
+    }
+
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnRange() throws Exception {
+        noStores().range("anything", "something");
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnAll() throws Exception {
+        noStores().all();
+    }
+
+    @Test
+    public void shouldGetApproximateEntriesAcrossAllStores() throws Exception {
+        final KeyValueStore<String, String> cache = newStoreInstance();
+        stubProviderTwo.addStore(storeName, cache);
+
+        stubOneUnderlying.put("a", "a");
+        stubOneUnderlying.put("b", "b");
+        stubOneUnderlying.put("z", "z");
+
+        cache.put("c", "c");
+        cache.put("d", "d");
+        cache.put("x", "x");
+
+        assertEquals(6, theStore.approximateNumEntries());
+    }
+
+    @Test
+    public void shouldReturnLongMaxValueOnOverflow() throws Exception {
+        stubProviderTwo.addStore(storeName, new StateStoreTestUtils.NoOpReadOnlyStore<Object, Object>() {
+            @Override
+            public long approximateNumEntries() {
+                return Long.MAX_VALUE;
+            }
+        });
+
+        stubOneUnderlying.put("overflow", "me");
+        assertEquals(Long.MAX_VALUE, theStore.approximateNumEntries());
+    }
+
+    private CompositeReadOnlyKeyValueStore<Object, Object> noStores() {
+        return new CompositeReadOnlyKeyValueStore<>(new WrappingStoreProvider(Collections.<StateStoreProvider>emptyList()),
+                QueryableStoreTypes.keyValueStore(), storeName);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
new file mode 100644
index 0000000..825c1e8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+public class CompositeReadOnlyWindowStoreTest {
+
+    private final String storeName = "window-store";
+    private StateStoreProviderStub stubProviderOne;
+    private StateStoreProviderStub stubProviderTwo;
+    private CompositeReadOnlyWindowStore<String, String>
+        windowStore;
+    private ReadOnlyWindowStoreStub<String, String> underlyingWindowStore;
+    private ReadOnlyWindowStoreStub<String, String>
+            otherUnderlyingStore;
+
+    @Before
+    public void before() {
+        stubProviderOne = new StateStoreProviderStub();
+        stubProviderTwo = new StateStoreProviderStub();
+        underlyingWindowStore = new ReadOnlyWindowStoreStub<>();
+        stubProviderOne.addStore(storeName, underlyingWindowStore);
+
+        otherUnderlyingStore = new ReadOnlyWindowStoreStub<>();
+        stubProviderOne.addStore("other-window-store", otherUnderlyingStore);
+
+
+        windowStore = new CompositeReadOnlyWindowStore<>(
+            new WrappingStoreProvider(Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo)),
+                QueryableStoreTypes.<String, String>windowStore(),
+                storeName);
+    }
+
+    @Test
+    public void shouldFetchValuesFromWindowStore() throws Exception {
+        underlyingWindowStore.put("my-key", "my-value", 0L);
+        underlyingWindowStore.put("my-key", "my-later-value", 10L);
+
+        final WindowStoreIterator<String> iterator = windowStore.fetch("my-key", 0L, 25L);
+        final List<KeyValue<Long, String>> results = toList(iterator);
+
+        assertEquals(asList(new KeyValue<>(0L, "my-value"),
+                            new KeyValue<>(10L, "my-later-value")),
+                     results);
+    }
+
+    @Test
+    public void shouldReturnEmptyIteratorIfNoData() throws Exception {
+        final WindowStoreIterator<String> iterator = windowStore.fetch("my-key", 0L, 25L);
+        assertEquals(false, iterator.hasNext());
+    }
+
+    @Test
+    public void shouldFindValueForKeyWhenMultiStores() throws Exception {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
+            ReadOnlyWindowStoreStub<>();
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+
+        underlyingWindowStore.put("key-one", "value-one", 0L);
+        secondUnderlying.put("key-two", "value-two", 10L);
+
+        final List<KeyValue<Long, String>> keyOneResults = toList(windowStore.fetch("key-one", 0L,
+                                                                                    1L));
+        final List<KeyValue<Long, String>> keyTwoResults = toList(windowStore.fetch("key-two", 10L,
+                                                                                    11L));
+
+        assertEquals(Collections.singletonList(KeyValue.pair(0L, "value-one")), keyOneResults);
+        assertEquals(Collections.singletonList(KeyValue.pair(10L, "value-two")), keyTwoResults);
+    }
+
+    @Test
+    public void shouldNotGetValuesFromOtherStores() throws Exception {
+        otherUnderlyingStore.put("some-key", "some-value", 0L);
+        underlyingWindowStore.put("some-key", "my-value", 1L);
+
+        final List<KeyValue<Long, String>> results = toList(windowStore.fetch("some-key", 0L, 2L));
+        assertEquals(Collections.singletonList(new KeyValue<>(1L, "my-value")), results);
+    }
+
+    static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {
+        final List<KeyValue<K, V>> results = new ArrayList<>();
+
+        while (iterator.hasNext()) {
+            results.add(iterator.next());
+        }
+        return results;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
new file mode 100644
index 0000000..c7a1a2c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
+    private final TreeMap<K, V> map = new TreeMap<>();
+    private final String name;
+    private boolean open = true;
+
+    InMemoryKeyValueStore(final String name) {
+        this.name = name;
+    }
+
+    @Override
+    public void put(final K key, final V value) {
+        map.put(key, value);
+    }
+
+    @Override
+    public V putIfAbsent(final K key, final V value) {
+        V orig = map.get(key);
+        if (orig == null) {
+            map.put(key, value);
+        }
+        return orig;
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<K, V>> entries) {
+        for (KeyValue<K, V> entry : entries) {
+            map.put(entry.key, entry.value);
+        }
+    }
+
+    @Override
+    public V delete(final K key) {
+        return map.remove(key);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return map.size();
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        // no-op
+    }
+
+    @Override
+    public void flush() {
+        //no-op
+    }
+
+    @Override
+    public void close() {
+        open = false;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public V get(final K key) {
+        return map.get(key);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(final K from, final K to) {
+        return new TheIterator(this.map.subMap(from, true, to, false).entrySet().iterator());
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return new TheIterator(map.entrySet().iterator());
+    }
+
+    private class TheIterator implements KeyValueIterator<K, V> {
+
+        private final Iterator<Map.Entry<K, V>> underlying;
+
+        public TheIterator(final Iterator<Map.Entry<K, V>> iterator) {
+            this.underlying = iterator;
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public boolean hasNext() {
+            return underlying.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            Map.Entry<K, V> next = underlying.next();
+            return new KeyValue<>(next.getKey(), next.getValue());
+        }
+
+        @Override
+        public void remove() {
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index c301223..82071b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -95,7 +95,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
             assertEquals(10, driver.sizeOf(store));
             assertTrue(driver.flushedEntryRemoved(0));
             assertTrue(driver.flushedEntryRemoved(1));
-            assertTrue(driver.flushedEntryRemoved(2));
+            assertTrue(driver.flushedEntryRemoved(3));
             assertEquals(3, driver.numFlushedEntryRemoved());
         } finally {
             store.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
new file mode 100644
index 0000000..db4a913
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+
+import org.apache.kafka.streams.state.NoOpWindowStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class QueryableStoreProviderTest {
+
+    private final String keyValueStore = "key-value";
+    private final String windowStore = "window-store";
+    private QueryableStoreProvider storeProvider;
+
+    @Before
+    public void before() {
+        final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub();
+        theStoreProvider.addStore(keyValueStore, new StateStoreTestUtils.NoOpReadOnlyStore<>());
+        theStoreProvider.addStore(windowStore, new NoOpWindowStore());
+        storeProvider =
+            new QueryableStoreProvider(
+                Collections.<StateStoreProvider>singletonList(theStoreProvider));
+    }
+
+    @Test
+    public void shouldReturnNullIfKVStoreDoesntExist() throws Exception {
+        assertNull(storeProvider.getStore("not-a-store", QueryableStoreTypes.keyValueStore()));
+    }
+
+    @Test
+    public void shouldReturnNullIfWindowStoreDoesntExist() throws Exception {
+        assertNull(storeProvider.getStore("not-a-store", QueryableStoreTypes.windowStore()));
+    }
+
+    @Test
+    public void shouldReturnKVStoreWhenItExists() throws Exception {
+        assertNotNull(storeProvider.getStore(keyValueStore, QueryableStoreTypes.keyValueStore()));
+    }
+
+    @Test
+    public void shouldReturnWindowStoreWhenItExists() throws Exception {
+        assertNotNull(storeProvider.getStore(windowStore, QueryableStoreTypes.windowStore()));
+    }
+
+    @Test
+    public void shouldNotReturnKVStoreWhenIsWindowStore() throws Exception {
+        assertNull(storeProvider.getStore(windowStore, QueryableStoreTypes.keyValueStore()));
+    }
+
+    @Test
+    public void shouldNotReturnWindowStoreWhenIsKVStore() throws Exception {
+        assertNull(storeProvider.getStore(keyValueStore, QueryableStoreTypes.windowStore()));
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
new file mode 100644
index 0000000..5b88eb8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A very simple window store stub for testing purposes.
+ */
+public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, StateStore {
+
+    private final Map<Long, Map<K, V>> data = new HashMap<>();
+
+    @Override
+    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+        final List<KeyValue<Long, V>> results = new ArrayList<>();
+        for (long now = timeFrom; now <= timeTo; now++) {
+            final Map<K, V> kvMap = data.get(now);
+            if (kvMap != null && kvMap.containsKey(key)) {
+                results.add(new KeyValue<>(now, kvMap.get(key)));
+            }
+        }
+        return new TheWindowStoreIterator<>(results.iterator());
+    }
+
+    public void put(final K key, final V value, final long timestamp) {
+        if (!data.containsKey(timestamp)) {
+            data.put(timestamp, new HashMap<K, V>());
+        }
+        data.get(timestamp).put(key, value);
+    }
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+
+    }
+
+    @Override
+    public void flush() {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return false;
+    }
+
+    private class TheWindowStoreIterator<E> implements WindowStoreIterator<E> {
+
+        private final Iterator<KeyValue<Long, E>> underlying;
+
+        TheWindowStoreIterator(final Iterator<KeyValue<Long, E>> underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public boolean hasNext() {
+            return underlying.hasNext();
+        }
+
+        @Override
+        public KeyValue<Long, E> next() {
+            return underlying.next();
+        }
+
+        @Override
+        public void remove() {
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index d889e7b..9a6a260 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.io.File;
@@ -46,6 +47,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 
 public class RocksDBWindowStoreTest {
@@ -70,6 +72,45 @@ public class RocksDBWindowStoreTest {
     }
 
     @Test
+    public void shouldOnlyIterateOpenSegments() throws Exception {
+        final File baseDir = TestUtils.tempDirectory();
+        Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
+        RecordCollector recordCollector = new RecordCollector(producer) {
+            @Override
+            public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+            }
+        };
+
+        MockProcessorContext context = new MockProcessorContext(
+                null, baseDir,
+                byteArraySerde, byteArraySerde,
+                recordCollector);
+
+        final WindowStore<Integer, String> windowStore = createWindowStore(context);
+        long currentTime = 0;
+        context.setTime(currentTime);
+        windowStore.put(1, "one");
+        currentTime = currentTime + segmentSize;
+        context.setTime(currentTime);
+        windowStore.put(1, "two");
+        currentTime = currentTime + segmentSize;
+        context.setTime(currentTime);
+        windowStore.put(1, "three");
+
+        final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
+
+        // roll to the next segment that will close the first
+        currentTime = currentTime + segmentSize;
+        context.setTime(currentTime);
+        windowStore.put(1, "four");
+
+        // should only have 2 values as the first segment is no longer open
+        assertEquals(new KeyValue<>(60000L, "two"), iterator.next());
+        assertEquals(new KeyValue<>(120000L, "three"), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
     public void testPutAndFetch() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
@@ -712,7 +753,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(2, fetchedCount);
 
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(1L), inner.segmentName(2L), inner.segmentName(3L)),
+                        Utils.mkSet(inner.segmentName(1L), inner.segmentName(3L)),
                         segmentDirs(baseDir)
                 );
 
@@ -728,7 +769,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(1, fetchedCount);
 
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(3L), inner.segmentName(4L), inner.segmentName(5L)),
+                        Utils.mkSet(inner.segmentName(3L), inner.segmentName(5L)),
                         segmentDirs(baseDir)
                 );
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java
new file mode 100644
index 0000000..7cf736d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreProviderStub.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class StateStoreProviderStub implements StateStoreProvider {
+
+    private final Map<String, StateStore> stores = new HashMap<>();
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> List<T> getStores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
+        if (stores.containsKey(storeName) && queryableStoreType.accepts(stores.get(storeName))) {
+            return (List<T>) Collections.singletonList(stores.get(storeName));
+        }
+        return Collections.emptyList();
+    }
+
+    public void addStore(final String storeName,
+                         final StateStore store) {
+        stores.put(storeName, store);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
new file mode 100644
index 0000000..e30c7ae
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
+
+@SuppressWarnings("unchecked")
+public class StateStoreTestUtils {
+
+    public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, Class<K> keyType, Class<V> valueType) {
+        final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name,
+                null, null, new MockTime());
+
+        final StateStore stateStore = supplier.get();
+        stateStore.init(new MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType),
+                new NoOpRecordCollector()), stateStore);
+        return (KeyValueStore<K, V>) stateStore;
+
+    }
+
+    static class NoOpRecordCollector extends RecordCollector {
+        public NoOpRecordCollector() {
+            super(null);
+        }
+
+        @Override
+        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+            // no-op
+        }
+
+        @Override
+        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) {
+            // no-op
+        }
+
+        @Override
+        public void flush() {
+            //no-op
+        }
+
+        @Override
+        public void close() {
+            //no-op
+        }
+    }
+
+    static class NoOpReadOnlyStore<K, V>
+            implements ReadOnlyKeyValueStore<K, V>, StateStore {
+
+        @Override
+        public V get(final K key) {
+            return null;
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(final K from, final K to) {
+            return null;
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return null;
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return 0L;
+        }
+
+        @Override
+        public String name() {
+            return "";
+        }
+
+        @Override
+        public void init(final ProcessorContext context, final StateStore root) {
+
+        }
+
+        @Override
+        public void flush() {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public boolean isOpen() {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
new file mode 100644
index 0000000..5ac57f8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.state.QueryableStoreTypes.windowStore;
+import static org.junit.Assert.assertEquals;
+
+public class StreamThreadStateStoreProviderTest {
+
+    private StreamThread thread;
+    private StreamTask taskOne;
+    private StreamTask taskTwo;
+    private StreamThreadStateStoreProvider provider;
+
+    @Before
+    public void before() throws IOException {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("the-source");
+        builder.addProcessor("the-processor", new MockProcessorSupplier());
+        builder.addStateStore(Stores.create("kv-store")
+                                  .withStringKeys()
+                                  .withStringValues().inMemory().build(), "the-processor");
+
+        builder.addStateStore(Stores.create("window-store")
+                                  .withStringKeys()
+                                  .withStringValues()
+                                  .persistent()
+                                  .windowed(10, 2, false).build(), "the-processor");
+
+        final Properties properties = new Properties();
+        final String applicationId = "applicationId";
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.put(StreamsConfig.STATE_DIR_CONFIG,
+                       TestUtils.tempDirectory(new File("/tmp").toPath(), "my-state").getPath());
+
+        final StreamsConfig streamsConfig = new StreamsConfig(properties);
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog");
+        configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog");
+
+        final ProcessorTopology topology = builder.build("X", null);
+        final Map<TaskId, StreamTask> tasks = new HashMap<>();
+        taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
+                                    new TaskId(0, 0));
+        tasks.put(new TaskId(0, 0), taskOne);
+        taskTwo = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
+                                    new TaskId(0, 1));
+        tasks.put(new TaskId(0, 1), taskTwo);
+
+        thread = new StreamThread(builder, streamsConfig, clientSupplier,
+                                  applicationId,
+                                  "clientId", UUID.randomUUID(), new Metrics(),
+                                  new SystemTime()) {
+            @Override
+            public Map<TaskId, StreamTask> tasks() {
+                return tasks;
+            }
+        };
+        provider = new StreamThreadStateStoreProvider(thread);
+    }
+
+    @Test
+    public void shouldFindKeyValueStores() throws Exception {
+        List<ReadOnlyKeyValueStore<String, String>> kvStores =
+            provider.getStores("kv-store", QueryableStoreTypes.<String, String>keyValueStore());
+        assertEquals(2, kvStores.size());
+    }
+
+    @Test
+    public void shouldFindWindowStores() throws Exception {
+        final List<ReadOnlyWindowStore<Object, Object>> windowStores =
+            provider.getStores("window-store", windowStore());
+        assertEquals(2, windowStores.size());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() throws Exception {
+        taskOne.getStore("window-store").close();
+        provider.getStores("window-store", QueryableStoreTypes.windowStore());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() throws Exception {
+        taskOne.getStore("kv-store").close();
+        provider.getStores("kv-store", QueryableStoreTypes.keyValueStore());
+    }
+
+    @Test
+    public void shouldReturnEmptyListIfNoStoresFoundWithName() throws Exception {
+        assertEquals(Collections.emptyList(),
+                     provider.getStores("not-a-store", QueryableStoreTypes.keyValueStore()));
+    }
+
+
+    @Test
+    public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() throws Exception {
+        assertEquals(Collections.emptyList(),
+                     provider.getStores("window-store", QueryableStoreTypes.keyValueStore()));
+    }
+
+    private StreamTask createStreamsTask(final String applicationId,
+                                         final StreamsConfig streamsConfig,
+                                         final MockClientSupplier clientSupplier,
+                                         final ProcessorTopology topology,
+                                         final TaskId taskId) {
+        return new StreamTask(taskId, applicationId,
+                              Collections.singletonList(new TopicPartition("topic", taskId.partition)),
+                              topology,
+                              clientSupplier.consumer,
+                              clientSupplier.producer,
+                              clientSupplier.restoreConsumer,
+                              streamsConfig, new TheStreamMetrics()) {
+            @Override
+            protected void initializeOffsetLimits() {
+
+            }
+        };
+    }
+
+    private void configureRestoreConsumer(final MockClientSupplier clientSupplier,
+                                          final String topic) {
+        clientSupplier.restoreConsumer.updatePartitions(
+            topic,
+            Arrays.asList(new PartitionInfo(topic, 0, null, null, null),
+                          new PartitionInfo(topic, 1, null, null, null)));
+        final TopicPartition tp1 = new TopicPartition(topic, 0);
+        final TopicPartition tp2 = new TopicPartition(topic, 1);
+
+        clientSupplier.restoreConsumer.assign(Arrays.asList(tp1, tp2));
+
+        final Map<TopicPartition, Long> offsets = new HashMap<>();
+        offsets.put(tp1, 0L);
+        offsets.put(tp2, 0L);
+
+        clientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
+        clientSupplier.restoreConsumer.updateEndOffsets(offsets);
+    }
+
+    private static class TheStreamMetrics implements StreamsMetrics {
+
+        @Override
+        public Sensor addLatencySensor(final String scopeName,
+                                       final String entityName,
+                                       final String operationName,
+                                       final String... tags) {
+            return null;
+        }
+
+        @Override
+        public void recordLatency(final Sensor sensor, final long startNs,
+                                  final long endNs) {
+
+        }
+    }
+}
\ No newline at end of file
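
A minimal caller-side sketch of how the read-only stores returned by StreamThreadStateStoreProvider#getStores above might be consumed. It assumes the same store names as the test ("kv-store", "window-store"), a provider backed by a running StreamThread, and the read-only get/fetch methods introduced by this patch; the class and method names below are illustrative, not part of the commit.

package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.util.List;

public class ProviderReadSketch {

    // Look up a single key in every task-local copy of "kv-store" owned by the thread.
    static void readKeyValue(final StreamThreadStateStoreProvider provider, final String key) {
        final List<ReadOnlyKeyValueStore<String, String>> stores =
            provider.getStores("kv-store", QueryableStoreTypes.<String, String>keyValueStore());
        for (final ReadOnlyKeyValueStore<String, String> store : stores) {
            System.out.println(key + " -> " + store.get(key));
        }
    }

    // Fetch the windowed values for a key within [from, to] from every copy of "window-store".
    static void readWindowed(final StreamThreadStateStoreProvider provider,
                             final String key, final long from, final long to) {
        final List<ReadOnlyWindowStore<String, String>> stores =
            provider.getStores("window-store", QueryableStoreTypes.<String, String>windowStore());
        for (final ReadOnlyWindowStore<String, String> store : stores) {
            try (final WindowStoreIterator<String> iterator = store.fetch(key, from, to)) {
                while (iterator.hasNext()) {
                    final KeyValue<Long, String> windowedValue = iterator.next();
                    System.out.println(key + "@" + windowedValue.key + " -> " + windowedValue.value);
                }
            }
        }
    }
}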

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
new file mode 100644
index 0000000..32aae6b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+
+import org.apache.kafka.streams.state.NoOpWindowStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.kafka.streams.state.QueryableStoreTypes.windowStore;
+import static org.junit.Assert.assertEquals;
+
+public class WrappingStoreProviderTest {
+
+    private WrappingStoreProvider wrappingStoreProvider;
+
+    @Before
+    public void before() {
+        final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub();
+        final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub();
+
+
+        stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));
+        stubProviderOne.addStore("window", new NoOpWindowStore());
+        stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));
+        stubProviderTwo.addStore("window", new NoOpWindowStore());
+
+        wrappingStoreProvider = new WrappingStoreProvider(
+                Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo));
+    }
+
+    @Test
+    public void shouldFindKeyValueStores() throws Exception {
+        List<ReadOnlyKeyValueStore<String, String>> results =
+                wrappingStoreProvider.getStores("kv", QueryableStoreTypes.<String, String>keyValueStore());
+        assertEquals(2, results.size());
+    }
+
+    @Test
+    public void shouldFindWindowStores() throws Exception {
+        final List<ReadOnlyWindowStore<Object, Object>> windowStores =
+                wrappingStoreProvider.getStores("window", windowStore());
+        assertEquals(2, windowStores.size());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStoreExceptionIfNoStoreOfTypeFound() throws Exception {
+        wrappingStoreProvider.getStores("doesn't exist", QueryableStoreTypes.keyValueStore());
+    }
+}
\ No newline at end of file
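
A hedged illustration of what the flattened result of WrappingStoreProvider#getStores enables: every wrapped provider's copies of a store come back in a single list, so a caller can fold over all of them, for example to count entries. The store name "kv" matches the test above; the helper class and the use of ReadOnlyKeyValueStore#all() are assumptions of this sketch, not code from the commit.

package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.List;

public class WrappingProviderSketch {

    // Sum the number of entries across every copy of the "kv" store handed back by the
    // composite provider; each wrapped provider contributes the copies it owns.
    static long countEntries(final WrappingStoreProvider provider) {
        final List<ReadOnlyKeyValueStore<String, String>> stores =
            provider.getStores("kv", QueryableStoreTypes.<String, String>keyValueStore());
        long count = 0;
        for (final ReadOnlyKeyValueStore<String, String> store : stores) {
            try (final KeyValueIterator<String, String> iterator = store.all()) {
                while (iterator.hasNext()) {
                    iterator.next();
                    count++;
                }
            }
        }
        return count;
    }
}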

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 7b31477..f24dfda 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -101,6 +101,11 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
             return persistent;
         }
 
+        @Override
+        public boolean isOpen() {
+            return !closed;
+        }
+
         public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
             private final Deserializer<Integer> deserializer = new IntegerDeserializer();
 


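The isOpen() method added to the mock above backs the guard the new store providers rely on: a store that has already been closed must not be handed out to queries, which is what the shouldThrowInvalidStoreException... tests earlier in this patch exercise. A minimal sketch of that guard follows; the helper name and the use of IllegalStateException are stand-ins to keep the sketch self-contained, whereas the commit's providers signal the condition with InvalidStateStoreException.

package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.processor.StateStore;

import java.util.List;

public class OpenStoreGuardSketch {

    // Reject any store that has already been closed; a real provider throws
    // InvalidStateStoreException here so the caller knows to rediscover or retry.
    static void ensureOpen(final List<StateStore> stores) {
        for (final StateStore store : stores) {
            if (!store.isOpen()) {
                throw new IllegalStateException("store " + store.name() + " is closed");
            }
        }
    }
}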