kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: [MINOR] Guard against crashing on invalid key range queries (#6521)
Date Wed, 10 Apr 2019 19:11:08 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f5a69a  [MINOR] Guard against crashing on invalid key range queries (#6521)
9f5a69a is described below

commit 9f5a69a4c2d6ac812ab6134e64839602a0840b87
Author: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
AuthorDate: Wed Apr 10 12:10:47 2019 -0700

    [MINOR] Guard against crashing on invalid key range queries (#6521)
    
    Due to KAFKA-8159, Streams will throw an unchecked exception when a caching layer or in-memory
underlying store is queried over a range of keys from negative to positive. We should add
a check for this, log a warning, and then return an empty iterator (as the RocksDB stores happen to
do) rather than crash.
    
    Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
---
 .../AbstractRocksDBSegmentedBytesStore.java        |  7 ++++
 .../state/internals/CachingKeyValueStore.java      | 11 ++++++
 .../state/internals/CachingSessionStore.java       | 11 ++++++
 .../state/internals/CachingWindowStore.java        | 11 ++++++
 .../state/internals/InMemoryKeyValueStore.java     | 12 +++++++
 .../state/internals/InMemoryWindowStore.java       |  9 ++++-
 .../state/internals/MemoryNavigableLRUCache.java   | 12 +++++++
 .../streams/state/internals/RocksDBStore.java      |  8 +++++
 .../state/internals/AbstractKeyValueStoreTest.java | 31 +++++++++++++++++
 .../state/internals/CachingSessionStoreTest.java   | 36 ++++++++++++++++++++
 .../state/internals/CachingWindowStoreTest.java    | 39 ++++++++++++++++++++++
 .../state/internals/InMemoryWindowStoreTest.java   | 34 +++++++++++++++++++
 .../state/internals/RocksDBSessionStoreTest.java   | 35 ++++++++++++++++++-
 .../state/internals/RocksDBWindowStoreTest.java    | 37 ++++++++++++++++++++
 14 files changed, 291 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 34639e3..22f3a02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -86,6 +86,13 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment>
implements Se
                                                  final Bytes keyTo,
                                                  final long from,
                                                  final long to) {
+        if (keyFrom.compareTo(keyTo) > 0) {
+            LOG.warn("Returning empty iterator for fetch with invalid key range: from >
to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this for negative
numbers");
+            return KeyValueIterators.emptyIterator();
+        }
+
         final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to);
 
         final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index bb347de..95e20b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -30,11 +30,15 @@ import java.util.Objects;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class CachingKeyValueStore
     extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
     implements KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]>
{
 
+    private static final Logger LOG = LoggerFactory.getLogger(CachingKeyValueStore.class);
+
     private CacheFlushListener<byte[], byte[]> flushListener;
     private boolean sendOldValues;
     private String cacheName;
@@ -228,6 +232,13 @@ class CachingKeyValueStore
     @Override
     public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                                  final Bytes to) {
+        if (from.compareTo(to) > 0) {
+            LOG.warn("Returning empty iterator for fetch with invalid key range: from >
to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this for negative
numbers");
+            return KeyValueIterators.emptyIterator();
+        }
+
         validateStoreOpen();
         final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().range(from,
to);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName,
from, to);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index edea7e0..9599105 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -26,11 +26,15 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 
 import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class CachingSessionStore
     extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
     implements SessionStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]>
{
 
+    private static final Logger LOG = LoggerFactory.getLogger(CachingSessionStore.class);
+
     private final SessionKeySchema keySchema;
     private final SegmentedCacheFunction cacheFunction;
     private String cacheName;
@@ -153,6 +157,13 @@ class CachingSessionStore
                                                                   final Bytes keyTo,
                                                                   final long earliestSessionEndTime,
                                                                   final long latestSessionStartTime)
{
+        if (keyFrom.compareTo(keyTo) > 0) {
+            LOG.warn("Returning empty iterator for fetch with invalid key range: from >
to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this for negative
numbers");
+            return KeyValueIterators.emptyIterator();
+        }
+
         validateStoreOpen();
 
         final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 0edd8f2..3875a79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -28,11 +28,15 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class CachingWindowStore
     extends WrappedStateStore<WindowStore<Bytes, byte[]>, byte[], byte[]>
     implements WindowStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CachingWindowStore.class);
+
     private final long windowSize;
     private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema();
 
@@ -196,6 +200,13 @@ class CachingWindowStore
                                                            final Bytes to,
                                                            final long timeFrom,
                                                            final long timeTo) {
+        if (from.compareTo(to) > 0) {
+            LOG.warn("Returning empty iterator for fetch with invalid key range: from >
to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this for negative
numbers");
+            return KeyValueIterators.emptyIterator();
+        }
+
         // since this function may not access the underlying inner store, we need to validate
         // if store is open outside as well.
         validateStoreOpen();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index b37c39e..0733780 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -28,12 +28,16 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Iterator;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     private final String name;
     private final ConcurrentNavigableMap<Bytes, byte[]> map;
     private volatile boolean open = false;
 
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class);
+
     public InMemoryKeyValueStore(final String name) {
         this.name = name;
 
@@ -111,6 +115,14 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes,
byte[]> {
 
     @Override
     public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to)
{
+
+        if (from.compareTo(to) > 0) {
+            LOG.warn("Returning empty iterator for fetch with invalid key range: from >
to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this for negative
numbers");
+            return KeyValueIterators.emptyIterator();
+        }
+
         return new DelegatingPeekingKeyValueIterator<>(
             name,
             new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator()));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 7d1b279..0cee668 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -131,7 +131,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
 
         if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) {
             expiredRecordSensor.record();
-            LOG.debug("Skipping record for expired segment.");
+            LOG.warn("Skipping record for expired segment.");
         } else {
             if (value != null) {
                 this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
@@ -185,6 +185,13 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>
{
                                                            final long timeTo) {
         removeExpiredSegments();
 
+        if (from.compareTo(to) > 0) {
+            LOG.warn("Returning empty iterator for fetch with invalid key range: from >
to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this for negative
numbers");
+            return KeyValueIterators.emptyIterator();
+        }
+
         // add one b/c records expire exactly retentionPeriod ms after created
         final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod
+ 1);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index c3cc834..4bf42de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -23,15 +23,27 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MemoryNavigableLRUCache extends MemoryLRUCache {
 
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryNavigableLRUCache.class);
+
     public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
         super(name, maxCacheSize);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to)
{
+
+        if (from.compareTo(to) > 0) {
+            LOG.warn("Returning empty iterator for fetch with invalid key range: from >
to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this for negative
numbers");
+            return KeyValueIterators.emptyIterator();
+        }
+
         final TreeMap<Bytes, byte[]> treeMap = toTreeMap();
         return new DelegatingPeekingKeyValueIterator<>(name(),
             new MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet()
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3e3e478..3cf8e94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -284,6 +284,14 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>,
BulkLoadingSt
                                                               final Bytes to) {
         Objects.requireNonNull(from, "from cannot be null");
         Objects.requireNonNull(to, "to cannot be null");
+
+        if (from.compareTo(to) > 0) {
+            log.warn("Returning empty iterator for fetch with invalid key range: from >
to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this for negative
numbers");
+            return KeyValueIterators.emptyIterator();
+        }
+
         validateStoreOpen();
 
         final KeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from,
to);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 09a3736..7df6532 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
@@ -38,6 +39,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -380,6 +382,21 @@ public abstract class AbstractKeyValueStoreTest {
     }
 
     @Test
+    public void shouldReturnSameResultsForGetAndRangeWithEqualKeys() {
+        final List<KeyValue<Integer, String>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(1, "one"));
+        entries.add(new KeyValue<>(2, "two"));
+        entries.add(new KeyValue<>(3, "three"));
+
+        store.putAll(entries);
+
+        final Iterator<KeyValue<Integer, String>> iterator = store.range(2, 2);
+
+        assertEquals(iterator.next().value, store.get(2));
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
     public void shouldNotThrowConcurrentModificationException() {
         store.put(0, "zero");
 
@@ -389,4 +406,18 @@ public abstract class AbstractKeyValueStoreTest {
 
         assertEquals(new KeyValue<>(0, "zero"), results.next());
     }
+
+    @Test
+    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
+        LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final KeyValueIterator iterator = store.range(-1, 1);
+        assertFalse(iterator.hasNext());
+
+        final List<String> messages = appender.getMessages();
+        assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key
range: from > to. "
+            + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. "
+            + "Note that the built-in numerical serdes do not follow this for negative numbers"));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 66b27f0..48c96a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
@@ -28,6 +29,7 @@ import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
@@ -50,6 +52,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertArrayEquals;
@@ -340,6 +343,22 @@ public class CachingSessionStoreTest {
     }
 
     @Test
+    public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions()
{
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 1)), "1".getBytes());
+        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2, 3)), "2".getBytes());
+        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes());
+        cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes());
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> singleKeyIterator = cachingStore.findSessions(keyAA,
0L, 10L);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator = cachingStore.findSessions(keyAA,
keyAA, 0L, 10L);
+
+        assertEquals(singleKeyIterator.next(), keyRangeIterator.next());
+        assertEquals(singleKeyIterator.next(), keyRangeIterator.next());
+        assertFalse(singleKeyIterator.hasNext());
+        assertFalse(keyRangeIterator.hasNext());
+    }
+
+    @Test
     public void shouldClearNamespaceCacheOnClose() {
         final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0,
0));
         cachingStore.put(a1, "1".getBytes());
