kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-3912: Query local state stores
Date Tue, 19 Jul 2016 21:02:28 GMT
KAFKA-3912: Query local state stores

guozhangwang enothereska please review

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Michael G. Noll, Guozhang Wang

Closes #1565 from dguy/kafka-3912


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1dd0d27
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1dd0d27
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1dd0d27

Branch: refs/heads/trunk
Commit: f1dd0d272313deceaf7021e54c09a63043acf4af
Parents: b418922
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Jul 19 14:02:21 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 19 14:02:21 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  31 +++
 .../kafka/streams/processor/StateStore.java     |   6 +
 .../processor/internals/AbstractTask.java       |   3 +
 .../processor/internals/StreamThread.java       |   5 +-
 .../kafka/streams/state/KeyValueStore.java      |  38 +--
 .../kafka/streams/state/QueryableStoreType.java |  48 ++++
 .../streams/state/QueryableStoreTypes.java      |  90 +++++++
 .../streams/state/ReadOnlyKeyValueStore.java    |  66 +++++
 .../streams/state/ReadOnlyWindowStore.java      |  36 +++
 .../apache/kafka/streams/state/WindowStore.java |   9 +-
 .../CompositeReadOnlyKeyValueStore.java         | 145 +++++++++++
 .../internals/CompositeReadOnlyWindowStore.java |  74 ++++++
 .../internals/InMemoryKeyValueLoggedStore.java  |   5 +
 .../InMemoryKeyValueStoreSupplier.java          |  26 +-
 .../internals/InvalidStateStoreException.java   |  27 ++
 .../streams/state/internals/MemoryLRUCache.java |  34 +--
 .../internals/MemoryNavigableLRUCache.java      |  42 +--
 .../state/internals/MeteredKeyValueStore.java   |   5 +
 .../state/internals/MeteredWindowStore.java     |   5 +
 .../state/internals/QueryableStoreProvider.java |  54 ++++
 .../streams/state/internals/RocksDBStore.java   |  48 +++-
 .../state/internals/RocksDBWindowStore.java     | 135 ++++++----
 .../state/internals/StateStoreProvider.java     |  41 +++
 .../StreamThreadStateStoreProvider.java         |  53 ++++
 .../state/internals/WindowStoreUtils.java       |   3 -
 .../state/internals/WrappingStoreProvider.java  |  55 ++++
 .../QueryableStateIntegrationTest.java          | 256 +++++++++++++++++++
 .../kafka/streams/state/NoOpWindowStore.java    |  58 +++++
 .../CompositeReadOnlyKeyValueStoreTest.java     | 199 ++++++++++++++
 .../CompositeReadOnlyWindowStoreTest.java       | 114 +++++++++
 .../state/internals/InMemoryKeyValueStore.java  | 146 +++++++++++
 .../internals/InMemoryLRUCacheStoreTest.java    |   2 +-
 .../internals/QueryableStoreProviderTest.java   |  75 ++++++
 .../internals/ReadOnlyWindowStoreStub.java      | 113 ++++++++
 .../state/internals/RocksDBWindowStoreTest.java |  45 +++-
 .../state/internals/StateStoreProviderStub.java |  43 ++++
 .../state/internals/StateStoreTestUtils.java    | 125 +++++++++
 .../StreamThreadStateStoreProviderTest.java     | 211 +++++++++++++++
 .../internals/WrappingStoreProviderTest.java    |  71 +++++
 .../kafka/test/MockStateStoreSupplier.java      |   5 +
 40 files changed, 2380 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6605335..0ed0b6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -24,12 +24,18 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.internals.StateStoreProvider;
