kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4863: Querying window store may return unwanted keys. Backport to 0.10.2
Date Tue, 21 Mar 2017 23:14:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 77bfe78ba -> 3d294533c


KAFKA-4863: Querying window store may return unwanted keys. Backport to 0.10.2

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2688 from dguy/kafka-4863-10.2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3d294533
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3d294533
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3d294533

Branch: refs/heads/0.10.2
Commit: 3d294533c8613d2cf57ead76dfd75ca472659132
Parents: 77bfe78
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Mar 21 16:14:13 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Mar 21 16:14:13 2017 -0700

----------------------------------------------------------------------
 .../kstream/internals/SessionKeySerde.java      |   9 ++
 .../state/internals/CachingSessionStore.java    |  52 ---------
 .../state/internals/CachingWindowStore.java     |  12 +-
 .../state/internals/FilteredCacheIterator.java  |  73 ++++++++++++
 .../state/internals/SessionKeySchema.java       |  14 +--
 .../state/internals/WindowStoreKeySchema.java   |  25 ++--
 .../kstream/internals/SessionKeySerdeTest.java  |   8 ++
 .../state/internals/CachingWindowStoreTest.java |  22 ++++
 .../internals/RocksDBSessionStoreTest.java      |  39 ++++++-
 .../state/internals/RocksDBWindowStoreTest.java | 116 +++++++++++++++++++
 10 files changed, 298 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index d9a3528..249350e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -163,4 +163,13 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
         return new TimeWindow(start, end);
     }
+
+    public static Windowed<Bytes> fromBytes(Bytes bytesKey) {
+        final byte[] binaryKey = bytesKey.get();
+        final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
+        final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
+        final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
+        return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), new SessionWindow(start, end));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 2cea915..6cfbf81 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -31,7 +30,6 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
-import java.util.NoSuchElementException;
 
 
 class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedStateStore implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> {
@@ -164,54 +162,4 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
         this.flushListener = flushListener;
     }
 
-    private static class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
-        private final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator;
-        private final HasNextCondition hasNextCondition;
-
-        FilteredCacheIterator(final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator,
-                              final HasNextCondition hasNextCondition) {
-            this.cacheIterator = cacheIterator;
-            this.hasNextCondition = hasNextCondition;
-        }
-
-        @Override
-        public void close() {
-            // no-op
-        }
-
-        @Override
-        public Bytes peekNextKey() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return cacheIterator.peekNextKey();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return hasNextCondition.hasNext(cacheIterator);
-        }
-
-        @Override
-        public KeyValue<Bytes, LRUCacheEntry> next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return cacheIterator.next();
-
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return cacheIterator.peekNext();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index d471761..33df426 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -39,6 +39,8 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
     private final Serde<V> valueSerde;
     private CacheFlushListener<Windowed<K>, V> flushListener;
     private final long windowSize;
+    private final SegmentedBytesStore.KeySchema keySchema = new WindowStoreKeySchema();
+
     private String name;
     private ThreadCache cache;
     private InternalProcessorContext context;
@@ -146,9 +148,17 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
 
         final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, binaryFrom, binaryTo);
