kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4123: Queryable State returning null for key before all stores in instance have been initialized
Date Thu, 08 Sep 2016 00:42:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ca3f3a492 -> eba0ede87


KAFKA-4123: Queryable State returning null for key before all stores in instance have been initialized

Mark the store as open after the DB has been restored from the changelog.
Only add the store to the map in ProcessorStateManager post restore.
Make RocksDBWindowStore.Segment override openDB(..) as it needs to mark the Segment as open.
Throw InvalidStateStoreException if any stores in a KafkaStreams instance are not available.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #1824 from dguy/kafka-4123


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eba0ede8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eba0ede8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eba0ede8

Branch: refs/heads/trunk
Commit: eba0ede878bd9c42431cc702bc58f56354c595a6
Parents: ca3f3a4
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Sep 7 17:42:35 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Sep 7 17:42:35 2016 -0700

----------------------------------------------------------------------
 .../errors/InvalidStateStoreException.java      | 31 +++++++++
 .../internals/ProcessorStateManager.java        |  4 +-
 .../processor/internals/StreamThread.java       |  7 ++
 .../streams/state/ReadOnlyKeyValueStore.java    | 10 ++-
 .../streams/state/ReadOnlyWindowStore.java      |  2 +
 .../internals/InvalidStateStoreException.java   | 27 --------
 .../state/internals/QueryableStoreProvider.java |  3 +-
 .../streams/state/internals/RocksDBStore.java   |  5 +-
 .../state/internals/RocksDBWindowStore.java     |  7 ++
 .../StreamThreadStateStoreProvider.java         |  4 ++
 .../state/internals/WrappingStoreProvider.java  |  1 +
 .../QueryableStateIntegrationTest.java          | 71 +++++++++++++++++++-
 .../CompositeReadOnlyKeyValueStoreTest.java     |  1 +
 .../internals/QueryableStoreProviderTest.java   | 26 +++----
 .../StreamThreadStateStoreProviderTest.java     | 14 ++++
 .../internals/WrappingStoreProviderTest.java    |  1 +
 16 files changed, 166 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
