kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: In-memory stores cleanup (#6595)
Date Fri, 26 Apr 2019 18:50:45 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc31fea  MINOR: In-memory stores cleanup (#6595)
dc31fea is described below

commit dc31fea8bc06e1fb58a3698e55ab44d765f3dc9f
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Fri Apr 26 11:50:35 2019 -0700

    MINOR: In-memory stores cleanup (#6595)
    
    While going through the review of InMemorySessionStore I realized there is also some minor
cleanup to be done for the other in-memory stores. This includes trivial changes such as removing
unnecessary references to 'this' and moving collection initialization to the declaration.
It also fixes some unsafe behavior (registering an iterator from inside its own constructor).
In-memory window store iterator classes were made static and some instances of KeyValueIterator
missing types were f [...]
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>,  Bruno Cadonna <bruno@confluent.io>
---
 .../state/internals/InMemoryKeyValueStore.java     |  28 ++--
 .../state/internals/InMemoryWindowStore.java       | 169 ++++++++++++---------
 .../state/internals/AbstractKeyValueStoreTest.java |   2 +-
 .../state/internals/CachingSessionStoreTest.java   |   2 +-
 .../state/internals/CachingWindowStoreTest.java    |   2 +-
 .../state/internals/InMemoryWindowStoreTest.java   |   2 +-
 .../state/internals/RocksDBSessionStoreTest.java   |   2 +-
 .../state/internals/RocksDBWindowStoreTest.java    |   2 +-
 8 files changed, 114 insertions(+), 95 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 0733780..aa7b2cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -33,20 +33,18 @@ import org.slf4j.LoggerFactory;
 
 public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     private final String name;
-    private final ConcurrentNavigableMap<Bytes, byte[]> map;
+    private final ConcurrentNavigableMap<Bytes, byte[]> map = new ConcurrentSkipListMap<>();
     private volatile boolean open = false;
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class);
 
     public InMemoryKeyValueStore(final String name) {
         this.name = name;
-
-        this.map = new ConcurrentSkipListMap<>();
     }
 
     @Override
     public String name() {
-        return this.name;
+        return name;
     }
 
     @Override
@@ -65,7 +63,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]>
{
             });
         }
 
-        this.open = true;
+        open = true;
     }
 
     @Override
@@ -75,20 +73,20 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes,
byte[]> {
 
     @Override
     public boolean isOpen() {
-        return this.open;
+        return open;
     }
 
     @Override
     public byte[] get(final Bytes key) {
-        return this.map.get(key);
+        return map.get(key);
     }
 
     @Override
     public void put(final Bytes key, final byte[] value) {
         if (value == null) {
-            this.map.remove(key);
+            map.remove(key);
         } else {
-            this.map.put(key, value);
+            map.put(key, value);
         }
     }
 
@@ -110,7 +108,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes,
byte[]> {
 
     @Override
     public byte[] delete(final Bytes key) {
-        return this.map.remove(key);
+        return map.remove(key);
     }
 
     @Override
@@ -125,19 +123,19 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes,
byte[]> {
 
         return new DelegatingPeekingKeyValueIterator<>(
             name,
-            new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator()));
+            new InMemoryKeyValueIterator(map.subMap(from, true, to, true).entrySet().iterator()));
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
         return new DelegatingPeekingKeyValueIterator<>(
             name,
-            new InMemoryKeyValueIterator(this.map.entrySet().iterator()));
+            new InMemoryKeyValueIterator(map.entrySet().iterator()));
     }
 
     @Override
     public long approximateNumEntries() {
-        return this.map.size();
+        return map.size();
     }
 
     @Override
@@ -147,8 +145,8 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes,
byte[]> {
 
     @Override
     public void close() {
-        this.map.clear();
-        this.open = false;
+        map.clear();
+        open = false;
     }
 
     private static class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]>
{
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 0cee668..797a5d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -63,8 +63,8 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
     private final long windowSize;
     private final boolean retainDuplicates;
 
-    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>>
segmentMap;
-    private final Set<InMemoryWindowStoreIteratorWrapper> openIterators;
+    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>>
segmentMap = new ConcurrentSkipListMap<>();
+    private final Set<InMemoryWindowStoreIteratorWrapper> openIterators = ConcurrentHashMap.newKeySet();
 
