kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5668; fetch across stores in CompositeReadOnlyWindowStore & CompositeReadOnlySessionStore
Date Fri, 18 Aug 2017 16:59:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3457c4761 -> 75c78e969


KAFKA-5668; fetch across stores in CompositeReadOnlyWindowStore & CompositeReadOnlySessionStore

Fix range queries in `CompositeReadOnlyWindowStore` and `CompositeReadOnlySessionStore` to
fetch across all stores (was previously just looking in the first store)

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3685 from dguy/kafka-5668


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75c78e96
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75c78e96
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75c78e96

Branch: refs/heads/trunk
Commit: 75c78e9692a479e2c94dbd08e564f64faedd2071
Parents: 3457c47
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Aug 18 17:59:33 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Aug 18 17:59:33 2017 +0100

----------------------------------------------------------------------
 .../internals/CompositeKeyValueIterator.java    | 74 ++++++++++++++++++++
 .../CompositeReadOnlyKeyValueStore.java         | 66 ++---------------
 .../CompositeReadOnlySessionStore.java          | 37 +++++-----
 .../internals/CompositeReadOnlyWindowStore.java | 53 +++++---------
 .../state/internals/NextIteratorFunction.java   | 24 +++++++
 .../CompositeReadOnlySessionStoreTest.java      | 29 ++++++++
 .../CompositeReadOnlyWindowStoreTest.java       | 29 ++++++++
 .../internals/RocksDBSessionStoreTest.java      |  2 +-
 .../kafka/test/ReadOnlySessionStoreStub.java    |  4 +-
 9 files changed, 197 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75c78e96/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java
