kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-4730: Streams does not have an in-memory windowed store (#6239)
Date Thu, 21 Feb 2019 03:10:02 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff603c6  KAFKA-4730: Streams does not have an in-memory windowed store (#6239)
ff603c6 is described below

commit ff603c63bb3e0b47ce14121bb0e96ce0fc42d3c1
Author: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
AuthorDate: Wed Feb 20 19:09:50 2019 -0800

    KAFKA-4730: Streams does not have an in-memory windowed store (#6239)
    
    Implemented an in-memory window store allowing for range queries. A finite retention period defines how long records will be kept, i.e., the window of time for fetching, and the grace period defines the window within which late-arriving data may still be written to the store.
    
    Unit tests were written to test the functionality of the window store, including its insert/update/delete and fetch operations. Single-record, all-records, and range fetches were tested, for both time ranges and key ranges. The logging and metrics for late-arriving (dropped) records were tested, as well as the ability to restore from a changelog.
    
    Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 docs/ops.html                                      |   1 +
 docs/streams/developer-guide/processor-api.html    |   4 +-
 docs/streams/upgrade-guide.html                    |   5 +
 .../org/apache/kafka/streams/state/Stores.java     |  40 +-
 .../InMemoryWindowBytesStoreSupplier.java          |  88 ++++
 .../state/internals/InMemoryWindowStore.java       | 393 +++++++++++++++++
 .../state/internals/InMemoryWindowStoreTest.java   | 476 +++++++++++++++++++++
 7 files changed, 1005 insertions(+), 2 deletions(-)
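
The retention behaviour described in the commit message can be illustrated with a small standalone sketch. This is not the code added by this commit: the class name WindowStoreSketch, the String key/value types, and the boolean return value of put() are assumptions made for illustration, and serdes, metrics, changelog restoration, and duplicate handling are all left out. It only shows the general idea of keeping records in a sorted map keyed by window start timestamp, dropping writes that fall outside the retention period, and serving time-range fetches from a sub-map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified sketch of a retention-bounded in-memory window store.
// It is NOT the InMemoryWindowStore from this commit.
final class WindowStoreSketch {
    private final long retentionMs;
    // Highest window start timestamp seen so far; -1 means "nothing observed yet".
    private long observedStreamTime = -1L;
    // window start timestamp -> (key -> value), both levels kept sorted
    private final NavigableMap<Long, NavigableMap<String, String>> segments = new TreeMap<>();

    WindowStoreSketch(final long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns false when the record is dropped because its window has expired.
    boolean put(final String key, final String value, final long windowStart) {
        observedStreamTime = Math.max(observedStreamTime, windowStart);
        removeExpired();
        if (windowStart <= observedStreamTime - retentionMs) {
            return false; // arrived too late: outside the retention period
        }
        segments.computeIfAbsent(windowStart, t -> new TreeMap<>()).put(key, value);
        return true;
    }

    // Values stored for the key in windows starting within [timeFrom, timeTo],
    // ordered by window start timestamp.
    List<String> fetch(final String key, final long timeFrom, final long timeTo) {
        removeExpired();
        final List<String> result = new ArrayList<>();
        if (timeFrom > timeTo) {
            return result; // subMap would throw on an inverted range
        }
        for (final NavigableMap<String, String> kv : segments.subMap(timeFrom, true, timeTo, true).values()) {
            final String value = kv.get(key);
            if (value != null) {
                result.add(value);
            }
        }
        return result;
    }

    // Drops every window whose start timestamp is at or before (stream time - retention).
    private void removeExpired() {
        segments.headMap(observedStreamTime - retentionMs, true).clear();
    }

    public static void main(final String[] args) {
        final WindowStoreSketch store = new WindowStoreSketch(100L);
        store.put("a", "v1", 0L);
        store.put("a", "v2", 50L);
        System.out.println(store.fetch("a", 0L, 50L));
        store.put("b", "x", 150L);            // advances stream time to 150
        System.out.println(store.fetch("a", 0L, 200L));
        System.out.println(store.put("a", "late", 50L));
    }
}
```

In this sketch stream time only moves forward, so a write at timestamp 150 with a retention of 100 ms expires every window starting at or before 50, and a later write to one of those windows is rejected rather than stored.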

diff --git a/docs/ops.html b/docs/ops.html
index 4827d94..5ba9def 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1728,6 +1728,7 @@ All the following metrics have a recording level of <code>debug</code>:
   <ul>
     <li><code>in-memory-state</code></li>
     <li><code>in-memory-lru-state</code></li>
+    <li><code>in-memory-window-state</code></li>
     <li><code>rocksdb-state</code> (for RocksDB backed key-value store)</li>
     <li><code>rocksdb-window-state</code> (for RocksDB backed window store)</li>
     <li><code>rocksdb-session-state</code> (for RocksDB backed session store)</li>
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index 119c85a..4a060a6 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -227,7 +227,7 @@
                                 space.</li>
                             <li>RocksDB settings can be fine-tuned, see
                                 <a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB configuration</span></a>.</li>
-                            <li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.PersistentKeyValueFactory.html">store variants</a>:
+                            <li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore-java.lang.String-">store variants</a>:
                                 time window key-value store, session window key-value store.</li>
                         </ul>
                             <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating a persistent key-value store:</span>
@@ -258,6 +258,8 @@
                             <li>Useful when application instances run in an environment where local
                                 disk space is either not available or local disk space is wiped
                                 in-between app instance restarts.</li>
+                            <li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-">store variants</a>:
+                                time window key-value store</li>
                         </ul>
                             <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating an in-memory key-value store:</span>
 <span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;inmemory-counts&quot;.</span>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index d2e4a2e..b4d957a 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -68,6 +68,11 @@
         More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
     </p>
 
+    <h3><a id="streams_api_changes_230" href="#streams_api_changes_230">Streams API changes in 2.3.0</a></h3>
+    <p>
+        As of 2.3.0, Streams offers an in-memory version of the window store, in addition to the persistent one based on RocksDB. The new public method <code>Stores#inMemoryWindowStore()</code> provides a built-in in-memory window store.
+    </p>
+
     <h3><a id="streams_api_changes_220" href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
     <p>
         We've simplified the <code>KafkaStreams#state</code> transition diagram during the starting up phase a bit in 2.2.0: in older versions the state will transit from <code>CREATED</code> to <code>RUNNING</code>, and then to <code>REBALANCING</code> to get the first
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 91c895f..d8b19fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
 import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
@@ -144,6 +145,43 @@ public class Stores {
     }
 
     /**
+     * Create an in-memory {@link WindowBytesStoreSupplier}.
+     * @param name                  name of the store (cannot be {@code null})
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     *                              Note that the retention period must be at least long enough to contain the
+     *                              windowed data's entire life cycle, from window-start through window-end,
+     *                              and for the entire grace period.
+     * @param windowSize            size of the windows (cannot be negative)
+     * @return an instance of {@link WindowBytesStoreSupplier}
+     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
+     */
+    public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
+                                                               final Duration retentionPeriod,
+                                                               final Duration windowSize,
+                                                               final boolean retainDuplicates) throws IllegalArgumentException {
+        Objects.requireNonNull(name, "name cannot be null");
+        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
+
+        Objects.requireNonNull(name, "name cannot be null");
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                + name + " must be no smaller than its window size. Got size=["
+                + windowSize + "], retention=[" + retentionPeriod + "]");
+        }
+
+        return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates);
+    }
+
+    /**
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
      * @param retentionPeriod       length of time to retain data in the store (cannot be negative).
@@ -166,7 +204,7 @@ public class Stores {
                                                                  final long windowSize,
                                                                  final boolean retainDuplicates) {
         if (numSegments < 2) {
-            throw new IllegalArgumentException("numSegments cannot must smaller than 2");
+            throw new IllegalArgumentException("numSegments cannot be smaller than 2");
         }
 
         final long legacySegmentInterval = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java
new file mode 100644
index 0000000..a6709ae
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+public class InMemoryWindowBytesStoreSupplier implements WindowBytesStoreSupplier {
+    private final String name;
+    private final long retentionPeriod;
+    private final long windowSize;
+    private final boolean retainDuplicates;
+
+    public InMemoryWindowBytesStoreSupplier(final String name,
+                                            final long retentionPeriod,
+                                            final long windowSize,
+                                            final boolean retainDuplicates) {
+        this.name = name;
+        this.retentionPeriod = retentionPeriod;
+        this.windowSize = windowSize;
+        this.retainDuplicates = retainDuplicates;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public WindowStore<Bytes, byte[]> get() {
+        return new InMemoryWindowStore<>(name,
+                                         Serdes.Bytes(),
+                                         Serdes.ByteArray(),
+                                         retentionPeriod,
+                                         windowSize,
+                                         retainDuplicates,
+                                         metricsScope());
+    }
+
+    @Override
+    public String metricsScope() {
+        return "in-memory-window-state";
+    }
+
+    @Deprecated
+    @Override
+    public int segments() {
+        throw new IllegalStateException("Segments is deprecated and should not be called");
+    }
+
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
+
+    @Override
+    public long windowSize() {
+        return windowSize;
+    }
+
+    // In-memory window store is not *really* segmented, so just say size is 1 ms
+    @Override
+    public long segmentIntervalMs() {
+        return 1;
+    }
+
+    @Override
+    public boolean retainDuplicates() {
+        return retainDuplicates;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
new file mode 100644
index 0000000..6e9b96b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKey;
+import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
+
+public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowStore<K, V> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);
+
+    private final String name;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private final String metricScope;
+    private StateSerdes<K, V> serdes;
+    private InternalProcessorContext context;
+    private Sensor expiredRecordSensor;
+    private int seqnum = 0;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+    private final long retentionPeriod;
+    private final long windowSize;
+    private final boolean retainDuplicates;
+
+    private final NavigableMap<Long, NavigableMap<WrappedK<K>, V>> segmentMap;
+
+    private volatile boolean open = false;
+
+    InMemoryWindowStore(final String name,
+                               final Serde<K> keySerde,
+                               final Serde<V> valueSerde,
+                               final long retentionPeriod,
+                               final long windowSize,
+                               final boolean retainDuplicates,
+                               final String metricScope) {
+        this.name = name;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        this.retentionPeriod = retentionPeriod;
+        this.windowSize = windowSize;
+        this.retainDuplicates = retainDuplicates;
+        this.metricScope = metricScope;
+
+        this.segmentMap = new TreeMap<>();
+    }
+
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.context = (InternalProcessorContext) context;
+
+        // construct the serde
+        this.serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+        final StreamsMetricsImpl metrics = this.context.metrics();
+        final String taskName = context.taskId().toString();
+        expiredRecordSensor = metrics.storeLevelSensor(
+            taskName,
+            name(),
+            "expired-window-record-drop",
+            Sensor.RecordingLevel.INFO
+        );
+        addInvocationRateAndCount(
+            expiredRecordSensor,
+            "stream-" + metricScope + "-metrics",
+            metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
+            "expired-window-record-drop"
+        );
+
+        if (root != null) {
+            context.register(root, (key, value) -> {
+                put(extractStoreKey(key, serdes), serdes.valueFrom(value), extractStoreTimestamp(key));
+            });
+        }
+        this.open = true;
+    }
+
+    @Override
+    public void put(final K key, final V value) {
+        put(key, value, context.timestamp());
+    }
+
+    @Override
+    public void put(final K key, final V value, final long windowStartTimestamp) {
+        removeExpiredSegments();
+        maybeUpdateSeqnumForDups();
+        this.observedStreamTime = Math.max(this.observedStreamTime, windowStartTimestamp);
+
+        if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) {
+            expiredRecordSensor.record();
+            LOG.debug("Skipping record for expired segment.");
+        } else {
+            if (value != null) {
+                this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>());
+                this.segmentMap.get(windowStartTimestamp).put(new WrappedK<>(key, seqnum), value);
+            } else {
+                this.segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
+                    kvMap.remove(new WrappedK<>(key, seqnum));
+                    return kvMap;
+                });
+            }
+        }
+    }
+
+    @Override
+    public V fetch(final K key, final long windowStartTimestamp) {
+        removeExpiredSegments();
+
+        final NavigableMap<WrappedK<K>, V> kvMap = this.segmentMap.get(windowStartTimestamp);
+        if (kvMap == null) {
+            return null;
+        } else {
+            return kvMap.get(new WrappedK<>(key, seqnum));
+        }
+    }
+
+    @Deprecated
+    @Override
+    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+        removeExpiredSegments();
+        final List<KeyValue<Long, V>> records = retainDuplicates ? fetchWithDuplicates(key, timeFrom, timeTo) : fetchUnique(key, timeFrom, timeTo);
+
+        return new InMemoryWindowStoreIterator<>(records.listIterator());
+    }
+
+    @Deprecated
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+        removeExpiredSegments();
+        final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();
+
+        // add one b/c records expire exactly retentionPeriod ms after created
+        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
+        final WrappedK<K> keyFrom = new WrappedK<>(from, 0);
+        final WrappedK<K> keyTo = new WrappedK<>(to, Integer.MAX_VALUE);
+
+        for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+            for (final Map.Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
+                final WrappedK<K> wrappedKey = kvMapEntry.getKey();
+                returnSet.add(getWindowedKeyValue(wrappedKey.getKey(), segmentMapEntry.getKey(), kvMapEntry.getValue()));
+            }
+        }
+        return new InMemoryWindowedKeyValueIterator<>(returnSet.listIterator());
+    }
+
+    @Deprecated
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+        removeExpiredSegments();
+        final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();
+
+        // add one b/c records expire exactly retentionPeriod ms after created
+        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
+
+        for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+            for (final Map.Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
+                final WrappedK<K> wrappedKey = kvMapEntry.getKey();
+                returnSet.add(getWindowedKeyValue(wrappedKey.getKey(), segmentMapEntry.getKey(), kvMapEntry.getValue()));
+            }
+        }
+        return new InMemoryWindowedKeyValueIterator<>(returnSet.listIterator());
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> all() {
+        removeExpiredSegments();
+        final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();
+
+        for (final Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.entrySet()) {
+            for (final Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
+                final WrappedK<K> wrappedKey = kvMapEntry.getKey();
+                returnSet.add(getWindowedKeyValue(wrappedKey.getKey(), segmentMapEntry.getKey(),
+                    kvMapEntry.getValue()));
+            }
+        }
+        return new InMemoryWindowedKeyValueIterator<>(returnSet.listIterator());
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.open;
+    }
+
+    @Override
+    public void flush() {
+        // do-nothing since it is in-memory
+    }
+
+    @Override
+    public void close() {
+        this.segmentMap.clear();
+        this.open = false;
+    }
+
+    private List<KeyValue<Long, V>> fetchUnique(final K key, final long timeFrom, final long timeTo) {
+        final List<KeyValue<Long, V>> returnSet = new LinkedList<>();
+
+        // add one b/c records expire exactly retentionPeriod ms after created
+        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
+
+        for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+            final V value = segmentMapEntry.getValue().get(new WrappedK<>(key, seqnum));
+            if (value != null) {
+                returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), value));
+            }
+        }
+        return returnSet;
+    }
+
+    private List<KeyValue<Long, V>> fetchWithDuplicates(final K key, final long timeFrom, final long timeTo) {
+        final List<KeyValue<Long, V>> returnSet = new LinkedList<>();
+
+        // add one b/c records expire exactly retentionPeriod ms after created
+        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
+        final WrappedK<K> keyFrom = new WrappedK<>(key, 0);
+        final WrappedK<K> keyTo = new WrappedK<>(key, Integer.MAX_VALUE);
+
+        for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+            for (final Map.Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
+                returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), kvMapEntry.getValue()));
+            }
+        }
+        return returnSet;
+    }
+
+    private void removeExpiredSegments() {
+        final long minLiveTime = this.observedStreamTime - this.retentionPeriod;
+        this.segmentMap.headMap(minLiveTime, true).clear();
+    }
+
+    private KeyValue<Windowed<K>, V> getWindowedKeyValue(final K key, final long startTimestamp, final V value) {
+        final Windowed<K> windowedK = new Windowed<>(key, new TimeWindow(startTimestamp, startTimestamp + windowSize));
+        return new KeyValue<>(windowedK, value);
+    }
+
+    private void maybeUpdateSeqnumForDups() {
+        if (retainDuplicates) {
+            seqnum = (seqnum + 1) & 0x7FFFFFFF;
+        }
+    }
+
+    private static class WrappedK<K extends Comparable<K>> implements Comparable<WrappedK<K>> {
+        private final K key;
+        private final int seqnum;
+
+        WrappedK(final K key, final int seqnum) {
+            this.key = key;
+            this.seqnum = seqnum;
+        }
+
+        public K getKey() {
+            return this.key;
+        }
+
+        public int compareTo(final WrappedK<K> k) {
+            final int compareKeys = this.key.compareTo(k.key);
+            if (compareKeys == 0) {
+                return this.seqnum - k.seqnum;
+            } else {
+                return compareKeys;
+            }
+        }
+    }
+
+    private static class InMemoryWindowStoreIterator<V> implements WindowStoreIterator<V> {
+
+        private ListIterator<KeyValue<Long, V>> iterator;
+
+        InMemoryWindowStoreIterator(final ListIterator<KeyValue<Long, V>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public KeyValue<Long, V> next() {
+            return iterator.next();
+        }
+
+        @Override
+        public Long peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            } else {
+                final long next = iterator.next().key;
+                iterator.previous();
+                return next;
+            }
+        }
+
+        @Override
+        public void close() {
+            iterator = null;
+        }
+    }
+
+    private static class InMemoryWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
+
+        ListIterator<KeyValue<Windowed<K>, V>> iterator;
+
+        InMemoryWindowedKeyValueIterator(final ListIterator<KeyValue<Windowed<K>, V>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public KeyValue<Windowed<K>, V> next() {
+            return iterator.next();
+        }
+
+        @Override
+        public Windowed<K> peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            } else {
+                final Windowed<K> next = iterator.next().key;
+                iterator.previous();
+                return next;
+            }
+        }
+
+        @Override
+        public void close() {
+            iterator = null;
+        }
+    }
+}
+
+
+
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
new file mode 100644
index 0000000..5de4b44
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.state.internals.WindowKeySchema.toStoreKeyBinary;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InMemoryWindowStoreTest {
+
+    private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L;
+
+    private final String storeName = "InMemoryWindowStore";
+    private final long retentionPeriod = 40L * 1000L;
+    private final long windowSize = 10L;
+
+    private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
+
+    private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
+    private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "),
+                                                      DEFAULT_CACHE_SIZE_BYTES,
+                                                      new MockStreamsMetrics(new Metrics()));
+
+    private final Producer<byte[], byte[]> producer =
+        new MockProducer<>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
+    private final RecordCollector recordCollector = new RecordCollectorImpl("InMemoryWindowStoreTestTask",
+                                                                            new LogContext("InMemoryWindowStoreTestTask "),
+                                                                            new DefaultProductionExceptionHandler(),
+                                                                            new Metrics().sensor("skipped-records")) {
+        @Override
+        public <K1, V1> void send(final String topic,
+            final K1 key,
+            final V1 value,
+            final Headers headers,
+            final Integer partition,
+            final Long timestamp,
+            final Serializer<K1> keySerializer,
+            final Serializer<V1> valueSerializer) {
+            changeLog.add(new KeyValue<>(
+                keySerializer.serialize(topic, headers, key),
+                valueSerializer.serialize(topic, headers, value))
+            );
+        }
+    };
+
+    private final File baseDir = TestUtils.tempDirectory("test");
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
+    private WindowStore<Integer, String> windowStore;
+
+    private WindowStore<Integer, String> createInMemoryWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
+        final WindowStore<Integer, String> store = Stores.windowStoreBuilder(Stores.inMemoryWindowStore(
+                                                                             storeName,
+                                                                             ofMillis(retentionPeriod),
+                                                                             ofMillis(windowSize),
+                                                                             retainDuplicates),
+            Serdes.Integer(),
+            Serdes.String()).build();
+
+        store.init(context, store);
+        return store;
+    }
+
+    @Before
+    public void initRecordCollector() {
+        recordCollector.init(producer);
+    }
+
+    @After
+    public void closeStore() {
+        if (windowStore != null) {
+            windowStore.close();
+        }
+    }
+
+    private void setCurrentTime(final long currentTime) {
+        context.setRecordContext(createRecordContext(currentTime));
+    }
+
+    private ProcessorRecordContext createRecordContext(final long time) {
+        return new ProcessorRecordContext(time, 0, 0, "topic", null);
+    }
+
+    private <K, V> KeyValue<Windowed<K>, V> windowedPair(final K key, final V value, final long timestamp) {
+        return windowedPair(key, value, timestamp, windowSize);
+    }
+
+    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(final K key, final V value, final long timestamp, final long windowSize) {
+        return KeyValue.pair(new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)), value);
+    }
+
+    @Test
+    public void testSingleFetch() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        currentTime += windowSize;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "two");
+
+        currentTime += 3 * windowSize;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "three");
+
+        assertEquals("one", windowStore.fetch(1, 0));
+        assertEquals("two", windowStore.fetch(1, windowSize));
+        assertEquals("three", windowStore.fetch(1, 4 * windowSize));
+    }
+
+    @Test
+    public void testDeleteAndUpdate() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        final long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+        windowStore.put(1, "one v2");
+
+        WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
+        assertEquals(new KeyValue<>(currentTime, "one v2"), iterator.next());
+
+        windowStore.put(1, null);
+        iterator = windowStore.fetch(1, 0, currentTime);
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testFetchAll() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "two");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "three");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(2, "four");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(2, "five");
+
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(windowSize * 10, windowSize * 30);
+
+        assertEquals(windowedPair(1, "two", windowSize * 10), iterator.next());
+        assertEquals(windowedPair(1, "three", windowSize * 20), iterator.next());
+        assertEquals(windowedPair(2, "four", windowSize * 30), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testAll() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "two");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "three");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(2, "four");
+
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.all();
+
+        assertEquals(windowedPair(1, "one", 0), iterator.next());
+        assertEquals(windowedPair(1, "two", windowSize * 10), iterator.next());
+        assertEquals(windowedPair(1, "three", windowSize * 20), iterator.next());
+        assertEquals(windowedPair(2, "four", windowSize * 30), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testTimeRangeFetch() {
+
+        windowStore = createInMemoryWindowStore(context, false);
+
+        long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "two");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "three");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "four");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "five");
+
+        final WindowStoreIterator<String> iterator = windowStore.fetch(1, windowSize * 10, 3 * windowSize * 10);
+
+        // should return only the middle three records
+        assertEquals(new KeyValue<>(windowSize * 10, "two"), iterator.next());
+        assertEquals(new KeyValue<>(2 * windowSize * 10, "three"), iterator.next());
+        assertEquals(new KeyValue<>(3 * windowSize * 10, "four"), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testKeyRangeFetch() {
+
+        windowStore = createInMemoryWindowStore(context, false);
+
+        long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(2, "two");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(3, "three");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(4, "four");
+
+        windowStore.put(5, "five");
+
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetch(1, 4, 0L, currentTime);
+
+        // should return only the first four keys
+        assertEquals(windowedPair(1, "one", 0), iterator.next());
+        assertEquals(windowedPair(2, "two", windowSize * 10), iterator.next());
+        assertEquals(windowedPair(3, "three", windowSize * 20), iterator.next());
+        assertEquals(windowedPair(4, "four", windowSize * 30), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testFetchDuplicates() {
+        windowStore = createInMemoryWindowStore(context, true);
+
+        long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+        windowStore.put(1, "one-2");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "two");
+        windowStore.put(1, "two-2");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "three");
+        windowStore.put(1, "three-2");
+
+        final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, windowSize * 10);
+
+        assertEquals(new KeyValue<>(0L, "one"), iterator.next());
+        assertEquals(new KeyValue<>(0L, "one-2"), iterator.next());
+        assertEquals(new KeyValue<>(windowSize * 10, "two"), iterator.next());
+        assertEquals(new KeyValue<>(windowSize * 10, "two-2"), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testSegmentExpiration() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        currentTime += retentionPeriod / 4;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "two");
+
+        currentTime += retentionPeriod / 4;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "three");
+
+        currentTime += retentionPeriod / 4;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "four");
+
+        // increase current time to the full retentionPeriod to expire first record
+        currentTime = currentTime + retentionPeriod / 4;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "five");
+
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime);
+
+        // effect of this put (expires next oldest record, adds new one) should not be reflected in the already fetched results
+        currentTime = currentTime + retentionPeriod / 4;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "six");
+
+        // should only have middle 4 values, as (only) the first record was expired at the time of the fetch
+        // and the last was inserted after the fetch
+        assertEquals(windowedPair(1, "two", retentionPeriod / 4), iterator.next());
+        assertEquals(windowedPair(1, "three", retentionPeriod / 2), iterator.next());
+        assertEquals(windowedPair(1, "four", 3 * (retentionPeriod / 4)), iterator.next());
+        assertEquals(windowedPair(1, "five", retentionPeriod), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testWindowIteratorPeek() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        final long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime);
+
+        assertEquals(iterator.peekNextKey(), iterator.next().key);
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void testValueIteratorPeek() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        final long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L, currentTime);
+
+        assertEquals(iterator.peekNextKey(), iterator.next().key);
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldRestore() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        // should be empty initially
+        assertFalse(windowStore.all().hasNext());
+
+        final List<KeyValue<byte[], byte[]>> restorableEntries = new LinkedList<>();
+
+        restorableEntries.add(new KeyValue<>(toStoreKeyBinary(1, 0L, 0, serdes).get(), serdes.rawValue("one")));
+        restorableEntries.add(new KeyValue<>(toStoreKeyBinary(2, windowSize, 0, serdes).get(), serdes.rawValue("two")));
+        restorableEntries.add(new KeyValue<>(toStoreKeyBinary(3, 2 * windowSize, 0, serdes).get(), serdes.rawValue("three")));
+
+        context.restore(storeName, restorableEntries);
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, 2 * windowSize);
+
+        assertEquals(windowedPair(1, "one", 0L), iterator.next());
+        assertEquals(windowedPair(2, "two", windowSize), iterator.next());
+        assertEquals(windowedPair(3, "three", 2 * windowSize), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldLogAndMeasureExpiredRecords() {
+        LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        windowStore = createInMemoryWindowStore(context, false);
+        setCurrentTime(retentionPeriod);
+
+        // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
+        windowStore.put(1, "initial record");
+
+        // Try inserting a record with timestamp 0 -- should be dropped
+        windowStore.put(1, "late record", 0L);
+        windowStore.put(1, "another on-time record");
+
+        LogCaptureAppender.unregister(appender);
+
+        final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+
+        final Metric dropTotal = metrics.get(new MetricName(
+            "expired-window-record-drop-total",
+            "stream-in-memory-window-state-metrics",
+            "The total number of occurrence of expired-window-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "mock"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("in-memory-window-state-id", storeName)
+            )
+        ));
+
+        final Metric dropRate = metrics.get(new MetricName(
+            "expired-window-record-drop-rate",
+            "stream-in-memory-window-state-metrics",
+            "The average number of occurrence of expired-window-record-drop operation per second.",
+            mkMap(
+                mkEntry("client-id", "mock"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("in-memory-window-state-id", storeName)
+            )
+        ));
+
+        assertEquals(1.0, dropTotal.metricValue());
+        assertNotEquals(0.0, dropRate.metricValue());
+        final List<String> messages = appender.getMessages();
+        assertThat(messages, hasItem("Skipping record for expired segment."));
+    }
+}

