kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [3/5] kafka git commit: KAFKA-3452 Follow-up: Optimize ByteStore Scenarios
Date Fri, 03 Feb 2017 19:12:56 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
deleted file mode 100644
index e7c2eb3..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIteratorTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
-import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.KeyValueIteratorStub;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.Iterator;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class MergedSortedCacheSessionStoreIteratorTest {
-
-    private final String storeKey = "a";
-    private final String cacheKey = "b";
-
-    private final SessionWindow storeWindow = new SessionWindow(0, 1);
-    private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton(
-            KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), storeWindow), storeKey.getBytes())).iterator();
-    private final SessionWindow cacheWindow = new SessionWindow(10, 20);
-    private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair(
-            SessionKeySerde.toBinary(
-                    new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer()), new LRUCacheEntry(cacheKey.getBytes())))
-            .iterator();
-
-    @Test
-    public void shouldHaveNextFromStore() throws Exception {
-        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
-                = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
-        assertTrue(mergeIterator.hasNext());
-    }
-
-    @Test
-    public void shouldGetNextFromStore() throws Exception {
-        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
-                = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
-        assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey)));
-    }
-
-    @Test
-    public void shouldPeekNextKeyFromStore() throws Exception {
-        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
-                = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
-        assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow)));
-    }
-
-    @Test
-    public void shouldHaveNextFromCache() throws Exception {
-        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
-                = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(),
-                                 cacheKvs);
-        assertTrue(mergeIterator.hasNext());
-    }
-
-    @Test
-    public void shouldGetNextFromCache() throws Exception {
-        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
-                = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
-        assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey)));
-    }
-
-    @Test
-    public void shouldPeekNextKeyFromCache() throws Exception {
-        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
-                = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
-        assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow)));
-    }
-
-    @Test
-    public void shouldIterateBothStoreAndCache() throws Exception {
-        final MergedSortedCacheSessionStoreIterator<String, String> iterator = createIterator(storeKvs, cacheKvs);
-        assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey)));
-        assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey)));
-        assertFalse(iterator.hasNext());
-    }
-
-    private MergedSortedCacheSessionStoreIterator<String, String> createIterator(final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
-                                                                                 final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs) {
-        final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator
-                = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs));
-
-        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
-                = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
-        return new MergedSortedCacheSessionStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String()));
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
deleted file mode 100644
index 376fca8..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.KeyValueIteratorStub;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class MergedSortedCacheWindowStoreIteratorTest {
-
-    private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>();
-    private final ThreadCache cache = new ThreadCache("testCache", 1000000L,  new MockStreamsMetrics(new Metrics()));
-    private final String namespace = "one";
-    private final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String());
-
-    @Test
-    public void shouldIterateOverValueFromBothIterators() throws Exception {
-        final List<KeyValue<Long, byte[]>> expectedKvPairs = new ArrayList<>();
-        for (long t = 0; t < 100; t += 20) {
-            final byte[] v1Bytes = String.valueOf(t).getBytes();
-            final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, v1Bytes);
-            windowStoreKvPairs.add(v1);
-            expectedKvPairs.add(KeyValue.pair(t, v1Bytes));
-            final byte[] keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes);
-            final byte[] valBytes = String.valueOf(t + 10).getBytes();
-            expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
-            cache.put(namespace, keyBytes, new LRUCacheEntry(valBytes));
-        }
-
-        byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
-        byte[] binaryTo = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
-        final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("name", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
-
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, binaryFrom, binaryTo);
-
-        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
-        int index = 0;
-        while (iterator.hasNext()) {
-            final KeyValue<Long, byte[]> next = iterator.next();
-            final KeyValue<Long, byte[]> expected = expectedKvPairs.get(index++);
-            assertArrayEquals(expected.value, next.value);
-            assertEquals(expected.key, next.key);
-        }
-    }
-
-    @Test
-    public void shouldPeekNextKey() throws Exception {
-        windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
-        cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
-        byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
-        byte[] binaryTo = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
-        final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("name", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, binaryFrom, binaryTo);
-        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
-        assertThat(iterator.peekNextKey(), equalTo(0L));
-        iterator.next();
-        assertThat(iterator.peekNextKey(), equalTo(10L));
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
new file mode 100644
index 0000000..38e577b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MergedSortedCacheWrappedSessionStoreIteratorTest {
+
+    private final String storeKey = "a";
+    private final String cacheKey = "b";
+
+    private final SessionWindow storeWindow = new SessionWindow(0, 1);
+    private final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs = Collections.singleton(
+            KeyValue.pair(new Windowed<>(Bytes.wrap(storeKey.getBytes()), storeWindow), storeKey.getBytes())).iterator();
+    private final SessionWindow cacheWindow = new SessionWindow(10, 20);
+    private final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs = Collections.singleton(KeyValue.pair(
+            SessionKeySerde.toBinary(
+                    new Windowed<>(cacheKey, cacheWindow), Serdes.String().serializer()), new LRUCacheEntry(cacheKey.getBytes())))
+            .iterator();
+
+    @Test
+    public void shouldHaveNextFromStore() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
+                = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        assertTrue(mergeIterator.hasNext());
+    }
+
+    @Test
+    public void shouldGetNextFromStore() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
+                = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey)));
+    }
+
+    @Test
+    public void shouldPeekNextKeyFromStore() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
+                = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow)));
+    }
+
+    @Test
+    public void shouldHaveNextFromCache() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
+                = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(),
+                                 cacheKvs);
+        assertTrue(mergeIterator.hasNext());
+    }
+
+    @Test
+    public void shouldGetNextFromCache() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
+                = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+        assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey)));
+    }
+
+    @Test
+    public void shouldPeekNextKeyFromCache() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> mergeIterator
+                = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+        assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow)));
+    }
+
+    @Test
+    public void shouldIterateBothStoreAndCache() throws Exception {
+        final MergedSortedCacheSessionStoreIterator<String, String> iterator = createIterator(storeKvs, cacheKvs);
+        assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey)));
+        assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey)));
+        assertFalse(iterator.hasNext());
+    }
+
+    private MergedSortedCacheSessionStoreIterator<String, String> createIterator(final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
+                                                                                 final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs) {
+        final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator
+                = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs));
+
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
+                = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
+        return new MergedSortedCacheSessionStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String()));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
new file mode 100644
index 0000000..b8e9b94
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.KeyValueIteratorStub;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class MergedSortedCacheWrappedWindowStoreIteratorTest {
+
+    private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>();
+    private final ThreadCache cache = new ThreadCache("testCache", 1000000L,  new MockStreamsMetrics(new Metrics()));
+    private final String namespace = "one";
+    private final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String());
+
+    @Test
+    public void shouldIterateOverValueFromBothIterators() throws Exception {
+        final List<KeyValue<Long, byte[]>> expectedKvPairs = new ArrayList<>();
+        for (long t = 0; t < 100; t += 20) {
+            final byte[] v1Bytes = String.valueOf(t).getBytes();
+            final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, v1Bytes);
+            windowStoreKvPairs.add(v1);
+            expectedKvPairs.add(KeyValue.pair(t, v1Bytes));
+            final Bytes keyBytes = WindowStoreUtils.toBinaryKey("a", t + 10, 0, stateSerdes);
+            final byte[] valBytes = String.valueOf(t + 10).getBytes();
+            expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
+            cache.put(namespace, keyBytes, new LRUCacheEntry(valBytes));
+        }
+
+        Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
+        Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
+        final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
+
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
+
+        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+        int index = 0;
+        while (iterator.hasNext()) {
+            final KeyValue<Long, byte[]> next = iterator.next();
+            final KeyValue<Long, byte[]> expected = expectedKvPairs.get(index++);
+            assertArrayEquals(expected.value, next.value);
+            assertEquals(expected.key, next.key);
+        }
+    }
+
+    @Test
+    public void shouldPeekNextKey() throws Exception {
+        windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
+        cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
+        Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
+        Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
+        final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
+        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+        assertThat(iterator.peekNextKey(), equalTo(0L));
+        iterator.next();
+        assertThat(iterator.peekNextKey(), equalTo(10L));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index a2ce96c..92fbb2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -106,7 +106,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
 
         @Override
         public Long peekNextKey() {
-            throw new UnsupportedOperationException("peekNextKey not supported in stub");
+            throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
         }
 
         @Override


Mime
View raw message