@@ -412,6 +431,23 @@ public class CachingSessionStoreTest {
         cachingStore.put(null, "1".getBytes());
     }
 
+    @Test
+    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
+        LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
+        final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
+
+        final KeyValueIterator iterator = cachingStore.findSessions(keyFrom, keyTo, 0L, 10L);
+        assertFalse(iterator.hasNext());
+
+        final List<String> messages = appender.getMessages();
+        assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key
range: from > to. "
+            + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. "
+            + "Note that the built-in numerical serdes do not follow this for negative numbers"));
+    }
+
     private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final
String... sessionIds) {
         final Random random = new Random();
         final List<KeyValue<Windowed<Bytes>, byte[]>> results = new ArrayList<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index da49dde..b0ccc15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -61,6 +62,7 @@ import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -234,6 +236,10 @@ public class CachingWindowStoreTest {
         return Bytes.wrap(key.getBytes());
     }
 
+    private String stringFrom(final byte[] from) {
+        return Serdes.String().deserializer().deserialize("", from);
+    }
+
     @Test
     public void shouldPutFetchRangeFromCache() {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
@@ -543,6 +549,22 @@ public class CachingWindowStoreTest {
         );
     }
 
+    @Test
+    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() {
+        cachingStore.put(bytesKey("a"), bytesValue("0001"), 0);
+        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 1);
+        cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2);
+        cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3);
+
+        final WindowStoreIterator<byte[]> singleKeyIterator = cachingStore.fetch(bytesKey("aa"),
0L, 5L);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> keyRangeIterator = cachingStore.fetch(bytesKey("aa"),
bytesKey("aa"), 0L, 5L);
+
+        assertEquals(stringFrom(singleKeyIterator.next().value), stringFrom(keyRangeIterator.next().value));
+        assertEquals(stringFrom(singleKeyIterator.next().value), stringFrom(keyRangeIterator.next().value));
+        assertFalse(singleKeyIterator.hasNext());
+        assertFalse(keyRangeIterator.hasNext());
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
         cachingStore.put(null, bytesValue("anyValue"));