new file mode 100644
index 0000000..37ad580
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.errors;
+
+/**
+ * Indicates that there was a problem when trying to access
+ * a {@link org.apache.kafka.streams.processor.StateStore}, i.e, the Store is no longer valid because it is closed
+ * or doesn't exist any more due to a rebalance.
+ *
+ * These exceptions may be transient, i.e., during a rebalance it won't be possible to query the stores as they are
+ * being (re)-initialized. Once the rebalance has completed the stores will be available again. Hence, it is valid
+ * to backoff and retry when handling this exception.
+ */
+public class InvalidStateStoreException extends StreamsException {
+
+    public InvalidStateStoreException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 8aeeb62..123e475 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -166,14 +166,14 @@ public class ProcessorStateManager {
         if (partitionNotFound)
             throw new StreamsException(String.format("task [%s]  Store %s's change log (%s) does not contain partition %s", taskId, store.name(), topic, partition));
 
-        this.stores.put(store.name(), store);
-
         if (isStandby) {
             if (store.persistent())
                 restoreCallbacks.put(topic, stateRestoreCallback);
         } else {
             restoreActiveState(topic, stateRestoreCallback);
         }
+
+        this.stores.put(store.name(), store);
     }
 
     private void restoreActiveState(String topicName, StateRestoreCallback stateRestoreCallback) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d8f6003..f753b0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -105,6 +105,7 @@ public class StreamThread extends Thread {
 
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
+    private AtomicBoolean initialized = new AtomicBoolean(false);
 
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
@@ -114,6 +115,7 @@ public class StreamThread extends Thread {
                 addStandbyTasks();
                 lastCleanMs = time.milliseconds(); // start the cleaning cycle
                 streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
+                initialized.set(true);
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
@@ -123,6 +125,7 @@ public class StreamThread extends Thread {
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             try {
+                initialized.set(false);
                 commitAll();
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
             } catch (Throwable t) {
@@ -138,6 +141,10 @@ public class StreamThread extends Thread {
         }
     };
 
+    public boolean isInitialized() {
+        return initialized.get();
+    }
+
     public StreamThread(TopologyBuilder builder,
                         StreamsConfig config,
                         KafkaClientSupplier clientSupplier,

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
index 0799fae..880c676 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
@@ -15,11 +15,13 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 
 /**
  * A key value store that only supports read operations.
  * Implementations should be thread-safe as concurrent reads and writes
- * are expected
+ * are expected.
+ *
  * @param <K> the key type
  * @param <V> the value type
  */
@@ -27,11 +29,12 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 public interface ReadOnlyKeyValueStore<K, V> {
 
     /**
-     * Get the value corresponding to this key
+     * Get the value corresponding to this key.
      *
      * @param key The key to fetch
      * @return The value or null if no value is found.
      * @throws NullPointerException If null is used for key.
+     * @throws InvalidStateStoreException if the store is not initialized
      */
     V get(K key);
 
@@ -43,6 +46,7 @@ public interface ReadOnlyKeyValueStore<K, V> {
      * @param to The last key that could be in the range
      * @return The iterator for this range.
      * @throws NullPointerException If null is used for from or to.
+     * @throws InvalidStateStoreException if the store is not initialized
      */
     KeyValueIterator<K, V> range(K from, K to);
 
@@ -51,6 +55,7 @@ public interface ReadOnlyKeyValueStore<K, V> {
      * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
      * and must not return null values. No ordering guarantees are provided.
      * @return An iterator of all key/value pairs in the store.
+     * @throws InvalidStateStoreException if the store is not initialized
      */
     KeyValueIterator<K, V> all();
 
@@ -61,6 +66,7 @@ public interface ReadOnlyKeyValueStore<K, V> {
      * where an exact count is expensive to calculate.
      *
      * @return an approximate count of key-value mappings in the store.
+     * @throws InvalidStateStoreException if the store is not initialized
      */
     long approximateNumEntries();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index 347a355..80da9e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -15,6 +15,7 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 
 /**
  * A window store that only supports read operations
@@ -31,6 +32,7 @@ public interface ReadOnlyWindowStore<K, V> {
      * the existing windows.
      *
      * @return an iterator over key-value pairs {@code <timestamp, value>}
+     * @throws InvalidStateStoreException if the store is not initialized
      */
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/state/internals/InvalidStateStoreException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InvalidStateStoreException.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InvalidStateStoreException.java
deleted file mode 100644
index b88704f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InvalidStateStoreException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-/**
- * Indicates that there was a problem when trying to access
- * a {@link org.apache.kafka.streams.processor.StateStore}, i.e, the Store is no longer valid because it is closed
- * or doesn't exist any more due to a rebalance.
- */
-public class InvalidStateStoreException extends RuntimeException {
-
-    public InvalidStateStoreException(final String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 895fb01..640761c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -14,6 +14,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
@@ -45,7 +46,7 @@ public class QueryableStoreProvider {
             allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
         }
         if (allStores.isEmpty()) {
-            return null;
+            throw new InvalidStateStoreException("Store: " + storeName + " is currently not available");
         }
         return queryableStoreType.create(
                 new WrappingStoreProvider(storeProviders),

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 4d41b42..6bd0f92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -101,7 +102,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
     private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
-    private volatile boolean open = false;
+    protected volatile boolean open = false;
 
     public KeyValueStore<K, V> enableLogging() {
         loggingEnabled = true;
@@ -176,7 +177,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
 
         this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
-        open = true;
     }
 
     public void init(ProcessorContext context, StateStore root) {
@@ -219,6 +219,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
                 putInternal(key, value);
             }
         });
+        open = true;
     }
 
     private RocksDB openDB(File dir, Options options, int ttl) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 528b905..db1d2ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -66,6 +67,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K,
V> {
         public void destroy() {
             Utils.delete(dbDir);
         }
+
+        @Override
+        public void openDB(final ProcessorContext context) {
+            super.openDB(context);
+            open = true;
+        }
     }
 
     private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 49b0e6d..e761ed0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -14,6 +14,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -37,6 +38,9 @@ public class StreamThreadStateStoreProvider implements StateStoreProvider
{
     @SuppressWarnings("unchecked")
     @Override
     public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
+        if (!streamThread.isInitialized()) {
+            throw new InvalidStateStoreException("Store: " + storeName + " is currently not available as the stream thread has not (re-)initialized yet");
+        }
         final List<T> stores = new ArrayList<>();
         for (StreamTask streamTask : streamThread.tasks().values()) {
             final StateStore store = streamTask.getStore(storeName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index bbdf040..1672112 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -14,6 +14,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 310b584..496644f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -64,6 +65,7 @@ import java.util.TreeSet;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -73,10 +75,11 @@ public class QueryableStateIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private final MockTime mockTime = CLUSTER.time;
     private static final String STREAM_ONE = "stream-one";
+    private static final String STREAM_TWO = "stream-two";
     private static final String STREAM_CONCURRENT = "stream-concurrent";
     private static final String OUTPUT_TOPIC = "output";
     private static final String OUTPUT_TOPIC_CONCURRENT = "output-concurrent";
-    private static final String STREAM_TWO = "stream-two";
+    private static final String STREAM_THREE = "stream-three";
     private static final int NUM_PARTITIONS = NUM_BROKERS;
     private static final int NUM_REPLICAS = NUM_BROKERS;
     private static final long WINDOW_SIZE = 60000L;
@@ -93,6 +96,7 @@ public class QueryableStateIntegrationTest {
         CLUSTER.createTopic(STREAM_ONE);
         CLUSTER.createTopic(STREAM_CONCURRENT);
         CLUSTER.createTopic(STREAM_TWO, NUM_PARTITIONS, NUM_REPLICAS);
+        CLUSTER.createTopic(STREAM_THREE, 4, 1);
         CLUSTER.createTopic(OUTPUT_TOPIC);
         CLUSTER.createTopic(OUTPUT_TOPIC_CONCURRENT);
         CLUSTER.createTopic(OUTPUT_TOPIC_THREE);
@@ -423,6 +427,71 @@ public class QueryableStateIntegrationTest {
 
     }
 
+    @Test
+    public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> stream = builder.stream(STREAM_THREE);
+
+        final String storeName = "count-by-key";
+        stream.groupByKey().count(storeName);
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+
+        final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+                STREAM_THREE,
+                Arrays.asList(hello, hello, hello, hello, hello, hello, hello, hello),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                mockTime);
+
+        final int maxWaitMs = 30000;
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                try {
+                    kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+                    return true;
+                } catch (InvalidStateStoreException ise) {
+                    return false;
+                }
+            }
+        }, maxWaitMs, "waiting for store " + storeName);
+
+        final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return new Long(8).equals(store.get("hello"));
+            }
+        }, maxWaitMs, "wait for count to be 8");
+
+        // close stream
+        kafkaStreams.close();
+
+        // start again
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+
+        // make sure we never get any value other than 8 for hello
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                try {
+                    assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
+                    return true;
+                } catch (InvalidStateStoreException ise) {
+                    return false;
+                }
+            }
+        }, maxWaitMs, "waiting for store " + storeName);
+
+    }
+
     private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
                                    final ReadOnlyKeyValueStore<String, Long> myCount) {
         final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 14af192..cdc5ac7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -15,6 +15,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
index db4a913..276930f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -15,6 +15,7 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.NoOpWindowStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.junit.Before;
@@ -23,7 +24,6 @@ import org.junit.Test;
 import java.util.Collections;
 
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 
 public class QueryableStoreProviderTest {
 
@@ -41,14 +41,14 @@ public class QueryableStoreProviderTest {
                 Collections.<StateStoreProvider>singletonList(theStoreProvider));
     }
 
-    @Test
-    public void shouldReturnNullIfKVStoreDoesntExist() throws Exception {
-        assertNull(storeProvider.getStore("not-a-store", QueryableStoreTypes.keyValueStore()));
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowExceptionIfKVStoreDoesntExist() throws Exception {
+        storeProvider.getStore("not-a-store", QueryableStoreTypes.keyValueStore());
     }
 
-    @Test
-    public void shouldReturnNullIfWindowStoreDoesntExist() throws Exception {
-        assertNull(storeProvider.getStore("not-a-store", QueryableStoreTypes.windowStore()));
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowExceptionIfWindowStoreDoesntExist() throws Exception {
+        storeProvider.getStore("not-a-store", QueryableStoreTypes.windowStore());
     }
 
     @Test
@@ -61,14 +61,14 @@ public class QueryableStoreProviderTest {
         assertNotNull(storeProvider.getStore(windowStore, QueryableStoreTypes.windowStore()));
     }
 
-    @Test
-    public void shouldNotReturnKVStoreWhenIsWindowStore() throws Exception {
-        assertNull(storeProvider.getStore(windowStore, QueryableStoreTypes.keyValueStore()));
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() throws Exception {
+        storeProvider.getStore(windowStore, QueryableStoreTypes.keyValueStore());
     }
 
-    @Test
-    public void shouldNotReturnWindowStoreWhenIsKVStore() throws Exception {
-        assertNull(storeProvider.getStore(keyValueStore, QueryableStoreTypes.windowStore()));
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() throws Exception {
+        storeProvider.getStore(keyValueStore, QueryableStoreTypes.windowStore());
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 795d7da..64ce39e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -62,6 +63,7 @@ public class StreamThreadStateStoreProviderTest {
     private StreamThreadStateStoreProvider provider;
     private StateDirectory stateDirectory;
     private File stateDir;
+    private boolean storesAvailable;
 
     @Before
     public void before() throws IOException {
@@ -105,6 +107,7 @@ public class StreamThreadStateStoreProviderTest {
         tasks.put(new TaskId(0, 1),
                   taskTwo);
 
+        storesAvailable = true;
         thread = new StreamThread(builder, streamsConfig, clientSupplier,
                                   applicationId,
                                   "clientId", UUID.randomUUID(), new Metrics(),
@@ -113,6 +116,11 @@ public class StreamThreadStateStoreProviderTest {
             public Map<TaskId, StreamTask> tasks() {
                 return tasks;
             }
+
+            @Override
+            public boolean isInitialized() {
+                return storesAvailable;
+            }
         };
         provider = new StreamThreadStateStoreProvider(thread);
 
@@ -163,6 +171,12 @@ public class StreamThreadStateStoreProviderTest {
                                                               QueryableStoreTypes.keyValueStore()));
     }
 
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() throws Exception {
+        storesAvailable = false;
+        provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
+    }
+
     private StreamTask createStreamsTask(final String applicationId,
                                          final StreamsConfig streamsConfig,
                                          final MockClientSupplier clientSupplier,

http://git-wip-us.apache.org/repos/asf/kafka/blob/eba0ede8/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index 1058eef..710557e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.NoOpWindowStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;


Mime
View raw message