     private volatile boolean open = false;
 
@@ -78,18 +78,14 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
         this.windowSize = windowSize;
         this.retainDuplicates = retainDuplicates;
         this.metricScope = metricScope;
-
-        this.openIterators = ConcurrentHashMap.newKeySet();
-        this.segmentMap = new ConcurrentSkipListMap<>();
     }
 
     @Override
     public String name() {
-        return this.name;
+        return name;
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = (InternalProcessorContext) context;
 
@@ -113,7 +109,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
                 put(Bytes.wrap(extractStoreKeyBytes(key)), value, extractStoreTimestamp(key));
             });
         }
-        this.open = true;
+        open = true;
     }
 
     @Override
@@ -125,19 +121,19 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
     public void put(final Bytes key, final byte[] value, final long windowStartTimestamp)
{
         removeExpiredSegments();
         maybeUpdateSeqnumForDups();
-        this.observedStreamTime = Math.max(this.observedStreamTime, windowStartTimestamp);
+        observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);
 
         final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;
 
-        if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) {
+        if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
             expiredRecordSensor.record();
             LOG.warn("Skipping record for expired segment.");
         } else {
             if (value != null) {
-                this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
-                this.segmentMap.get(windowStartTimestamp).put(keyBytes, value);
+                segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
+                segmentMap.get(windowStartTimestamp).put(keyBytes, value);
             } else {
-                this.segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
+                segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
                     kvMap.remove(keyBytes);
                     return kvMap;
                 });
@@ -149,11 +145,11 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
     public byte[] fetch(final Bytes key, final long windowStartTimestamp) {
         removeExpiredSegments();
 
-        if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) {
+        if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
             return null;
         }
 
-        final ConcurrentNavigableMap<Bytes, byte[]> kvMap = this.segmentMap.get(windowStartTimestamp);
+        final ConcurrentNavigableMap<Bytes, byte[]> kvMap = segmentMap.get(windowStartTimestamp);
         if (kvMap == null) {
             return null;
         } else {
@@ -167,14 +163,14 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
         removeExpiredSegments();
 
         // add one b/c records expire exactly retentionPeriod ms after created
-        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod
+ 1);
+        final long minTime = Math.max(timeFrom, observedStreamTime - retentionPeriod + 1);
 
         if (timeTo < minTime) {
-            return new WrappedInMemoryWindowStoreIterator();
+            return WrappedInMemoryWindowStoreIterator.emptyIterator();
         }
 
-        return new WrappedInMemoryWindowStoreIterator(
-            key, key, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
+        return registerNewWindowStoreIterator(
+            key, segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
     }
 
     @Deprecated
@@ -193,14 +189,14 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
         }
 
         // add one b/c records expire exactly retentionPeriod ms after created
-        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod
+ 1);
+        final long minTime = Math.max(timeFrom, observedStreamTime - retentionPeriod + 1);
 
         if (timeTo < minTime) {
-            return new WrappedWindowedKeyValueIterator();
+            return KeyValueIterators.emptyIterator();
         }
 
-        return new WrappedWindowedKeyValueIterator(
-            from, to, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
+        return registerNewWindowedKeyValueIterator(
+            from, to, segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
     }
 
     @Deprecated
@@ -209,24 +205,24 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
         removeExpiredSegments();
 
         // add one b/c records expire exactly retentionPeriod ms after created
-        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod
+ 1);
+        final long minTime = Math.max(timeFrom, observedStreamTime - retentionPeriod + 1);
 
         if (timeTo < minTime) {
-            return new WrappedWindowedKeyValueIterator();
+            return KeyValueIterators.emptyIterator();
         }
 
-        return new WrappedWindowedKeyValueIterator(
-            null, null, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
+        return registerNewWindowedKeyValueIterator(
+            null, null, segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
     }
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
         removeExpiredSegments();
 
-        final long minTime = this.observedStreamTime - this.retentionPeriod;
+        final long minTime = observedStreamTime - retentionPeriod;
 
-        return new WrappedWindowedKeyValueIterator(
-            null, null, this.segmentMap.tailMap(minTime, false).entrySet().iterator());
+        return registerNewWindowedKeyValueIterator(
+            null, null, segmentMap.tailMap(minTime, false).entrySet().iterator());
     }
 
     @Override
@@ -236,7 +232,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
 
     @Override
     public boolean isOpen() {
-        return this.open;
+        return open;
     }
 
     @Override