@@ -568,6 +590,23 @@ public class CachingWindowStoreTest {
         cachingStore.fetch(bytesKey("anyFrom"), null, ofEpochMilli(1L), ofEpochMilli(2L));
     }
 
+    @Test
+    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
+        LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
+        final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
+
+        final KeyValueIterator iterator = cachingStore.fetch(keyFrom, keyTo, 0L, 10L);
+        assertFalse(iterator.hasNext());
+
+        final List<String> messages = appender.getMessages();
+        assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key
range: from > to. "
+            + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. "
+            + "Note that the built-in numerical serdes do not follow this for negative numbers"));
+    }
+
     private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(final String
key, final String value, final long timestamp) {
         return KeyValue.pair(
             new Windowed<>(bytesKey(key), new TimeWindow(timestamp, timestamp + WINDOW_SIZE)),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index e7f5ed0..df924ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -569,6 +569,24 @@ public class InMemoryWindowStoreTest {
     }
 
     @Test
+    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        windowStore.put(1, "one", 0L);
+        windowStore.put(2, "two", 1L);
+        windowStore.put(2, "two", 2L);
+        windowStore.put(3, "three", 3L);
+
+        final WindowStoreIterator<String> singleKeyIterator = windowStore.fetch(2,
0L, 5L);
+        final KeyValueIterator<Windowed<Integer>, String> keyRangeIterator =
windowStore.fetch(2, 2, 0L, 5L);
+
+        assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value);
+        assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value);
+        assertFalse(singleKeyIterator.hasNext());
+        assertFalse(keyRangeIterator.hasNext());
+    }
+
+    @Test
     public void shouldNotThrowExceptionWhenFetchRangeIsExpired() {
         windowStore = createInMemoryWindowStore(context, false);
 
@@ -579,4 +597,20 @@ public class InMemoryWindowStoreTest {
 
         assertFalse(iterator.hasNext());
     }
+
+    @Test
+    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final KeyValueIterator iterator = windowStore.fetch(-1, 1, 0L, 10L);
+        assertFalse(iterator.hasNext());
+
+        final List<String> messages = appender.getMessages();
+        assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key
range: from > to. "
+            + "This may be due to serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. "
+            + "Note that the built-in numerical serdes do not follow this for negative numbers"));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 0786c37..1821913 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.Stores;
@@ -41,6 +42,7 @@ import static java.time.Duration.ofMillis;
 import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.apache.kafka.test.StreamsTestUtils.valuesToList;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -224,6 +226,22 @@ public class RocksDBSessionStoreTest {
         }
     }
 