-        return new MergedSortedCacheWindowStoreIterator<>(cacheIterator,
+
+
+        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(Bytes.wrap(serdes.rawKey(key)),
+                                                                             timeFrom,
+                                                                             timeTo);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition);
+
+        return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
                                                           underlyingIterator,
                                                           new StateSerdes<>(serdes.stateName(), Serdes.Long(), serdes.valueSerde()));
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
new file mode 100644
index 0000000..1351903
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.NoSuchElementException;
+
+class FilteredCacheIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
+    private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
+    private final HasNextCondition hasNextCondition;
+
+    FilteredCacheIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
+                          final HasNextCondition hasNextCondition) {
+        this.cacheIterator = cacheIterator;
+        this.hasNextCondition = hasNextCondition;
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+
+    @Override
+    public Bytes peekNextKey() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return cacheIterator.peekNextKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return hasNextCondition.hasNext(cacheIterator);
+    }
+
+    @Override
+    public KeyValue<Bytes, LRUCacheEntry> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return cacheIterator.next();
+
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public KeyValue<Bytes, LRUCacheEntry> peekNext() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return cacheIterator.peekNext();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 604abb3..3747d0f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -50,15 +50,15 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {
         return new HasNextCondition() {
             @Override
             public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-                if (iterator.hasNext()) {
+                while (iterator.hasNext()) {
                     final Bytes bytes = iterator.peekNextKey();
-                    final Bytes keyBytes = Bytes.wrap(SessionKeySerde.extractKeyBytes(bytes.get()));
-                    if (!keyBytes.equals(binaryKey)) {
-                        return false;
+                    final Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes);
+                    if (windowedKey.key().equals(binaryKey)
+                            && windowedKey.window().end() >= from
+                            && windowedKey.window().start() <= to) {
+                        return true;
                     }
-                    final long start = SessionKeySerde.extractStart(bytes.get());
-                    final long end = SessionKeySerde.extractEnd(bytes.get());
-                    return end >= from && start <= to;
+                    iterator.next();
                 }
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
index 093161e..a4d347c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreKeySchema.java
@@ -24,12 +24,6 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.util.List;
 
 class WindowStoreKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
-    private static final HasNextCondition ITERATOR_HAS_NEXT = new HasNextCondition() {
-        @Override
-        public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-            return iterator.hasNext();
-        }
-    };
     private final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("window-store-key-schema", Serdes.Bytes(), Serdes.ByteArray());
 
     @Override
@@ -49,7 +43,24 @@ class WindowStoreKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
 
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
-        return ITERATOR_HAS_NEXT;
+        return new HasNextCondition() {
+            @Override
+            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
+                while (iterator.hasNext()) {
+                    final Bytes bytes = iterator.peekNextKey();
+                    final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
+                    final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
+                    if (keyBytes.equals(binaryKey)
+                            && time >= from
+                            && time <= to) {
+                        return true;
+                    }
+                    iterator.next();
+                }
+
+                return false;
+            }
+        };
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
index 3a0f490..3ccdc2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
@@ -84,4 +84,12 @@ public class SessionKeySerdeTest {
         assertArrayEquals("blah".getBytes(), SessionKeySerde.extractKeyBytes(serialized.get()));
     }
 
+    @Test
+    public void shouldExtractBytesKeyFromBinary() throws Exception {
+        final Bytes bytesKey = Bytes.wrap("key".getBytes());
+        final Windowed<Bytes> windowedBytesKey = new Windowed<>(bytesKey, new SessionWindow(0, 10));
+        final Bytes serialized = SessionKeySerde.bytesToBinary(windowedBytesKey);
+        assertEquals(windowedBytesKey, SessionKeySerde.fromBytes(serialized));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 2728aa0..11e605c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -31,12 +32,17 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -74,6 +80,10 @@ public class CachingWindowStoreTest {
         cachingStore.init(context, cachingStore);
     }
 
+    @After
+    public void closeStore() {
+        cachingStore.close();
+    }
 
     @Test
     public void shouldPutFetchFromCache() throws Exception {
@@ -180,6 +190,18 @@ public class CachingWindowStoreTest {
         cachingStore.put("a", "a");
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFetchAndIterateOverExactKeys() throws Exception {
+        cachingStore.put("a", "0001", 0);
+        cachingStore.put("aa", "0002", 0);
+        cachingStore.put("a", "0003", 1);
+        cachingStore.put("aa", "0004", 1);
+        cachingStore.put("a", "0005", 60000);
+
+        final List<KeyValue<Long, String>> expected = Utils.mkList(KeyValue.pair(0L, "0001"), KeyValue.pair(1L, "0003"), KeyValue.pair(60000L, "0005"));
+        assertThat(toList(cachingStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
+    }
 
     private int addItemsToCache() throws IOException {
         int cachedSize = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index e1801b8..1b24f8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -36,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -43,6 +45,7 @@ import static org.junit.Assert.assertTrue;
 public class RocksDBSessionStoreTest {
 
     private SessionStore<String, Long> sessionStore;
+    private MockProcessorContext context;
 
     @Before
     public void before() {
@@ -53,11 +56,11 @@ public class RocksDBSessionStoreTest {
                                                  Serdes.String(),
                                                  Serdes.Long());
 
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
-                                                                      Serdes.String(),
-                                                                      Serdes.Long(),
-                                                                      new NoOpRecordCollector(),
-                                                                      new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+        context = new MockProcessorContext(TestUtils.tempDirectory(),
+                                           Serdes.String(),
+                                           Serdes.Long(),
+                                           new NoOpRecordCollector(),
+                                           new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
         sessionStore.init(context, sessionStore);
     }
 
@@ -145,6 +148,32 @@ public class RocksDBSessionStoreTest {
         assertFalse(results.hasNext());
     }
 
+    @Test
+    public void shouldFetchExactKeys() throws Exception {
+        final RocksDBSegmentedBytesStore bytesStore =
+                new RocksDBSegmentedBytesStore("session-store", 0x7a00000000000000L, 2, new SessionKeySchema());
+
+        sessionStore = new RocksDBSessionStore<>(bytesStore,
+                                                 Serdes.String(),
+                                                 Serdes.Long());
+
+        sessionStore.init(context, sessionStore);
+
+        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+        sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 2L);
+        sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
+        sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L);
+        sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L);
+
+        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE);
+        final List<Long> results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            results.add(iterator.next().value);
+        }
+
+        assertThat(results, equalTo(Arrays.asList(1L, 3L, 5L)));
+    }
+
     static List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Windowed<String>, Long> iterator) {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
         while (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d294533/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index d606e84..9f4afec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -51,6 +52,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -969,6 +972,119 @@ public class RocksDBWindowStoreTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFetchAndIterateOverExactKeys() throws Exception {
+        final File baseDir = TestUtils.tempDirectory();
+        final RocksDBWindowStoreSupplier<String, String> supplier =
+                new RocksDBWindowStoreSupplier<>(
+                        "window",
+                        0x7a00000000000000L, 2,
+                        true,
+                        Serdes.String(),
+                        Serdes.String(),
+                        0x7a00000000000000L,
+                        true,
+                        Collections.<String, String>emptyMap(),
+                        false);
+
+        final Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
+        final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+            @Override
+            public <K1, V1> void send(final String topic,
+                                      K1 key,
+                                      V1 value,
+                                      Integer partition,
+                                      Long timestamp,
+                                      Serializer<K1> keySerializer,
+                                      Serializer<V1> valueSerializer) {
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext(
+                baseDir,
+                byteArraySerde, byteArraySerde,
+                recordCollector, cache);
+
+        final WindowStore<String, String> windowStore = supplier.get();
+        try {
+            windowStore.init(context, windowStore);
+
+            windowStore.put("a", "0001", 0);
+            windowStore.put("aa", "0002", 0);
+            windowStore.put("a", "0003", 1);
+            windowStore.put("aa", "0004", 1);
+            windowStore.put("a", "0005",  0x7a00000000000000L - 1);
+
+            final List expected = Utils.mkList("0001", "0003", "0005");
+            assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
+        } finally {
+            windowStore.close();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFetchAndIterateOverExactBinaryKeys() throws Exception {
+        final File baseDir = TestUtils.tempDirectory();
+        final RocksDBWindowStoreSupplier<Bytes, String> supplier =
+                new RocksDBWindowStoreSupplier<>(
+                        "window",
+                        60000, 2,
+                        true,
+                        Serdes.Bytes(),
+                        Serdes.String(),
+                        60000,
+                        true,
+                        Collections.<String, String>emptyMap(),
+                        false);
+
+        final Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
+        final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+            @Override
+            public <K1, V1> void send(final String topic,
+                                      K1 key,
+                                      V1 value,
+                                      Integer partition,
+                                      Long timestamp,
+                                      Serializer<K1> keySerializer,
+                                      Serializer<V1> valueSerializer) {
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext(
+                baseDir,
+                byteArraySerde, byteArraySerde,
+                recordCollector, cache);
+
+        final WindowStore<Bytes, String> windowStore = supplier.get();
+        try {
+            windowStore.init(context, windowStore);
+
+            final Bytes key1 = Bytes.wrap(new byte[]{0});
+            final Bytes key2 = Bytes.wrap(new byte[]{0, 0});
+            final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0});
+            windowStore.put(key1, "1", 0);
+            windowStore.put(key2, "2", 0);
+            windowStore.put(key3, "3", 0);
+            windowStore.put(key1, "4", 1);
+            windowStore.put(key2, "5", 1);
+            windowStore.put(key3, "6", 59999);
+            windowStore.put(key1, "7", 59999);
+            windowStore.put(key2, "8", 59999);
+            windowStore.put(key3, "9", 59999);
+
+            final List expectedKey1 = Utils.mkList("1", "4", "7");
+            assertThat(toList(windowStore.fetch(key1, 0, Long.MAX_VALUE)), equalTo(expectedKey1));
+            final List expectedKey2 = Utils.mkList("2", "5", "8");
+            assertThat(toList(windowStore.fetch(key2, 0, Long.MAX_VALUE)), equalTo(expectedKey2));
+            final List expectedKey3 = Utils.mkList("3", "6", "9");
+            assertThat(toList(windowStore.fetch(key3, 0, Long.MAX_VALUE)), equalTo(expectedKey3));
+        } finally {
+            windowStore.close();
+        }
+    }
+
     private <E> List<E> toList(WindowStoreIterator<E> iterator) {
         ArrayList<E> list = new ArrayList<>();
         while (iterator.hasNext()) {


Mime
View raw message