@@ -246,16 +242,16 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
 
     @Override
     public void close() {
-        this.segmentMap.clear();
-        this.open = false;
+        segmentMap.clear();
+        open = false;
     }
 
     private void removeExpiredSegments() {
-        long minLiveTime = Math.max(0L, this.observedStreamTime - this.retentionPeriod +
1);
+        long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1);
         for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
             minLiveTime = Math.min(minLiveTime, it.minTime());
         }
-        this.segmentMap.headMap(minLiveTime, false).clear();
+        segmentMap.headMap(minLiveTime, false).clear();
     }
 
     private void maybeUpdateSeqnumForDups() {
@@ -279,7 +275,41 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
 
     }
 
-    private abstract class InMemoryWindowStoreIteratorWrapper implements Comparable<InMemoryWindowStoreIteratorWrapper>
{
+    private WrappedInMemoryWindowStoreIterator registerNewWindowStoreIterator(final Bytes
key,
+                                                                              final Iterator<Map.Entry<Long,
ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
+        final Bytes keyFrom = retainDuplicates ? wrapForDups(key, 0) : key;
+        final Bytes keyTo = retainDuplicates ? wrapForDups(key, Integer.MAX_VALUE) : key;
+
+        final WrappedInMemoryWindowStoreIterator iterator =
+            new WrappedInMemoryWindowStoreIterator(keyFrom, keyTo, segmentIterator, openIterators::remove);
+
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    private WrappedWindowedKeyValueIterator registerNewWindowedKeyValueIterator(final Bytes
keyFrom,
+                                                                                final Bytes
keyTo,
+                                                                                final Iterator<Map.Entry<Long,
ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
+        final Bytes from = (retainDuplicates && keyFrom != null) ? wrapForDups(keyFrom,
0) : keyFrom;
+        final Bytes to = (retainDuplicates && keyTo != null) ? wrapForDups(keyTo,
Integer.MAX_VALUE) : keyTo;
+
+        final WrappedWindowedKeyValueIterator iterator =
+            new WrappedWindowedKeyValueIterator(from,
+                                                to,
+                                                segmentIterator,
+                                                openIterators::remove,
+                                                retainDuplicates,
+                                                windowSize);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+
+    interface ClosingCallback {
+        void deregisterIterator(final InMemoryWindowStoreIteratorWrapper iterator);
+    }
+
+    private static abstract class InMemoryWindowStoreIteratorWrapper {
 
         private Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>>
segmentIterator;
         private Iterator<Map.Entry<Bytes, byte[]>> recordIterator;
@@ -287,31 +317,21 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
         private long currentTime;
 
         private final boolean allKeys;
-        private Bytes keyFrom;
-        private Bytes keyTo;
-
-        // Default constructor sets up a dummy iterator when no results are returned (eg
entire fetch range is expired)
-        InMemoryWindowStoreIteratorWrapper() {
-            this.allKeys = false;
-            recordIterator = null;
-        }
+        private final Bytes keyFrom;
+        private final Bytes keyTo;
+        private final ClosingCallback callback;
 
         InMemoryWindowStoreIteratorWrapper(final Bytes keyFrom,
                                            final Bytes keyTo,
-                                           final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes,
byte[]>>> segmentIterator) {
-            this.allKeys = (keyFrom == null) && (keyTo == null);
-            if (retainDuplicates && !allKeys) {
-                this.keyFrom = wrapForDups(keyFrom, 0);
-                this.keyTo = wrapForDups(keyTo, Integer.MAX_VALUE);
-            } else {
-                this.keyFrom = keyFrom;
-                this.keyTo = keyTo;
-            }
+                                           final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes,
byte[]>>> segmentIterator,
+                                           final ClosingCallback callback) {
+            this.keyFrom = keyFrom;
+            this.keyTo = keyTo;
+            allKeys = (keyFrom == null) && (keyTo == null);
 
             this.segmentIterator = segmentIterator;
-            this.recordIterator = setRecordIterator();
-
-            openIterators.add(this);
+            this.callback = callback;
+            recordIterator = segmentIterator == null ? null : setRecordIterator();
         }
 
         public boolean hasNext() {
@@ -332,7 +352,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
         }
 
         public void close() {
-            openIterators.remove(this);
+            callback.deregisterIterator(this);
         }
 
         // getNext is only called when either recordIterator or segmentIterator has a next
@@ -368,22 +388,15 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
         Long minTime() {
             return currentTime;
         }
-
-        public int compareTo(final InMemoryWindowStoreIteratorWrapper other) {
-            return (int) (minTime() - other.minTime());
-        }
     }
 
-    private class WrappedInMemoryWindowStoreIterator extends InMemoryWindowStoreIteratorWrapper
implements WindowStoreIterator<byte[]>  {
-
-        WrappedInMemoryWindowStoreIterator() {
-            super();
-        }
+    private static class WrappedInMemoryWindowStoreIterator extends InMemoryWindowStoreIteratorWrapper
implements WindowStoreIterator<byte[]>  {
 
         WrappedInMemoryWindowStoreIterator(final Bytes keyFrom,
                                            final Bytes keyTo,
-                                           final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes,
byte[]>>> segmentIterator) {
-            super(keyFrom, keyTo, segmentIterator);
+                                           final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes,
byte[]>>> segmentIterator,
+                                           final ClosingCallback callback)  {
+            super(keyFrom, keyTo, segmentIterator, callback);
         }
 
         @Override
@@ -404,18 +417,26 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
             super.next = null;
             return result;
         }
+
+        public static WrappedInMemoryWindowStoreIterator emptyIterator() {
+            return new WrappedInMemoryWindowStoreIterator(null, null, null, it -> { });
+        }
     }
 
-    private class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper
implements KeyValueIterator<Windowed<Bytes>, byte[]> {
+    private static class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper
implements KeyValueIterator<Windowed<Bytes>, byte[]> {
 
-        WrappedWindowedKeyValueIterator() {
-            super();
-        }
+        private final boolean retainDuplicates;
+        private final long windowSize;
 
         WrappedWindowedKeyValueIterator(final Bytes keyFrom,
                                         final Bytes keyTo,
-                                        final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes,
byte[]>>> segmentIterator) {
-            super(keyFrom, keyTo, segmentIterator);
+                                        final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes,
byte[]>>> segmentIterator,
+                                        final ClosingCallback callback,
+                                        final boolean retainDuplicates,
+                                        final long windowSize) {
+            super(keyFrom, keyTo, segmentIterator, callback);
+            this.retainDuplicates = retainDuplicates;
+            this.windowSize = windowSize;
         }
 
         public Windowed<Bytes> peekNextKey() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 7df6532..5c49818 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -412,7 +412,7 @@ public abstract class AbstractKeyValueStoreTest {
         LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
 
-        final KeyValueIterator iterator = store.range(-1, 1);
+        final KeyValueIterator<Integer, String> iterator = store.range(-1, 1);
         assertFalse(iterator.hasNext());
 
         final List<String> messages = appender.getMessages();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 48c96a2..8c7325c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -439,7 +439,7 @@ public class CachingSessionStoreTest {
         final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
         final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
 
-        final KeyValueIterator iterator = cachingStore.findSessions(keyFrom, keyTo, 0L, 10L);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.findSessions(keyFrom,
keyTo, 0L, 10L);
         assertFalse(iterator.hasNext());
 
         final List<String> messages = appender.getMessages();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index b0ccc15..c8d9cc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -598,7 +598,7 @@ public class CachingWindowStoreTest {
         final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
         final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
 
-        final KeyValueIterator iterator = cachingStore.fetch(keyFrom, keyTo, 0L, 10L);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.fetch(keyFrom,
keyTo, 0L, 10L);
         assertFalse(iterator.hasNext());
 
         final List<String> messages = appender.getMessages();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index df924ec..1524d9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -605,7 +605,7 @@ public class InMemoryWindowStoreTest {
         LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
 
-        final KeyValueIterator iterator = windowStore.fetch(-1, 1, 0L, 10L);
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetch(-1,
1, 0L, 10L);
         assertFalse(iterator.hasNext());
 
         final List<String> messages = appender.getMessages();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 1821913..80ea4ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -290,7 +290,7 @@ public class RocksDBSessionStoreTest {
         final String keyFrom = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("",
-1));
         final String keyTo = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("",
1));
 
-        final KeyValueIterator iterator = sessionStore.findSessions(keyFrom, keyTo, 0L, 10L);
+        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions(keyFrom,
keyTo, 0L, 10L);
         assertFalse(iterator.hasNext());
 
         final List<String> messages = appender.getMessages();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 7405e06..3342207 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -1426,7 +1426,7 @@ public class RocksDBWindowStoreTest {
         LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
 
-        final KeyValueIterator iterator = windowStore.fetch(-1, 1, 0L, 10L);
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetch(-1,
1, 0L, 10L);
         assertFalse(iterator.hasNext());
 
         final List<String> messages = appender.getMessages();


Mime
View raw message