kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: KAFKA-3522: Interactive Queries must return timestamped stores (#6661)
Date Tue, 07 May 2019 17:49:49 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a6d5efa  KAFKA-3522: Interactive Queries must return timestamped stores (#6661)
a6d5efa is described below

commit a6d5efaf0d06f8a66350a8f1b959baf176fd482a
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Tue May 7 19:49:31 2019 +0200

    KAFKA-3522: Interactive Queries must return timestamped stores (#6661)
    
    Reviewers: John Roesler <john@confluent.io>,  Bill Bejeck <bbejeck@gmail.com>
---
 .../kafka/streams/state/QueryableStoreTypes.java   |  30 +++--
 .../state/internals/GlobalStateStoreProvider.java  |   8 ++
 .../internals/StreamThreadStateStoreProvider.java  |  12 +-
 .../internals/GlobalStateStoreProviderTest.java    | 135 ++++++++++++++++++++-
 .../StreamThreadStateStoreProviderTest.java        | 122 +++++++++++++++++--
 .../org/apache/kafka/test/NoOpReadOnlyStore.java   |   5 +-
 6 files changed, 285 insertions(+), 27 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index 7b1e8b3..d4e9e89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -24,6 +24,11 @@ import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStore;
 import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * Provides access to the {@link QueryableStoreType}s provided with {@link KafkaStreams}.
  * These can be used with {@link KafkaStreams#store(String, QueryableStoreType)}.
@@ -88,23 +93,28 @@ public final class QueryableStoreTypes {
 
     private static abstract class QueryableStoreTypeMatcher<T> implements QueryableStoreType<T> {
 
-        private final Class matchTo;
+        private final Set<Class> matchTo;
 
-        QueryableStoreTypeMatcher(final Class matchTo) {
+        QueryableStoreTypeMatcher(final Set<Class> matchTo) {
             this.matchTo = matchTo;
         }
 
         @SuppressWarnings("unchecked")
         @Override
         public boolean accepts(final StateStore stateStore) {
-            return matchTo.isAssignableFrom(stateStore.getClass());
+            for (final Class matchToClass : matchTo) {
+                if (!matchToClass.isAssignableFrom(stateStore.getClass())) {
+                    return false;
+                }
+            }
+            return true;
         }
     }
 
     public static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
 
         KeyValueStoreType() {
-            super(ReadOnlyKeyValueStore.class);
+            super(Collections.singleton(ReadOnlyKeyValueStore.class));
         }
 
         @Override
@@ -119,7 +129,9 @@ public final class QueryableStoreTypes {
         extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> {
 
         TimestampedKeyValueStoreType() {
-            super(ReadOnlyKeyValueStore.class);
+            super(new HashSet<>(Arrays.asList(
+                TimestampedKeyValueStore.class,
+                ReadOnlyKeyValueStore.class)));
         }
 
         @Override
@@ -132,7 +144,7 @@ public final class QueryableStoreTypes {
     public static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
 
         WindowStoreType() {
-            super(ReadOnlyWindowStore.class);
+            super(Collections.singleton(ReadOnlyWindowStore.class));
         }
 
         @Override
@@ -146,7 +158,9 @@ public final class QueryableStoreTypes {
         extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> {
 
         TimestampedWindowStoreType() {
-            super(ReadOnlyWindowStore.class);
+            super(new HashSet<>(Arrays.asList(
+                TimestampedWindowStore.class,
+                ReadOnlyWindowStore.class)));
         }
 
         @Override
@@ -159,7 +173,7 @@ public final class QueryableStoreTypes {
     public static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
 
         SessionStoreType() {
-            super(ReadOnlySessionStore.class);
+            super(Collections.singleton(ReadOnlySessionStore.class));
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index 0db69d0..057a836 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -19,6 +19,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 
 import java.util.Collections;
 import java.util.List;
@@ -41,6 +44,11 @@ public class GlobalStateStoreProvider implements StateStoreProvider {
         if (!store.isOpen()) {
             throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
         }
+        if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
+            return (List<T>) Collections.singletonList(new ReadOnlyKeyValueStoreFacade((TimestampedKeyValueStore<Object, Object>) store));
+        } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
+            return (List<T>) Collections.singletonList(new ReadOnlyWindowStoreFacade((TimestampedWindowStore<Object, Object>) store));
+        }
         return (List<T>) Collections.singletonList(store);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index e097963..53c5cc0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -21,6 +21,9 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -37,7 +40,6 @@ public class StreamThreadStateStoreProvider implements StateStoreProvider {
         this.streamThread = streamThread;
     }
 
-
     @SuppressWarnings("unchecked")
     @Override
     public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
@@ -56,7 +58,13 @@ public class StreamThreadStateStoreProvider implements StateStoreProvider {
                     throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
                             " because the store is not open. The state store may have migrated to another instances.");
                 }
-                stores.add((T) store);
+                if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
+                    stores.add((T) new ReadOnlyKeyValueStoreFacade((TimestampedKeyValueStore<Object, Object>) store));
+                } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
+                    stores.add((T) new ReadOnlyWindowStoreFacade((TimestampedWindowStore<Object, Object>) store));
+                } else {
+                    stores.add((T) store);
+                }
             }
         }
         return stores;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 8b77b37..9c76e14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -16,35 +16,103 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.NoOpReadOnlyStore;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
 public class GlobalStateStoreProviderTest {
+    private final Map<String, StateStore> stores = new HashMap<>();
+
+    @Before
+    public void before() {
+        stores.put(
+            "kv-store",
+            Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("kv-store"),
+                Serdes.String(),
+                Serdes.String()).build());
+        stores.put(
+            "ts-kv-store",
+            Stores.timestampedKeyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("ts-kv-store"),
+                Serdes.String(),
+                Serdes.String()).build());
+        stores.put(
+            "w-store",
+            Stores.windowStoreBuilder(
+                Stores.inMemoryWindowStore(
+                    "w-store",
+                    Duration.ofMillis(10L),
+                    Duration.ofMillis(2L),
+                    false),
+                Serdes.String(),
+                Serdes.String()).build());
+        stores.put(
+            "ts-w-store",
+            Stores.timestampedWindowStoreBuilder(
+                Stores.inMemoryWindowStore(
+                    "ts-w-store",
+                    Duration.ofMillis(10L),
+                    Duration.ofMillis(2L),
+                    false),
+                Serdes.String(),
+                Serdes.String()).build());
+
+        final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class);
+        expect(mockContext.applicationId()).andReturn("appId").anyTimes();
+        expect(mockContext.metrics()).andReturn(new StreamsMetricsImpl(new Metrics(), "threadName")).anyTimes();
+        expect(mockContext.taskId()).andReturn(new TaskId(0, 0)).anyTimes();
+        expect(mockContext.recordCollector()).andReturn(null).anyTimes();
+        replay(mockContext);
+        for (final StateStore store : stores.values()) {
+            store.init(mockContext, null);
+        }
+    }
 
     @Test
     public void shouldReturnSingleItemListIfStoreExists() {
         final GlobalStateStoreProvider provider =
-                new GlobalStateStoreProvider(Collections.<String, StateStore>singletonMap("global", new NoOpReadOnlyStore<>()));
-        final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
+            new GlobalStateStoreProvider(Collections.singletonMap("global", new NoOpReadOnlyStore<>()));
+        final List<ReadOnlyKeyValueStore<Object, Object>> stores =
+            provider.stores("global", QueryableStoreTypes.keyValueStore());
         assertEquals(stores.size(), 1);
     }
 
     @Test
     public void shouldReturnEmptyItemListIfStoreDoesntExist() {
-        final GlobalStateStoreProvider provider =
-                new GlobalStateStoreProvider(Collections.<String, StateStore>emptyMap());
-        final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
+        final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(Collections.emptyMap());
+        final List<ReadOnlyKeyValueStore<Object, Object>> stores =
+            provider.stores("global", QueryableStoreTypes.keyValueStore());
         assertTrue(stores.isEmpty());
     }
 
@@ -53,8 +121,63 @@ public class GlobalStateStoreProviderTest {
         final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<>();
         store.close();
         final GlobalStateStoreProvider provider =
-                new GlobalStateStoreProvider(Collections.<String, StateStore>singletonMap("global", store));
+            new GlobalStateStoreProvider(Collections.singletonMap("global", store));
         provider.stores("global", QueryableStoreTypes.keyValueStore());
     }
 
+    @Test
+    public void shouldReturnKeyValueStore() {
+        final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+        final List<ReadOnlyKeyValueStore<String, String>> stores =
+            provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyKeyValueStore<String, String> store : stores) {
+            assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+        }
+    }
+
+    @Test
+    public void shouldReturnTimestampedKeyValueStore() {
+        final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> stores =
+            provider.stores("ts-kv-store", QueryableStoreTypes.timestampedKeyValueStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store : stores) {
+            assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+            assertThat(store, instanceOf(TimestampedKeyValueStore.class));
+        }
+    }
+
+    @Test
+    public void shouldNotReturnKeyValueStoreAsTimestampedStore() {
+        final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> stores =
+            provider.stores("kv-store", QueryableStoreTypes.timestampedKeyValueStore());
+        assertEquals(0, stores.size());
+    }
+
+    @Test
+    public void shouldReturnTimestampedKeyValueStoreAsKeyValueStore() {
+        final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> stores =
+            provider.stores("ts-kv-store", QueryableStoreTypes.keyValueStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store : stores) {
+            assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+        }
+    }
+
+    @Test
+    public void shouldReturnTimestampedWindowStoreAsWindowStore() {
+        final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> stores =
+            provider.stores("ts-w-store", QueryableStoreTypes.windowStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store : stores) {
+            assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+        }
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a10b62d..da2d46d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -38,6 +38,9 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -57,7 +60,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.kafka.streams.state.QueryableStoreTypes.timestampedWindowStore;
 import static org.apache.kafka.streams.state.QueryableStoreTypes.windowStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 
 public class StreamThreadStateStoreProviderTest {
@@ -82,16 +89,31 @@ public class StreamThreadStateStoreProviderTest {
                 Serdes.String()),
             "the-processor");
         topology.addStateStore(
+            Stores.timestampedKeyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("timestamped-kv-store"),
+                Serdes.String(),
+                Serdes.String()),
+            "the-processor");
+        topology.addStateStore(
             Stores.windowStoreBuilder(
-                Stores.persistentWindowStore(
+                Stores.inMemoryWindowStore(
                     "window-store",
                     Duration.ofMillis(10L),
                     Duration.ofMillis(2L),
                     false),
                 Serdes.String(),
                 Serdes.String()),
-            "the-processor"
-        );
+            "the-processor");
+        topology.addStateStore(
+            Stores.timestampedWindowStoreBuilder(
+                Stores.inMemoryWindowStore(
+                    "timestamped-window-store",
+                    Duration.ofMillis(10L),
+                    Duration.ofMillis(2L),
+                    false),
+                Serdes.String(),
+                Serdes.String()),
+            "the-processor");
 
         final Properties properties = new Properties();
         final String applicationId = "applicationId";
@@ -142,14 +164,100 @@ public class StreamThreadStateStoreProviderTest {
         final List<ReadOnlyKeyValueStore<String, String>> kvStores =
             provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
         assertEquals(2, kvStores.size());
+        for (final ReadOnlyKeyValueStore<String, String> store: kvStores) {
+            assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+        }
+    }
+
+    @Test
+    public void shouldFindTimestampedKeyValueStores() {
+        mockThread(true);
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
+            provider.stores("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore());
+        assertEquals(2, tkvStores.size());
+        for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store: tkvStores) {
+            assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+            assertThat(store, instanceOf(TimestampedKeyValueStore.class));
+        }
+    }
+
+    @Test
+    public void shouldNotFindKeyValueStoresAsTimestampedStore() {
+        mockThread(true);
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
+            provider.stores("kv-store", QueryableStoreTypes.timestampedKeyValueStore());
+        assertEquals(0, tkvStores.size());
+    }
+
+    @Test
+    public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() {
+        mockThread(true);
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
+            provider.stores("timestamped-kv-store", QueryableStoreTypes.keyValueStore());
+        assertEquals(2, tkvStores.size());
+        for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store: tkvStores) {
+            assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+        }
     }
 
     @Test
     public void shouldFindWindowStores() {
         mockThread(true);
-        final List<ReadOnlyWindowStore<Object, Object>> windowStores =
+        final List<ReadOnlyWindowStore<String, String>> windowStores =
             provider.stores("window-store", windowStore());
         assertEquals(2, windowStores.size());
+        for (final ReadOnlyWindowStore<String, String> store: windowStores) {
+            assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+        }
+    }
+
+    @Test
+    public void shouldFindTimestampedWindowStores() {
+        mockThread(true);
+        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
+            provider.stores("timestamped-window-store", timestampedWindowStore());
+        assertEquals(2, windowStores.size());
+        for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store: windowStores) {
+            assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(store, instanceOf(TimestampedWindowStore.class));
+        }
+    }
+
+    @Test
+    public void shouldNotFindWindowStoresAsTimestampedStore() {
+        mockThread(true);
+        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
+            provider.stores("window-store", timestampedWindowStore());
+        assertEquals(0, windowStores.size());
+    }
+
+    @Test
+    public void shouldFindTimestampedWindowStoresAsWindowStore() {
+        mockThread(true);
+        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
+            provider.stores("timestamped-window-store", windowStore());
+        assertEquals(2, windowStores.size());
+        for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store: windowStores) {
+            assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+        }
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
+        mockThread(true);
+        taskOne.getStore("kv-store").close();
+        provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowInvalidStoreExceptionIfTsKVStoreClosed() {
+        mockThread(true);
+        taskOne.getStore("timestamped-kv-store").close();
+        provider.stores("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore());
     }
 
     @Test(expected = InvalidStateStoreException.class)
@@ -160,10 +268,10 @@ public class StreamThreadStateStoreProviderTest {
     }
 
     @Test(expected = InvalidStateStoreException.class)
-    public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
+    public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() {
         mockThread(true);
-        taskOne.getStore("kv-store").close();
-        provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
+        taskOne.getStore("timestamped-window-store").close();
+        provider.stores("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index 08945d5..dbdd0b4 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -23,16 +23,13 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 import java.io.File;
 
-public class NoOpReadOnlyStore<K, V>
-        implements ReadOnlyKeyValueStore<K, V>, StateStore {
-
+public class NoOpReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V>, StateStore {
     private final String name;
     private final boolean rocksdbStore;
     private boolean open = true;
     public boolean initialized;
     public boolean flushed;
 
-
     public NoOpReadOnlyStore() {
         this("", false);
     }


Mime
View raw message