+import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -90,6 +96,7 @@ public class KafkaStreams {
 
     private final StreamThread[] threads;
     private final Metrics metrics;
+    private final QueryableStoreProvider queryableStoreProvider;
 
     // processId is expected to be unique across JVMs and to be used
     // in userData of the subscription request to allow assignor be aware
@@ -151,9 +158,13 @@ public class KafkaStreams {
         this.metrics = new Metrics(metricConfig, reporters, time);
 
         this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         for (int i = 0; i < this.threads.length; i++) {
             this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time);
+            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
+
+        this.queryableStoreProvider = new QueryableStoreProvider(storeProviders);
     }
 
     /**
@@ -217,4 +228,24 @@ public class KafkaStreams {
             thread.setUncaughtExceptionHandler(eh);
     }
 
+    /**
+     * Get a facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances
+     * with the provided storeName and accepted by {@link QueryableStoreType#accepts(StateStore)}.
+     * The returned object can be used to query the {@link org.apache.kafka.streams.processor.StateStore} instances
+     * @param storeName             name of the store to find
+     * @param queryableStoreType    accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
+     * @param <T>                   return type
+     * @return  A facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances
+     */
+    public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
+        validateIsRunning();
+        return queryableStoreProvider.getStore(storeName, queryableStoreType);
+    }
+
+    private void validateIsRunning() {
+        if (state != RUNNING) {
+            throw new IllegalStateException("KafkaStreams is not running");
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 68f3644..9aa0932 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -58,4 +58,10 @@ public interface StateStore {
      * @return  {@code true} if the storage is persistent&mdash;{@code false} otherwise
      */
     boolean persistent();
+
+    /**
+     * Is this store open for reading and writing
+     * @return {@code true} if the store is open
+     */
+    boolean isOpen();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index c85ecde..fb22c8a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -126,4 +126,7 @@ public abstract class AbstractTask {
         }
     }
 
+    public StateStore getStore(final String name) {
+        return stateMgr.getStore(name);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d1ce40f..c18e1cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -60,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
@@ -180,7 +181,9 @@ public class StreamThread extends Thread {
                 config.getRestoreConsumerConfigs(threadClientId));
 
         // initialize the task list
-        this.activeTasks = new HashMap<>();
+        // activeTasks needs to be concurrent as it can be accessed
+        // by QueryableState
+        this.activeTasks = new ConcurrentHashMap<>();
         this.standbyTasks = new HashMap<>();
         this.activeTasksByPartition = new HashMap<>();
         this.standbyTasksByPartition = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index 1ee790d..2a86049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -32,16 +32,7 @@ import java.util.List;
  * @param <V> The value type
  */
 @InterfaceStability.Unstable
-public interface KeyValueStore<K, V> extends StateStore {
-
-    /**
-     * Get the value corresponding to this key
-     *
-     * @param key The key to fetch
-     * @return The value or null if no value is found.
-     * @throws NullPointerException If null is used for key.
-     */
-    V get(K key);
+public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K, V> {
 
     /**
      * Update the value associated with this key
@@ -80,31 +71,4 @@ public interface KeyValueStore<K, V> extends StateStore {
      */
     V delete(K key);
 
-    /**
-     * Get an iterator over a given range of keys. This iterator MUST be closed after use.
-     *
-     * @param from The first key that could be in the range
-     * @param to The last key that could be in the range
-     * @return The iterator for this range.
-     * @throws NullPointerException If null is used for from or to.
-     */
-    KeyValueIterator<K, V> range(K from, K to);
-
-    /**
-     * Return an iterator over all keys in the database. This iterator MUST be closed after use.
-     *
-     * @return An iterator of all key/value pairs in the store.
-     */
-    KeyValueIterator<K, V> all();
-
-    /**
-     * Return an approximate count of key-value mappings in this store.
-     *
-     * The count is not guaranteed to be exact in order to accommodate stores
-     * where an exact count is expensive to calculate.
-     *
-     * @return an approximate count of key-value mappings in the store.
-     */
-    long approximateNumEntries();
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
new file mode 100644
index 0000000..0a8521b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.StateStoreProvider;
+
+/**
+ * Used to enable querying of custom {@link StateStore} types via the
+ * {@link org.apache.kafka.streams.KafkaStreams}
+ * API.
+ * @see QueryableStoreTypes
+ *
+ * @param <T>   The store type
+ */
+@InterfaceStability.Unstable
+public interface QueryableStoreType<T> {
+
+    /**
+     * Called when searching for {@link StateStore}s to see if they
+     * match the type expected by implementors of this interface
+     * @param stateStore    The stateStore
+     * @return true if it is a match
+     */
+    boolean accepts(final StateStore stateStore);
+
+    /**
+     * Create an instance of T (usually a facade) that developers can use
+     * to query the underlying {@link StateStore}s
+     * @param storeProvider     provides access to all the underlying StateStore instances
+     * @param storeName         The name of the Store
+     * @return  T usually a read-only interface over a StateStore @see {@link QueryableStoreTypes.KeyValueStoreType}
+     */
+    T create(final StateStoreProvider storeProvider, final String storeName);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
new file mode 100644
index 0000000..d57fe35
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
+import org.apache.kafka.streams.state.internals.StateStoreProvider;
+
+/**
+ * Provides access to the {@link QueryableStoreType}s provided with KafkaStreams. These
+ * can be used with {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType)}
+ * To access and query the {@link StateStore}s that are part of a Topology
+ */
+public class QueryableStoreTypes {
+
+    /**
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}
+     * @param <K>   key type of the store
+     * @param <V>   value type of the store
+     * @return  {@link KeyValueStoreType}
+     */
+    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
+        return new KeyValueStoreType<>();
+    }
+
+    /**
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}
+     * @param <K>   key type of the store
+     * @param <V>   value type of the store
+     * @return  {@link WindowStoreType}
+     */
+    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() {
+        return new WindowStoreType<>();
+    }
+
+    private static abstract class QueryableStoreTypeMatcher<T> implements QueryableStoreType<T> {
+
+        private final Class matchTo;
+
+        QueryableStoreTypeMatcher(Class matchTo) {
+            this.matchTo = matchTo;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public boolean accepts(final StateStore stateStore) {
+            return matchTo.isAssignableFrom(stateStore.getClass());
+        }
+    }
+
+    private static class KeyValueStoreType<K, V> extends
+                                                 QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
+        KeyValueStoreType() {
+            super(ReadOnlyKeyValueStore.class);
+        }
+
+        @Override
+        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider,
+                                                  final String storeName) {
+            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
+        }
+
+    }
+
+    private static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
+        WindowStoreType() {
+            super(ReadOnlyWindowStore.class);
+        }
+
+        @Override
+        public ReadOnlyWindowStore<K, V> create(final StateStoreProvider storeProvider,
+                                                final String storeName) {
+            return new CompositeReadOnlyWindowStore<>(storeProvider, this, storeName);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
new file mode 100644
index 0000000..0799fae
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * A key value store that only supports read operations.
+ * Implementations should be thread-safe as concurrent reads and writes
+ * are expected
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+@InterfaceStability.Unstable
+public interface ReadOnlyKeyValueStore<K, V> {
+
+    /**
+     * Get the value corresponding to this key
+     *
+     * @param key The key to fetch
+     * @return The value or null if no value is found.
+     * @throws NullPointerException If null is used for key.
+     */
+    V get(K key);
+
+    /**
+     * Get an iterator over a given range of keys. This iterator MUST be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values. No ordering guarantees are provided.
+     * @param from The first key that could be in the range
+     * @param to The last key that could be in the range
+     * @return The iterator for this range.
+     * @throws NullPointerException If null is used for from or to.
+     */
+    KeyValueIterator<K, V> range(K from, K to);
+
+    /**
+     * Return an iterator over all keys in this store. This iterator MUST be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values. No ordering guarantees are provided.
+     * @return An iterator of all key/value pairs in the store.
+     */
+    KeyValueIterator<K, V> all();
+
+    /**
+     * Return an approximate count of key-value mappings in this store.
+     *
+     * The count is not guaranteed to be exact in order to accommodate stores
+     * where an exact count is expensive to calculate.
+     *
+     * @return an approximate count of key-value mappings in the store.
+     */
+    long approximateNumEntries();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
new file mode 100644
index 0000000..347a355
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * A window store that only supports read operations
+ * Implementations should be thread-safe as concurrent reads and writes
+ * are expected.
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ */
+@InterfaceStability.Unstable
+public interface ReadOnlyWindowStore<K, V> {
+
+    /**
+     * Get all the key-value pairs with the given key and the time range from all
+     * the existing windows.
+     *
+     * @return an iterator over key-value pairs {@code <timestamp, value>}
+     */
+    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 079a2b2..dee3e9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.StateStore;
  * @param <V> Type of values
  */
 @InterfaceStability.Unstable
-public interface WindowStore<K, V> extends StateStore {
+public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
 
     /**
      * Put a key-value pair with the current wall-clock time as the timestamp
@@ -42,11 +42,4 @@ public interface WindowStore<K, V> extends StateStore {
      */
     void put(K key, V value, long timestamp);
 
-    /**
-     * Get all the key-value pairs with the given key and the time range from all
-     * the existing windows.
-     *
-     * @return an iterator over key-value pairs {@code <timestamp, value>}
-     */
-    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
new file mode 100644
index 0000000..08b8746
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A wrapper over the underlying {@link ReadOnlyKeyValueStore}s found in a {@link
+ * org.apache.kafka.streams.processor.internals.ProcessorTopology}
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
+
+    private final StateStoreProvider storeProvider;
+    private final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType;
+    private final String storeName;
+
+    public CompositeReadOnlyKeyValueStore(final StateStoreProvider storeProvider,
+                                          final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType,
+                                          final String storeName) {
+        this.storeProvider = storeProvider;
+        this.storeType = storeType;
+        this.storeName = storeName;
+    }
+
+    @Override
+    public V get(final K key) {
+        final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.getStores(storeName, storeType);
+        for (ReadOnlyKeyValueStore<K, V> store : stores) {
+            V result = store.get(key);
+            if (result != null) {
+                return result;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(final K from, final K to) {
+        final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
+            @Override
+            public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
+                return store.range(from, to);
+            }
+        };
+        final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.getStores(storeName, storeType);
+        return new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
+            @Override
+            public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
+                return store.all();
+            }
+        };
+        final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.getStores(storeName, storeType);
+        return new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.getStores(storeName, storeType);
+        long total = 0;
+        for (ReadOnlyKeyValueStore<K, V> store : stores) {
+            total += store.approximateNumEntries();
+        }
+        return total < 0 ? Long.MAX_VALUE : total;
+    }
+
+    interface NextIteratorFunction<K, V> {
+
+        KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store);
+    }
+
+
+    private class CompositeKeyValueIterator implements KeyValueIterator<K, V> {
+
+        private final Iterator<ReadOnlyKeyValueStore<K, V>> storeIterator;
+        private final NextIteratorFunction<K, V> nextIteratorFunction;
+
+        private KeyValueIterator<K, V> current;
+
+        CompositeKeyValueIterator(final Iterator<ReadOnlyKeyValueStore<K, V>> underlying,
+                                  final NextIteratorFunction<K, V> nextIteratorFunction) {
+            this.storeIterator = underlying;
+            this.nextIteratorFunction = nextIteratorFunction;
+        }
+
+        @Override
+        public void close() {
+            if (current != null) {
+                current.close();
+                current = null;
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            while ((current == null || !current.hasNext())
+                    && storeIterator.hasNext()) {
+                close();
+                current = nextIteratorFunction.apply(storeIterator.next());
+            }
+            return current != null && current.hasNext();
+        }
+
+
+        @Override
+        public KeyValue<K, V> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return current.next();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Remove not supported");
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
new file mode 100644
index 0000000..9a7ea7a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link
+ * org.apache.kafka.streams.processor.internals.ProcessorTopology}
+ */
+public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K, V> {
+
+    private final QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStoreType;
+    private final String storeName;
+    private final StateStoreProvider provider;
+
+    public CompositeReadOnlyWindowStore(final StateStoreProvider provider,
+                                        final QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStoreType,
+                                        final String storeName) {
+        this.provider = provider;
+        this.windowStoreType = windowStoreType;
+        this.storeName = storeName;
+    }
+
+    @Override
+    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+        final List<ReadOnlyWindowStore<K, V>> stores = provider.getStores(storeName, windowStoreType);
+        for (ReadOnlyWindowStore<K, V> windowStore : stores) {
+            final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom, timeTo);
+            if (!result.hasNext()) {
+                result.close();
+            } else {
+                return result;
+            }
+        }
+        return new WindowStoreIterator<V>() {
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public boolean hasNext() {
+                return false;
+            }
+
+            @Override
+            public KeyValue<Long, V> next() {
+                throw new NoSuchElementException();
+            }
+
+            @Override
+            public void remove() {
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index c00bf76..4f056ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -85,6 +85,11 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    public boolean isOpen() {
+        return inner.isOpen();
+    }
+
+    @Override
     public V get(K key) {
         return this.inner.get(key);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 0cc4586..3953fd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -79,6 +79,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         private final Serde<K> keySerde;
         private final Serde<V> valueSerde;
         private final NavigableMap<K, V> map;
+        private volatile boolean open = false;
 
         private StateSerdes<K, V> serdes;
 
@@ -121,6 +122,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
                     }
                 }
             });
+            this.open = true;
         }
 
         @Override
@@ -129,17 +131,22 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
-        public V get(K key) {
+        public boolean isOpen() {
+            return this.open;
+        }
+
+        @Override
+        public synchronized V get(K key) {
             return this.map.get(key);
         }
 
         @Override
-        public void put(K key, V value) {
+        public synchronized void put(K key, V value) {
             this.map.put(key, value);
         }
 
         @Override
-        public V putIfAbsent(K key, V value) {
+        public synchronized V putIfAbsent(K key, V value) {
             V originalValue = get(key);
             if (originalValue == null) {
                 put(key, value);
@@ -148,24 +155,25 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
-        public void putAll(List<KeyValue<K, V>> entries) {
+        public synchronized void putAll(List<KeyValue<K, V>> entries) {
             for (KeyValue<K, V> entry : entries)
                 put(entry.key, entry.value);
         }
 
         @Override
-        public V delete(K key) {
+        public synchronized V delete(K key) {
             return this.map.remove(key);
         }
 
         @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
+        public synchronized KeyValueIterator<K, V> range(K from, K to) {
             return new MemoryStoreIterator<>(this.map.subMap(from, true, to, false).entrySet().iterator());
         }
 
         @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryStoreIterator<>(this.map.entrySet().iterator());
+        public synchronized KeyValueIterator<K, V> all() {
+            final TreeMap<K, V> copy = new TreeMap<>(this.map);
+            return new MemoryStoreIterator<>(copy.entrySet().iterator());
         }
 
         @Override
@@ -180,7 +188,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
         @Override
         public void close() {
-            // do-nothing
+            this.open = false;
         }
 
         private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/InvalidStateStoreException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InvalidStateStoreException.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InvalidStateStoreException.java
new file mode 100644
index 0000000..b88704f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InvalidStateStoreException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+/**
+ * Indicates that there was a problem when trying to access
+ * a {@link org.apache.kafka.streams.processor.StateStore}, i.e, the Store is no longer valid because it is closed
+ * or doesn't exist any more due to a rebalance.
+ */
+public class InvalidStateStoreException extends RuntimeException {
+
+    public InvalidStateStoreException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 083f811..4e1f40e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -25,11 +25,9 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * An in-memory LRU cache store based on HashSet and HashMap.
@@ -47,16 +45,16 @@ import java.util.Set;
 public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     public interface EldestEntryRemovalListener<K, V> {
+
         void apply(K key, V value);
     }
-
     private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
 
-    protected String name;
+    private final Serde<V> valueSerde;
+    private String name;
     protected Map<K, V> map;
-    protected Set<K> keys;
     private StateSerdes<K, V> serdes;
+    private volatile boolean open = true;
 
     protected EldestEntryRemovalListener<K, V> listener;
 
@@ -68,9 +66,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     public MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
         this(keySerde, valueSerde);
-
         this.name = name;
-        this.keys = new HashSet<>();
 
         // leave room for one extra entry to handle adding an entry before the oldest can be removed
         this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
@@ -80,7 +76,6 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
             protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                 if (size() > maxCacheSize) {
                     K key = eldest.getKey();
-                    keys.remove(key);
                     if (listener != null) listener.apply(key, eldest.getValue());
                     return true;
                 }
@@ -132,18 +127,22 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public V get(K key) {
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized V get(K key) {
         return this.map.get(key);
     }
 
     @Override
-    public void put(K key, V value) {
+    public synchronized void put(K key, V value) {
         this.map.put(key, value);
-        this.keys.add(key);
     }
 
     @Override
-    public V putIfAbsent(K key, V value) {
+    public synchronized V putIfAbsent(K key, V value) {
         V originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
@@ -158,9 +157,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public V delete(K key) {
+    public synchronized V delete(K key) {
         V value = this.map.remove(key);
-        this.keys.remove(key);
         return value;
     }
 
@@ -192,6 +190,10 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public void close() {
-        // do-nothing since it is in-memory
+        open = false;
+    }
+
+    public int size() {
+        return this.map.size();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 5eb4f49..b79ad7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -21,34 +21,14 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
+import java.util.TreeMap;
 
 public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
 
+
     public MemoryNavigableLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
-        super(keySerde, valueSerde);
-
-        this.name = name;
-        this.keys = new TreeSet<>();
-
-        // leave room for one extra entry to handle adding an entry before the oldest can be removed
-        this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                if (size() > maxCacheSize) {
-                    K key = eldest.getKey();
-                    keys.remove(key);
-                    if (listener != null) listener.apply(key, eldest.getValue());
-                    return true;
-                }
-                return false;
-            }
-        };
+        super(name, maxCacheSize, keySerde, valueSerde);
     }
 
     @Override
@@ -60,14 +40,21 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
 
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
-        return new MemoryNavigableLRUCache.CacheIterator<>(((NavigableSet<K>) this.keys).subSet(from, true, to, false).iterator(), this.map);
+        final TreeMap<K, V> treeMap = toTreeMap();
+        return new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().subSet(from, true, to, true).iterator(), treeMap);
     }
 
     @Override
-    public KeyValueIterator<K, V> all() {
-        return new MemoryNavigableLRUCache.CacheIterator<>(this.keys.iterator(), this.map);
+    public  KeyValueIterator<K, V> all() {
+        final TreeMap<K, V> treeMap = toTreeMap();
+        return new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().iterator(), treeMap);
+    }
+
+    private synchronized TreeMap<K, V> toTreeMap() {
+        return new TreeMap<>(this.map);
     }
 
+
     private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
         private final Iterator<K> keys;
         private final Map<K, V> entries;
@@ -91,8 +78,7 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
 
         @Override
         public void remove() {
-            keys.remove();
-            entries.remove(lastKey);
+            // do nothing
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index c6e93cb..798c097 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -94,6 +94,11 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    public boolean isOpen() {
+        return inner.isOpen();
+    }
+
+    @Override
     public V get(K key) {
         long startNs = time.nanoseconds();
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 37ae499..09952a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -75,6 +75,11 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     @Override
+    public boolean isOpen() {
+        return inner.isOpen();
+    }
+
+    @Override
     public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
         return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.fetchTime);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
new file mode 100644
index 0000000..c77e175
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A wrapper over all of the {@link StateStoreProvider}s in a Topology
+ */
+public class QueryableStoreProvider {
+
+    private final List<StateStoreProvider> storeProviders;
+
+    public QueryableStoreProvider(final List<StateStoreProvider> storeProviders) {
+        this.storeProviders = new ArrayList<>(storeProviders);
+    }
+
+    /**
+     * Get a composite object wrapping the instances of the {@link StateStore} with the provided
+     * storeName and {@link QueryableStoreType}
+     * @param storeName             name of the store
+     * @param queryableStoreType    accept stores passing {@link QueryableStoreType#accepts(StateStore)}
+     * @param <T>                   The expected type of the returned store
+     * @return A composite object that wraps the store instances.
+     */
+    public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
+        final List<T> allStores = new ArrayList<>();
+        for (StateStoreProvider storeProvider : storeProviders) {
+            allStores.addAll(storeProvider.getStores(storeName, queryableStoreType));
+        }
+        if (allStores.isEmpty()) {
+            return null;
+        }
+        return queryableStoreType.create(
+                new WrappingStoreProvider(storeProviders),
+                storeName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index baf5b9e..a8badcd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -97,6 +97,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
     private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
+    private volatile boolean open = false;
+
     public KeyValueStore<K, V> enableLogging() {
         loggingEnabled = true;
 
@@ -164,6 +166,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+        open = true;
     }
 
     public void init(ProcessorContext context, StateStore root) {
@@ -234,7 +237,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public V get(K key) {
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized V get(K key) {
+        validateStoreOpen();
         if (cache != null) {
             RocksDBCacheEntry entry = cache.get(key);
             if (entry == null) {
@@ -258,6 +267,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 return serdes.valueFrom(byteValue);
             }
         }
+
+    }
+
+    private void validateStoreOpen() {
+        if (!open) {
+            throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
+        }
     }
 
     private byte[] getInternal(byte[] rawKey) {
@@ -270,7 +286,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void put(K key, V value) {
+    public synchronized void put(K key, V value) {
+        validateStoreOpen();
         if (cache != null) {
             cacheDirtyKeys.add(key);
             cache.put(key, new RocksDBCacheEntry(value, true));
@@ -284,10 +301,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 changeLogger.maybeLogChange(this.getter);
             }
         }
+
     }
 
     @Override
-    public V putIfAbsent(K key, V value) {
+    public synchronized V putIfAbsent(K key, V value) {
         V originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
@@ -337,14 +355,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public V delete(K key) {
+    public synchronized V delete(K key) {
         V value = get(key);
         put(key, null);
         return value;
     }
 
     @Override
-    public KeyValueIterator<K, V> range(K from, K to) {
+    public synchronized KeyValueIterator<K, V> range(K from, K to) {
+        validateStoreOpen();
         // we need to flush the cache if necessary before returning the iterator
         if (cache != null)
             flushCache();
@@ -353,7 +372,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public KeyValueIterator<K, V> all() {
+    public synchronized KeyValueIterator<K, V> all() {
+        validateStoreOpen();
         // we need to flush the cache if necessary before returning the iterator
         if (cache != null)
             flushCache();
@@ -403,8 +423,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private void flushCache() {
         // flush of the cache entries if necessary
         if (cache != null) {
-            List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.keys.size());
-            List<byte[]> deleteBatch = new ArrayList<>(cache.keys.size());
+            List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.size());
+            List<byte[]> deleteBatch = new ArrayList<>(cache.size());
 
             for (K key : cacheDirtyKeys) {
                 RocksDBCacheEntry entry = cache.get(key);
@@ -449,10 +469,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         if (loggingEnabled)
             changeLogger.logChange(getter);
+
     }
 
     @Override
-    public void flush() {
+    public synchronized void flush() {
         if (db == null) {
             return;
         }
@@ -467,7 +488,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     /**
      * @throws ProcessorStateException if flushing failed because of any internal store exceptions
      */
-    public void flushInternal() {
+    private void flushInternal() {
         try {
             db.flush(fOptions);
         } catch (RocksDBException e) {
@@ -476,12 +497,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void close() {
-
-        if (db == null) {
+    public synchronized void close() {
+        if (!open) {
             return;
         }
-
+        open = false;
         flush();
         options.dispose();
         wOptions.dispose();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 803a089..528b905 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -35,11 +35,16 @@ import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.SimpleTimeZone;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
@@ -47,6 +52,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     private static final long USE_CURRENT_TIMESTAMP = -1L;
 
+    private volatile boolean open = false;
+
     // use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
     private static class Segment extends RocksDBStore<Bytes, byte[]> {
         public final long id;
@@ -63,27 +70,36 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
         private final StateSerdes<?, V> serdes;
-        private final KeyValueIterator<Bytes, byte[]>[] iterators;
-        private int index = 0;
+        private final Iterator<Segment> segments;
+        private final Bytes from;
+        private final Bytes to;
+        private KeyValueIterator<Bytes, byte[]> currentIterator;
+        private Segment currentSegment;
 
         RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) {
-            this(serdes, WindowStoreUtils.NO_ITERATORS);
+            this(serdes, null, null, Collections.<Segment>emptyIterator());
         }
 
-        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<Bytes, byte[]>[] iterators) {
+        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, Bytes from, Bytes to, Iterator<Segment> segments) {
             this.serdes = serdes;
-            this.iterators = iterators;
+            this.from = from;
+            this.to = to;
+            this.segments = segments;
         }
 
         @Override
         public boolean hasNext() {
-            while (index < iterators.length) {
-                if (iterators[index].hasNext())
-                    return true;
-
-                index++;
+            while ((currentIterator == null || !currentIterator.hasNext() || !currentSegment.isOpen())
+                    && segments.hasNext()) {
+                close();
+                currentSegment = segments.next();
+                try {
+                    currentIterator = currentSegment.range(from, to);
+                } catch (InvalidStateStoreException e) {
+                    // segment may have been closed so we ignore it.
+                }
             }
-            return false;
+            return currentIterator != null && currentIterator.hasNext();
         }
 
         /**
@@ -91,37 +107,37 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
          */
         @Override
         public KeyValue<Long, V> next() {
-            if (index >= iterators.length)
+            if (!hasNext()) {
                 throw new NoSuchElementException();
-
-            KeyValue<Bytes, byte[]> kv = iterators[index].next();
-
+            }
+            KeyValue<Bytes, byte[]> kv = currentIterator.next();
             return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key.get()),
-                                  serdes.valueFrom(kv.value));
+                    serdes.valueFrom(kv.value));
         }
 
         @Override
         public void remove() {
-            if (index < iterators.length)
-                iterators[index].remove();
+
         }
 
         @Override
         public void close() {
-            for (KeyValueIterator<Bytes, byte[]> iterator : iterators) {
-                iterator.close();
+            if (currentIterator != null) {
+                currentIterator.close();
+                currentIterator = null;
             }
         }
     }
 
     private final String name;
+    private final int numSegments;
     private final long segmentInterval;
     private final boolean retainDuplicates;
-    private final Segment[] segments;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final SimpleDateFormat formatter;
     private final StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
+    private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
 
     private ProcessorContext context;
     private int seqnum = 0;
@@ -134,11 +150,11 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;
+        this.numSegments = numSegments;
 
         // The segment interval must be greater than MIN_SEGMENT_INTERVAL
         this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
 
-        this.segments = new Segment[numSegments];
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
 
@@ -192,6 +208,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         });
 
         flush();
+        open = true;
     }
 
     private void openExistingSegments() {
@@ -210,7 +227,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
                     for (long segmentId : segmentIds) {
                         if (segmentId >= 0) {
                             currentSegmentId = segmentId;
-                            getSegment(segmentId);
+                            getOrCreateSegment(segmentId);
                         }
                     }
                 }
@@ -228,8 +245,13 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
     public void flush() {
-        for (Segment segment : segments) {
+        for (Segment segment : segments.values()) {
             if (segment != null)
                 segment.flush();
         }
@@ -240,8 +262,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void close() {
+        open = false;
         flush();
-        for (Segment segment : segments) {
+        for (Segment segment : segments.values()) {
             if (segment != null)
                 segment.close();
         }
@@ -279,7 +302,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
 
         // If the record is within the retention period, put it in the store.
-        Segment segment = getSegment(segmentId);
+        Segment segment = getOrCreateSegment(segmentId);
         if (segment != null) {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
@@ -301,7 +324,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
 
         // If the record is within the retention period, put it in the store.
-        Segment segment = getSegment(segmentId);
+        Segment segment = getOrCreateSegment(segmentId);
         if (segment != null)
             segment.put(Bytes.wrap(binaryKey), binaryValue);
     }
@@ -320,41 +343,56 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     @SuppressWarnings("unchecked")
     @Override
     public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+        if (!isOpen()) {
+            throw new InvalidStateStoreException("Store " + this.name + " is currently not isOpen");
+        }
         long segFrom = segmentId(timeFrom);
         long segTo = segmentId(Math.max(0L, timeTo));
 
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
         byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes);
 
-        ArrayList<KeyValueIterator<Bytes, byte[]>> iterators = new ArrayList<>();
-
+        final List<Segment> segments = new ArrayList<>();
         for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
             Segment segment = getSegment(segmentId);
-            if (segment != null)
-                iterators.add(segment.range(Bytes.wrap(binaryFrom), Bytes.wrap(binaryTo)));
+            if (segment != null && segment.isOpen()) {
+                try {
+                    segments.add(segment);
+                } catch (InvalidStateStoreException ise) {
+                    // segment may have been closed by streams thread;
+                }
+            }
         }
 
-        if (iterators.size() > 0) {
-            return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
+        if (!segments.isEmpty()) {
+            return new RocksDBWindowStoreIterator<>(serdes, Bytes.wrap(binaryFrom), Bytes.wrap(binaryTo), segments.iterator());
         } else {
             return new RocksDBWindowStoreIterator<>(serdes);
         }
     }
 
     private Segment getSegment(long segmentId) {
-        if (segmentId <= currentSegmentId && segmentId > currentSegmentId - segments.length) {
-            int index = (int) (segmentId % segments.length);
+        final Segment segment = segments.get(segmentId % numSegments);
+        if (segment != null && segment.id != segmentId) {
+            return null;
+        }
+        return segment;
+    }
+
 
-            if (segments[index] != null && segments[index].id != segmentId) {
+    private Segment getOrCreateSegment(long segmentId) {
+        if (segmentId <= currentSegmentId && segmentId > currentSegmentId - numSegments) {
+            final long key = segmentId % numSegments;
+            final Segment segment = segments.get(key);
+            if (segment != null && segment.id != segmentId) {
                 cleanup();
             }
-
-            if (segments[index] == null) {
-                segments[index] = new Segment(segmentName(segmentId), name, segmentId);
-                segments[index].openDB(context);
+            if (!segments.containsKey(key)) {
+                final Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
+                newSegment.openDB(context);
+                segments.put(key, newSegment);
             }
-
-            return segments[index];
+            return segments.get(key);
 
         } else {
             return null;
@@ -362,11 +400,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     private void cleanup() {
-        for (int i = 0; i < segments.length; i++) {
-            if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) {
-                segments[i].close();
-                segments[i].destroy();
-                segments[i] = null;
+        for (Map.Entry<Long, Segment> segmentEntry : segments.entrySet()) {
+            final Segment segment = segmentEntry.getValue();
+            if (segment != null && segment.id <= currentSegmentId - numSegments) {
+                segments.remove(segmentEntry.getKey());
+                segment.close();
+                segment.destroy();
             }
         }
     }
@@ -393,7 +432,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     public Set<Long> segmentIds() {
         HashSet<Long> segmentIds = new HashSet<>();
 
-        for (Segment segment : segments) {
+        for (Segment segment : segments.values()) {
             if (segment != null)
                 segmentIds.add(segment.id);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/StateStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StateStoreProvider.java
new file mode 100644
index 0000000..2bc717d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StateStoreProvider.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+
+import java.util.List;
+
+/**
+ * Provides access to {@link StateStore}s that have been created
+ * as part of the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}.
+ * To get access to custom stores developers should implement {@link QueryableStoreType}.
+ * @see QueryableStoreTypes
+ */
+public interface StateStoreProvider {
+
+    /**
+     * Find instances of StateStore that are accepted by {@link QueryableStoreType#accepts} and
+     * have the provided storeName.
+     *
+     * @param storeName             name of the store
+     * @param queryableStoreType    filter stores based on this queryableStoreType
+     * @param <T>                   The type of the Store
+     * @return  List of the instances of the store in this topology. Empty List if not found
+     */
+    <T> List<T> getStores(String storeName, QueryableStoreType<T> queryableStoreType);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
new file mode 100644
index 0000000..7363db2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.QueryableStoreType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Wrapper over StreamThread that implements StateStoreProvider
+ */
+public class StreamThreadStateStoreProvider implements StateStoreProvider {
+
+    private final StreamThread streamThread;
+
+    public StreamThreadStateStoreProvider(final StreamThread streamThread) {
+        this.streamThread = streamThread;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> List<T> getStores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
+        final List<T> stores = new ArrayList<>();
+        for (StreamTask streamTask : streamThread.tasks().values()) {
+            final StateStore store = streamTask.getStore(storeName);
+            if (store != null && queryableStoreType.accepts(store)) {
+                if (!store.isOpen()) {
+                    throw new InvalidStateStoreException("Store: " + storeName + " isn't isOpen");
+                }
+                stores.add((T) store);
+            }
+        }
+        return stores;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index 309c9c0..41cd3f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -22,7 +22,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 
 import java.nio.ByteBuffer;
@@ -37,8 +36,6 @@ public class WindowStoreUtils {
     public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
     public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE);
 
-    @SuppressWarnings("unchecked")
-    public static final KeyValueIterator<Bytes, byte[]>[] NO_ITERATORS = (KeyValueIterator<Bytes, byte[]>[]) new KeyValueIterator[0];
 
     public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
         byte[] serializedKey = serdes.rawKey(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1dd0d27/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
new file mode 100644
index 0000000..387a07f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides a wrapper over multiple underlying {@link StateStoreProvider}s
+ */
+public class WrappingStoreProvider implements StateStoreProvider {
+
+    private final List<StateStoreProvider> storeProviders;
+
+    public WrappingStoreProvider(final List<StateStoreProvider> storeProviders) {
+        this.storeProviders = storeProviders;
+    }
+
+    /**
+     * Provides access to {@link org.apache.kafka.streams.processor.StateStore}s accepted
+     * by {@link QueryableStoreType#accepts(StateStore)}
+     * @param storeName  name of the store
+     * @param type      The {@link QueryableStoreType}
+     * @param <T>       The type of the Store, for example, {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore}
+     * @return  a List of all the stores with the storeName and are accepted by {@link QueryableStoreType#accepts(StateStore)}
+     */
+    public <T> List<T> getStores(final String storeName, QueryableStoreType<T> type) {
+        final List<T> allStores = new ArrayList<>();
+        for (StateStoreProvider provider : storeProviders) {
+            final List<T> stores =
+                provider.getStores(storeName, type);
+            allStores.addAll(stores);
+        }
+        if (allStores.isEmpty()) {
+            throw new InvalidStateStoreException("Store " + storeName + " is currently "
+                                                 + "unavailable");
+        }
+        return allStores;
+    }
+}


Mime
View raw message