new file mode 100644
index 0000000..faccc16
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+class CompositeKeyValueIterator<K, V, StoreType> implements KeyValueIterator<K,
V> {
+
+    private final Iterator<StoreType> storeIterator;
+    private final NextIteratorFunction<K, V, StoreType> nextIteratorFunction;
+
+    private KeyValueIterator<K, V> current;
+
+    CompositeKeyValueIterator(final Iterator<StoreType> underlying,
+                              final NextIteratorFunction<K, V, StoreType> nextIteratorFunction)
{
+        this.storeIterator = underlying;
+        this.nextIteratorFunction = nextIteratorFunction;
+    }
+
+    @Override
+    public void close() {
+        if (current != null) {
+            current.close();
+            current = null;
+        }
+    }
+
+    @Override
+    public K peekNextKey() {
+        throw new UnsupportedOperationException("peekNextKey not supported");
+    }
+
+    @Override
+    public boolean hasNext() {
+        while ((current == null || !current.hasNext())
+                && storeIterator.hasNext()) {
+            close();
+            current = nextIteratorFunction.apply(storeIterator.next());
+        }
+        return current != null && current.hasNext();
+    }
+
+
+    @Override
+    public KeyValue<K, V> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return current.next();
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("Remove not supported");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75c78e96/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
index 3022645..e3354e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
@@ -16,15 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
-import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 
 /**
@@ -71,7 +68,7 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
     public KeyValueIterator<K, V> range(final K from, final K to) {
         Objects.requireNonNull(from);
         Objects.requireNonNull(to);
-        final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K,
V>() {
+        final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction
= new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
             @Override
             public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V>
store) {
                 try {
@@ -82,12 +79,12 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
             }
         };
         final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName,
storeType);
-        return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(),
nextIteratorFunction));
+        return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(),
nextIteratorFunction));
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K,
V>() {
+        final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction
= new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
             @Override
             public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V>
store) {
                 try {
@@ -98,7 +95,7 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
             }
         };
         final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName,
storeType);
-        return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator(stores.iterator(),
nextIteratorFunction));
+        return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(),
nextIteratorFunction));
     }
 
     @Override
@@ -111,61 +108,6 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto
         return total < 0 ? Long.MAX_VALUE : total;
     }
 
-    interface NextIteratorFunction<K, V> {
 
-        KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store);
-    }
-
-
-    private class CompositeKeyValueIterator implements KeyValueIterator<K, V> {
-
-        private final Iterator<ReadOnlyKeyValueStore<K, V>> storeIterator;
-        private final NextIteratorFunction<K, V> nextIteratorFunction;
-
-        private KeyValueIterator<K, V> current;
-
-        CompositeKeyValueIterator(final Iterator<ReadOnlyKeyValueStore<K, V>>
underlying,
-                                  final NextIteratorFunction<K, V> nextIteratorFunction)
{
-            this.storeIterator = underlying;
-            this.nextIteratorFunction = nextIteratorFunction;
-        }
-
-        @Override
-        public void close() {
-            if (current != null) {
-                current.close();
-                current = null;
-            }
-        }
-
-        @Override
-        public K peekNextKey() {
-            throw new UnsupportedOperationException("peekNextKey not supported");
-        }
-
-        @Override
-        public boolean hasNext() {
-            while ((current == null || !current.hasNext())
-                    && storeIterator.hasNext()) {
-                close();
-                current = nextIteratorFunction.apply(storeIterator.next());
-            }
-            return current != null && current.hasNext();
-        }
-
-
-        @Override
-        public KeyValue<K, V> next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return current.next();
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("Remove not supported");
-        }
-    }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/75c78e96/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
index d63ab4b..e4da424 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Wrapper over the underlying {@link ReadOnlySessionStore}s found in a {@link
@@ -41,15 +42,14 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore
         this.storeName = storeName;
     }
 
-    private interface Fetcher<K, V> {
-        KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K, V>
store);
-    }
 
-    private KeyValueIterator<Windowed<K>, V> fetch(Fetcher<K, V> fetcher)
{
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+        Objects.requireNonNull(key, "key can't be null");
         final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName,
queryableStoreType);
         for (final ReadOnlySessionStore<K, V> store : stores) {
             try {
-                final KeyValueIterator<Windowed<K>, V> result = fetcher.fetch(store);
+                final KeyValueIterator<Windowed<K>, V> result = store.fetch(key);
                 if (!result.hasNext()) {
                     result.close();
                 } else {
@@ -57,31 +57,26 @@ public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore
                 }
             } catch (final InvalidStateStoreException ise) {
                 throw new InvalidStateStoreException("State store  [" + storeName + "] is
not available anymore" +
-                                                     " and may have been migrated to another
instance; " +
-                                                     "please re-discover its location from
the state metadata.");
+                                                             " and may have been migrated
to another instance; " +
+                                                             "please re-discover its location
from the state metadata.");
             }
         }
         return KeyValueIterators.emptyIterator();
     }
 
-
-    @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
-        return fetch(new Fetcher<K, V>() {
-            @Override
-            public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K,
V> store) {
-                return store.fetch(key);
-            }
-        });
-    }
-
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) {
-        return fetch(new Fetcher<K, V>() {
+        Objects.requireNonNull(from, "from can't be null");
+        Objects.requireNonNull(to, "to can't be null");
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>>
nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K,
V>>() {
             @Override
-            public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlySessionStore<K,
V> store) {
+            public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlySessionStore<K,
V> store) {
                 return store.fetch(from, to);
             }
-        });
+        };
+        return new DelegatingPeekingKeyValueIterator<>(storeName,
+                                                       new CompositeKeyValueIterator<>(
+                                                               storeProvider.stores(storeName,
queryableStoreType).iterator(),
+                                                               nextIteratorFunction));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75c78e96/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index fbfb5aa..298573c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link
@@ -43,16 +44,13 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
         this.storeName = storeName;
     }
 
-    private interface Fetcher<K, V, IteratorType extends KeyValueIterator<?, V>>
{
-        IteratorType fetch(ReadOnlyWindowStore<K, V> store);
-        IteratorType empty();
-    }
-
-    public <IteratorType extends KeyValueIterator<?, V>> IteratorType fetch(Fetcher<K,
V, IteratorType> fetcher) {
+    @Override
+    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long
timeTo) {
+        Objects.requireNonNull(key, "key can't be null");
         final List<ReadOnlyWindowStore<K, V>> stores = provider.stores(storeName,
windowStoreType);
         for (ReadOnlyWindowStore<K, V> windowStore : stores) {
             try {
-                final IteratorType result = fetcher.fetch(windowStore);
+                final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom,
timeTo);
                 if (!result.hasNext()) {
                     result.close();
                 } else {
@@ -60,41 +58,26 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
                 }
             } catch (InvalidStateStoreException e) {
                 throw new InvalidStateStoreException(
-                    "State store is not available anymore and may have been migrated to another
instance; " +
-                    "please re-discover its location from the state metadata.");
+                        "State store is not available anymore and may have been migrated
to another instance; " +
+                                "please re-discover its location from the state metadata.");
             }
         }
-
-        return fetcher.empty();
-    }
-
-    @Override
-    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long
timeTo) {
-        return fetch(new Fetcher<K, V, WindowStoreIterator<V>>() {
-            @Override
-            public WindowStoreIterator<V> fetch(ReadOnlyWindowStore<K, V> store)
{
-                return store.fetch(key, timeFrom, timeTo);
-            }
-
-            @Override
-            public WindowStoreIterator<V> empty() {
-                return KeyValueIterators.emptyWindowStoreIterator();
-            }
-        });
+        return KeyValueIterators.emptyWindowStoreIterator();
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final
long timeFrom, final long timeTo) {
-        return fetch(new Fetcher<K, V, KeyValueIterator<Windowed<K>, V>>()
{
-            @Override
-            public KeyValueIterator<Windowed<K>, V> fetch(ReadOnlyWindowStore<K,
V> store) {
-                return store.fetch(from, to, timeFrom, timeTo);
-            }
-
+        Objects.requireNonNull(from, "from can't be null");
+        Objects.requireNonNull(to, "to can't be null");
+        final NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K, V>>
nextIteratorFunction = new NextIteratorFunction<Windowed<K>, V, ReadOnlyWindowStore<K,
V>>() {
             @Override
-            public KeyValueIterator<Windowed<K>, V> empty() {
-                return KeyValueIterators.emptyIterator();
+            public KeyValueIterator<Windowed<K>, V> apply(final ReadOnlyWindowStore<K,
V> store) {
+                return store.fetch(from, to, timeFrom, timeFrom);
             }
-        });
+        };
+        return new DelegatingPeekingKeyValueIterator<>(storeName,
+                                                       new CompositeKeyValueIterator<>(
+                                                               provider.stores(storeName,
windowStoreType).iterator(),
+                                                               nextIteratorFunction));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75c78e96/streams/src/main/java/org/apache/kafka/streams/state/internals/NextIteratorFunction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NextIteratorFunction.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NextIteratorFunction.java
new file mode 100644
index 0000000..9f9ac55
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NextIteratorFunction.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+interface NextIteratorFunction<K, V, StoreType> {
+
+    KeyValueIterator<K, V> apply(final StoreType store);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75c78e96/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index f077321..0e2d1b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.test.ReadOnlySessionStoreStub;
 import org.apache.kafka.test.StateStoreProviderStub;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -32,6 +33,8 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -125,4 +128,30 @@ public class CompositeReadOnlySessionStoreTest {
     public void shouldThrowNullPointerExceptionIfFetchingNullKey() {
         sessionStore.fetch(null);
     }
+
+    @Test
+    public void shouldFetchKeyRangeAcrossStores() {
+        final ReadOnlySessionStoreStub<String, Long> secondUnderlying = new
+                ReadOnlySessionStoreStub<>();
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 0L);
+        secondUnderlying.put(new Windowed<>("b", new SessionWindow(0, 0)), 10L);
+        final List<KeyValue<Windowed<String>, Long>> results = StreamsTestUtils.toList(sessionStore.fetch("a",
"b"));
+        assertThat(results.size(), equalTo(2));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNPEIfKeyIsNull() {
+        underlyingSessionStore.fetch(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNPEIfFromKeyIsNull() {
+        underlyingSessionStore.fetch(null, "a");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNPEIfToKeyIsNull() {
+        underlyingSessionStore.fetch("a", null);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/75c78e96/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index b6e95a7..1b91a9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.StateStoreProviderStub;
@@ -34,6 +35,8 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 
 public class CompositeReadOnlyWindowStoreTest {
@@ -163,4 +166,30 @@ public class CompositeReadOnlyWindowStoreTest {
         windowStoreIterator.next();
     }
 
+    @Test
+    public void shouldFetchKeyRangeAcrossStores() {
+        final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new
+                ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stubProviderTwo.addStore(storeName, secondUnderlying);
+        underlyingWindowStore.put("a", "a", 0L);
+        secondUnderlying.put("b", "b", 0L);
+        List<KeyValue<Windowed<String>, String>> results = StreamsTestUtils.toList(windowStore.fetch("a",
"b", 0, 1));
+        assertThat(results.size(), equalTo(2));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNPEIfKeyIsNull() {
+        windowStore.fetch(null, 0, 0);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNPEIfFromKeyIsNull() {
+        windowStore.fetch(null, "a", 0, 0);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNPEIfToKeyIsNull() {
+        windowStore.fetch("a", null, 0, 0);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75c78e96/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index bd10db9..090aaeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -233,7 +233,7 @@ public class RocksDBSessionStoreTest {
     public void shouldThrowNullPointerExceptionOnPutNullKey() throws Exception {
         sessionStore.put(null, 1L);
     }
-
+    
     static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>,
Long> iterator) {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
         while (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/75c78e96/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
index ed408e6..0d196fe 100644
--- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
@@ -58,10 +58,10 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K,
V
         if (!open) {
             throw new InvalidStateStoreException("not open");
         }
-        if (!sessions.subMap(from, to).isEmpty()) {
+        if (sessions.subMap(from, true, to, true).isEmpty()) {
             return new KeyValueIteratorStub<>(Collections.<KeyValue<Windowed<K>,
V>>emptyIterator());
         }
-        final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator
= sessions.subMap(from, to).values().iterator();
+        final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator
= sessions.subMap(from, true,  to, true).values().iterator();
         return new KeyValueIteratorStub<>(
             new Iterator<KeyValue<Windowed<K>, V>>() {
 


Mime
View raw message