+    @Test
+    public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() {
+        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1)), 0L);
+        sessionStore.put(new Windowed<>("aa", new SessionWindow(2, 3)), 1L);
+        sessionStore.put(new Windowed<>("aa", new SessionWindow(4, 5)), 2L);
+        sessionStore.put(new Windowed<>("aaa", new SessionWindow(6, 7)), 3L);
+
+        final KeyValueIterator<Windowed<String>, Long> singleKeyIterator = sessionStore.findSessions("aa", 0L, 10L);
+        final KeyValueIterator<Windowed<String>, Long> keyRangeIterator = sessionStore.findSessions("aa", "aa", 0L, 10L);
+
+        assertEquals(singleKeyIterator.next(), keyRangeIterator.next());
+        assertEquals(singleKeyIterator.next(), keyRangeIterator.next());
+        assertFalse(singleKeyIterator.hasNext());
+        assertFalse(keyRangeIterator.hasNext());
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() {
         sessionStore.findSessions(null, 1L, 2L);
@@ -263,6 +281,21 @@ public class RocksDBSessionStoreTest {
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
         sessionStore.put(null, 1L);
     }
-    
 
+    @Test
+    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
+        LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final String keyFrom = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", -1));
+        final String keyTo = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", 1));
+
+        final KeyValueIterator iterator = sessionStore.findSessions(keyFrom, keyTo, 0L, 10L);
+        assertFalse(iterator.hasNext());
+
+        final List<String> messages = appender.getMessages();
+        assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. "
+            + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. "
+            + "Note that the built-in numerical serdes do not follow this for negative numbers"));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 42b1b8c..7405e06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -33,6 +33,8 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
@@ -58,6 +60,7 @@ import static java.time.Instant.ofEpochMilli;
 import static java.util.Arrays.asList;
 import static java.util.Objects.requireNonNull;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1398,6 +1401,40 @@ public class RocksDBWindowStoreTest {
         assertThat(toList(windowStore.fetch(key3, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE))), equalTo(expectedKey3));
     }
 
+    @Test
+    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() {
+        windowStore = createWindowStore(context, false);
+
+        windowStore.put(1, "one", 0L);
+        windowStore.put(2, "two", 1L);
+        windowStore.put(2, "two", 2L);
+        windowStore.put(3, "three", 3L);
+
+        final WindowStoreIterator<String> singleKeyIterator = windowStore.fetch(2, 0L, 5L);
+        final KeyValueIterator<Windowed<Integer>, String> keyRangeIterator = windowStore.fetch(2, 2, 0L, 5L);
+
+        assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value);
+        assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value);
+        assertFalse(singleKeyIterator.hasNext());
+        assertFalse(keyRangeIterator.hasNext());
+    }
+
+    @Test
+    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
+        windowStore = createWindowStore(context, false);
+
+        LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final KeyValueIterator iterator = windowStore.fetch(-1, 1, 0L, 10L);
+        assertFalse(iterator.hasNext());
+
+        final List<String> messages = appender.getMessages();
+        assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. "
+            + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. "
+            + "Note that the built-in numerical serdes do not follow this for negative numbers"));
+    }
+
     private void putFirstBatch(final WindowStore<Integer, String> store,
                                @SuppressWarnings("SameParameterValue") final long startTime,
                                final InternalMockProcessorContext context) {


Mime
View raw message