kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] 01/01: KAFKA-3522: Add TimestampedWindowStore builder/runtime classes
Date Thu, 07 Mar 2019 01:25:15 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-3522-rocksdb-format-windowstore-runtime
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b03a8113e3c0b3bc5e0b7b6a314554a99e403bc4
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Sun Dec 16 15:59:20 2018 +0100

    KAFKA-3522: Add TimestampedWindowStore builder/runtime classes
---
 .../internals/InternalTopologyBuilder.java         |   7 +-
 .../processor/internals/ProcessorContextImpl.java  | 127 ++++++++++++++++++++-
 .../streams/state/TimestampedWindowStore.java      |   2 +-
 .../state/internals/CachingWindowStore.java        |  47 ++++----
 ...ChangeLoggingTimestampedKeyValueBytesStore.java |   1 +
 ... ChangeLoggingTimestampedWindowBytesStore.java} |  12 +-
 .../internals/ChangeLoggingWindowBytesStore.java   |  48 +++++---
 .../internals/MeteredTimestampedWindowStore.java   |  58 ++++++++++
 .../state/internals/MeteredWindowStore.java        |  20 ++--
 .../internals/TimestampedWindowStoreBuilder.java   |  74 ++++++++++++
 .../internals/ProcessorContextImplTest.java        | 109 ++++++++++++++----
 .../state/internals/CachingSessionStoreTest.java   |  14 +--
 .../state/internals/CachingWindowStoreTest.java    |  16 +--
 ...ngeLoggingTimestampedWindowBytesStoreTest.java} |  61 ++++++----
 .../ChangeLoggingWindowBytesStoreTest.java         |  31 ++---
 .../internals/MeteredTimestampWindowStoreTest.java |  92 +++++++++++++++
 .../org/apache/kafka/test/StreamsTestUtils.java    |   7 +-
 17 files changed, 601 insertions(+), 125 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 0648fec..334adce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -140,6 +141,8 @@ public class InternalTopologyBuilder {
         long retentionPeriod() {
             if (builder instanceof WindowStoreBuilder) {
                 return ((WindowStoreBuilder) builder).retentionPeriod();
+            } else if (builder instanceof TimestampedWindowStoreBuilder) {
+                return ((TimestampedWindowStoreBuilder) builder).retentionPeriod();
             } else if (builder instanceof SessionStoreBuilder) {
                 return ((SessionStoreBuilder) builder).retentionPeriod();
             } else {
@@ -160,7 +163,9 @@ public class InternalTopologyBuilder {
         }
 
         private boolean isWindowStore() {
-            return builder instanceof WindowStoreBuilder || builder instanceof SessionStoreBuilder;
+            return builder instanceof WindowStoreBuilder
+                || builder instanceof TimestampedWindowStoreBuilder
+                || builder instanceof SessionStoreBuilder;
         }
 
         // Apparently Java strips the generics from this method because we're using the raw type for builder,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 36a3750..d25d16e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -32,6 +32,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -84,6 +86,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         if (global != null) {
             if (global instanceof KeyValueStore) {
                 return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global);
+            } else if (global instanceof TimestampedWindowStore) {
+                return new TimestampedWindowStoreReadOnlyDecorator((TimestampedWindowStore) global);
             } else if (global instanceof WindowStore) {
                 return new WindowStoreReadOnlyDecorator((WindowStore) global);
             } else if (global instanceof SessionStore) {
@@ -106,6 +110,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         final StateStore store = stateManager.getStore(name);
         if (store instanceof KeyValueStore) {
             return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+        } else if (store instanceof TimestampedWindowStore) {
+            return new TimestampedWindowStoreReadWriteDecorator((TimestampedWindowStore) store);
         } else if (store instanceof WindowStore) {
             return new WindowStoreReadWriteDecorator((WindowStore) store);
         } else if (store instanceof SessionStore) {
@@ -339,6 +345,63 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
+    private static class TimestampedWindowStoreReadOnlyDecorator<K, V>
+        extends StateStoreReadOnlyDecorator<TimestampedWindowStore<K, V>, K, V>
+        implements TimestampedWindowStore<K, V> {
+
+        private TimestampedWindowStoreReadOnlyDecorator(final TimestampedWindowStore<K, V> inner) {
+            super(inner);
+        }
+
+        @Override
+        public void put(final K key,
+                        final ValueAndTimestamp<V> valueAndTimestamp) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void put(final K key,
+                        final ValueAndTimestamp<V> valueAndTimestamp,
+                        final long windowStartTimestamp) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public ValueAndTimestamp<V> fetch(final K key,
+                                          final long time) {
+            return wrapped().fetch(key, time);
+        }
+
+        @Deprecated
+        @Override
+        public WindowStoreIterator<ValueAndTimestamp<V>> fetch(final K key,
+                                                               final long timeFrom,
+                                                               final long timeTo) {
+            return wrapped().fetch(key, timeFrom, timeTo);
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetch(final K from,
+                                                                         final K to,
+                                                                         final long timeFrom,
+                                                                         final long timeTo) {
+            return wrapped().fetch(from, to, timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all() {
+            return wrapped().all();
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchAll(final long timeFrom,
+                                                                            final long timeTo) {
+            return wrapped().fetchAll(timeFrom, timeTo);
+        }
+    }
+
     private static class SessionStoreReadOnlyDecorator<K, AGG>
         extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>, K, AGG>
         implements SessionStore<K, AGG> {
@@ -520,6 +583,63 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
+    private static class TimestampedWindowStoreReadWriteDecorator<K, V>
+        extends StateStoreReadWriteDecorator<TimestampedWindowStore<K, V>, K, V>
+        implements TimestampedWindowStore<K, V> {
+
+        TimestampedWindowStoreReadWriteDecorator(final TimestampedWindowStore<K, V> inner) {
+            super(inner);
+        }
+
+        @Override
+        public void put(final K key,
+                        final ValueAndTimestamp<V> valueAndTimestamp) {
+            wrapped().put(key, valueAndTimestamp);
+        }
+
+        @Override
+        public void put(final K key,
+                        final ValueAndTimestamp<V> valueAndTimestamp,
+                        final long windowStartTimestamp) {
+            wrapped().put(key, valueAndTimestamp, windowStartTimestamp);
+        }
+
+        @Override
+        public ValueAndTimestamp<V> fetch(final K key,
+                                          final long time) {
+            return wrapped().fetch(key, time);
+        }
+
+        @Deprecated
+        @Override
+        public WindowStoreIterator<ValueAndTimestamp<V>> fetch(final K key,
+                                                               final long timeFrom,
+                                                               final long timeTo) {
+            return wrapped().fetch(key, timeFrom, timeTo);
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetch(final K from,
+                                                                         final K to,
+                                                                         final long timeFrom,
+                                                                         final long timeTo) {
+            return wrapped().fetch(from, to, timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all() {
+            return wrapped().all();
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchAll(final long timeFrom,
+                                                                            final long timeTo) {
+            return wrapped().fetchAll(timeFrom, timeTo);
+        }
+    }
+
     static class SessionStoreReadWriteDecorator<K, AGG>
         extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
         implements SessionStore<K, AGG> {
@@ -549,12 +669,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+        public void put(final Windowed<K> sessionKey,
+                        final AGG aggregate) {
             wrapped().put(sessionKey, aggregate);
         }
 
         @Override
-        public AGG fetchSession(final K key, final long startTime, final long endTime) {
+        public AGG fetchSession(final K key,
+                                final long startTime,
+                                final long endTime) {
             return wrapped().fetchSession(key, startTime, endTime);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java
index 7d52c12..7b806f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java
@@ -31,4 +31,4 @@ import org.apache.kafka.streams.kstream.Windowed;
  * @param <K> Type of keys
  * @param <V> Type of values
  */
-public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> { }
\ No newline at end of file
+public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> { }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 50a2c7c..0a869da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -64,9 +64,10 @@ class CachingWindowStore
         this.context = context;
         final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
 
-        bytesSerdes = new StateSerdes<>(topic,
-                                        Serdes.Bytes(),
-                                        Serdes.ByteArray());
+        bytesSerdes = new StateSerdes<>(
+            topic,
+            Serdes.Bytes(),
+            Serdes.ByteArray());
         name = context.taskId() + "-" + name();
         cache = this.context.getCache();
 
@@ -121,12 +122,15 @@ class CachingWindowStore
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value) {
+    public synchronized void put(final Bytes key,
+                                 final byte[] value) {
         put(key, value, context.timestamp());
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
+    public synchronized void put(final Bytes key,
+                                 final byte[] value,
+                                 final long windowStartTimestamp) {
         // since this function may not access the underlying inner store, we need to validate
         // if store is open outside as well.
         validateStoreOpen();
@@ -145,7 +149,8 @@ class CachingWindowStore
     }
 
     @Override
-    public byte[] fetch(final Bytes key, final long timestamp) {
+    public byte[] fetch(final Bytes key,
+                        final long timestamp) {
         validateStoreOpen();
         final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
         final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
@@ -162,7 +167,9 @@ class CachingWindowStore
 
     @SuppressWarnings("deprecation")
     @Override
-    public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
+    public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key,
+                                                          final long timeFrom,
+                                                          final long timeTo) {
         // since this function may not access the underlying inner store, we need to validate
         // if store is open outside as well.
         validateStoreOpen();
@@ -175,10 +182,7 @@ class CachingWindowStore
         final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);
 
-        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key,
-                                                                             key,
-                                                                             timeFrom,
-                                                                             timeTo);
+        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, key, timeFrom, timeTo);
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(
             cacheIterator, hasNextCondition, cacheFunction
         );
@@ -188,12 +192,16 @@ class CachingWindowStore
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+                                                           final Bytes to,
+                                                           final long timeFrom,
+                                                           final long timeTo) {
         // since this function may not access the underlying inner store, we need to validate
         // if store is open outside as well.
         validateStoreOpen();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetch(from, to, timeFrom, timeTo);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
+            wrapped().fetch(from, to, timeFrom, timeTo);
         if (cache == null) {
             return underlyingIterator;
         }
@@ -201,10 +209,7 @@ class CachingWindowStore
         final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(to, timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);
 
-        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(from,
-                                                                             to,
-                                                                             timeFrom,
-                                                                             timeTo);
+        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(from, to, timeFrom, timeTo);
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
 
         return new MergedSortedCacheWindowStoreKeyValueIterator(
@@ -218,16 +223,16 @@ class CachingWindowStore
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
+                                                              final long timeTo) {
         validateStoreOpen();
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetchAll(timeFrom, timeTo);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);
 
         final HasNextCondition hasNextCondition = keySchema.hasNextCondition(null, null, timeFrom, timeTo);
-        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator,
-                                                                                                              hasNextCondition,
-                                                                                                              cacheFunction);
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
+            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
         return new MergedSortedCacheWindowStoreKeyValueIterator(
                 filteredCacheIterator,
                 underlyingIterator,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index 02568b6..02e4c6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -23,6 +23,7 @@ import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserial
 import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.timestamp;
 
 public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKeyValueBytesStore {
+
     ChangeLoggingTimestampedKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) {
         super(inner);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
similarity index 79%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
copy to streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 02568b6..94362d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -17,14 +17,16 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
 
 import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
 import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.timestamp;
 
-public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKeyValueBytesStore {
-    ChangeLoggingTimestampedKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) {
-        super(inner);
+class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesStore {
+
+    ChangeLoggingTimestampedWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore,
+                                             final boolean retainDuplicates) {
+        super(bytesStore, retainDuplicates);
     }
 
     @Override
@@ -36,4 +38,4 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
             changeLogger.logChange(key, null);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 7f7612e..ef5a4c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -36,10 +36,11 @@ class ChangeLoggingWindowBytesStore
     implements WindowStore<Bytes, byte[]> {
 
     private final boolean retainDuplicates;
-    private StoreChangeLogger<Bytes, byte[]> changeLogger;
     private ProcessorContext context;
     private int seqnum = 0;
 
+    StoreChangeLogger<Bytes, byte[]> changeLogger;
+
     ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore,
                                   final boolean retainDuplicates) {
         super(bytesStore);
@@ -47,19 +48,37 @@ class ChangeLoggingWindowBytesStore
     }
 
     @Override
-    public byte[] fetch(final Bytes key, final long timestamp) {
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        this.context = context;
+        super.init(context, root);
+        final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
+        changeLogger = new StoreChangeLogger<>(
+            name(),
+            context,
+            new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+    }
+
+    @Override
+    public byte[] fetch(final Bytes key,
+                        final long timestamp) {
         return wrapped().fetch(key, timestamp);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, final long to) {
+    public WindowStoreIterator<byte[]> fetch(final Bytes key,
+                                             final long from,
+                                             final long to) {
         return wrapped().fetch(key, from, to);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
+                                                           final Bytes keyTo,
+                                                           final long from,
+                                                           final long to) {
         return wrapped().fetch(keyFrom, keyTo, from, to);
     }
 
@@ -70,7 +89,8 @@ class ChangeLoggingWindowBytesStore
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
+                                                              final long timeTo) {
         return wrapped().fetchAll(timeFrom, timeTo);
     }
 
@@ -84,20 +104,16 @@ class ChangeLoggingWindowBytesStore
     }
 
     @Override
-    public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
+    public void put(final Bytes key,
+                    final byte[] value,
+                    final long windowStartTimestamp) {
         wrapped().put(key, value, windowStartTimestamp);
-        changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, maybeUpdateSeqnumForDups()), value);
+        log(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, maybeUpdateSeqnumForDups()), value);
     }
 
-    @Override
-    public void init(final ProcessorContext context, final StateStore root) {
-        this.context = context;
-        super.init(context, root);
-        final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
-        changeLogger = new StoreChangeLogger<>(
-            name(),
-            context,
-            new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+    void log(final Bytes key,
+             final byte[] value) {
+        changeLogger.logChange(key, value);
     }
 
     private int maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
new file mode 100644
index 0000000..1c10491
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+
+/**
+ * A metered {@link TimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
+ * inner {@link WindowStore} implementation does not need to provide its own metrics collecting functionality.
+ * The inner store is of type &lt;Bytes,byte[]&gt;, hence we use {@link Serde}s to convert from
+ * &lt;K,ValueAndTimestamp&lt;V&gt;&gt; to &lt;Bytes,byte[]&gt;. A serde not passed to the constructor falls back
+ * to the context's key serde, or to the context's value serde wrapped in a {@code ValueAndTimestampSerde}.
+ * @param <K> type of keys
+ * @param <V> type of values
+ */
+class MeteredTimestampedWindowStore<K, V>
+    extends MeteredWindowStore<K, ValueAndTimestamp<V>>
+    implements TimestampedWindowStore<K, V> {
+
+    MeteredTimestampedWindowStore(final WindowStore<Bytes, byte[]> inner,
+                                  final long windowSizeMs,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    void initStoreSerde(final ProcessorContext context) {
+        serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index c040e67..681b210 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -43,13 +43,13 @@ public class MeteredWindowStore<K, V>
     private final long windowSizeMs;
     private final String metricScope;
     private final Time time;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
+    final Serde<K> keySerde;
+    final Serde<V> valueSerde;
+    StateSerdes<K, V> serdes;
     private StreamsMetricsImpl metrics;
     private Sensor putTime;
     private Sensor fetchTime;
     private Sensor flushTime;
-    private StateSerdes<K, V> serdes;
     private ProcessorContext context;
     private String taskName;
 
@@ -67,15 +67,11 @@ public class MeteredWindowStore<K, V>
         this.valueSerde = valueSerde;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
         this.context = context;
-        serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        initStoreSerde(context);
         metrics = (StreamsMetricsImpl) context.metrics();
 
         taskName = context.taskId().toString();
@@ -102,6 +98,14 @@ public class MeteredWindowStore<K, V>
     }
 
     @SuppressWarnings("unchecked")
+    void initStoreSerde(final ProcessorContext context) {
+        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
+        final Serde<K> resolvedKeySerde = keySerde != null ? keySerde : (Serde<K>) context.keySerde(); // explicit serde wins over the context's
+        final Serde<V> resolvedValueSerde = valueSerde != null ? valueSerde : (Serde<V>) context.valueSerde(); // same rule for values
+        serdes = new StateSerdes<>(changelogTopic, resolvedKeySerde, resolvedValueSerde);
+    }
+
+    @SuppressWarnings("unchecked")
     @Override
     public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listener,
                                     final boolean sendOldValues) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
new file mode 100644
index 0000000..dcb0d44
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.Objects;
+
+/**
+ * Builder for {@link TimestampedWindowStore} instances backed by the byte store of a
+ * {@link WindowBytesStoreSupplier}. {@link #build()} layers change-logging and caching
+ * around that byte store when they are enabled and puts the metered store on top.
+ *
+ * @param <K> type of the store's keys
+ * @param <V> type of the plain values; the store holds them as {@link ValueAndTimestamp}
+ */
+public class TimestampedWindowStoreBuilder<K, V>
+    extends AbstractStoreBuilder<K, ValueAndTimestamp<V>, TimestampedWindowStore<K, V>> {
+
+    private final WindowBytesStoreSupplier storeSupplier;
+
+    /**
+     * @param storeSupplier supplier of the inner byte store; must not be {@code null}
+     * @param keySerde      key serde, handed to the parent builder unchanged
+     * @param valueSerde    serde for the plain value; when {@code null}, {@code null} is handed to the parent
+     *                      instead of a wrapper (presumably so a default serde can be used later - confirm)
+     * @param time          handed to the parent builder unchanged
+     * @throws NullPointerException if {@code storeSupplier} is {@code null}
+     */
+    public TimestampedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
+                                         final Serde<K> keySerde,
+                                         final Serde<V> valueSerde,
+                                         final Time time) {
+        // The null check has to sit inside the super(...) argument list: a check placed after the
+        // call can never fire, because storeSupplier.name() is evaluated first.
+        super(
+            Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null").name(),
+            keySerde,
+            valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde),
+            time);
+        this.storeSupplier = storeSupplier;
+    }
+
+    /**
+     * Creates the store: the supplier's byte store, wrapped by logging (if enabled), then by
+     * caching (if enabled), then by {@link MeteredTimestampedWindowStore}.
+     */
+    @Override
+    public TimestampedWindowStore<K, V> build() {
+        return new MeteredTimestampedWindowStore<>(
+            maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+            storeSupplier.windowSize(),
+            storeSupplier.metricsScope(),
+            time,
+            keySerde,
+            valueSerde);
+    }
+
+    /** Wraps {@code inner} in a {@link CachingWindowStore} unless {@code enableCaching} is false. */
+    private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
+        if (!enableCaching) {
+            return inner;
+        }
+        return new CachingWindowStore(
+            inner,
+            storeSupplier.windowSize(),
+            storeSupplier.segmentIntervalMs());
+    }
+
+    /** Wraps {@code inner} in a {@link ChangeLoggingTimestampedWindowBytesStore} unless {@code enableLogging} is false. */
+    private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
+        if (!enableLogging) {
+            return inner;
+        }
+        return new ChangeLoggingTimestampedWindowBytesStore(inner, storeSupplier.retainDuplicates());
+    }
+
+    /** @return the retention period reported by the store supplier */
+    public long retentionPeriod() {
+        return storeSupplier.retentionPeriod();
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index b29b04c..00a3c98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -27,6 +27,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -55,22 +57,25 @@ public class ProcessorContextImplTest {
     private ProcessorContextImpl context;
 
     private static final String KEY = "key";
-    private static final long VAL = 42L;
+    private static final long VALUE = 42L;
+    private static final ValueAndTimestamp<Long> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make(42L, 21L);
     private static final String STORE_NAME = "underlying-store";
 
     private boolean flushExecuted;
     private boolean putExecuted;
+    private boolean putWithTimestampExecuted;
     private boolean putIfAbsentExecuted;
     private boolean putAllExecuted;
     private boolean deleteExecuted;
     private boolean removeExecuted;
-    private boolean put3argExecuted;
 
     private KeyValueIterator<String, Long> rangeIter;
     private KeyValueIterator<String, Long> allIter;
 
-    private List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7);
+    private final List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7);
+    private final List<KeyValueIterator<Windowed<String>, ValueAndTimestamp<Long>>> timestampedIters = new ArrayList<>(7);
     private WindowStoreIterator<Long> windowStoreIter;
+    private WindowStoreIterator<ValueAndTimestamp<Long>> timestampedWindowStoreIter;
 
     @Before
     public void setup() {
@@ -80,7 +85,6 @@ public class ProcessorContextImplTest {
         putAllExecuted = false;
         deleteExecuted = false;
         removeExecuted = false;
-        put3argExecuted = false;
 
         rangeIter = mock(KeyValueIterator.class);
         allIter = mock(KeyValueIterator.class);
@@ -88,6 +92,7 @@ public class ProcessorContextImplTest {
 
         for (int i = 0; i < 7; i++) {
             iters.add(i, mock(KeyValueIterator.class));
+            timestampedIters.add(i, mock(KeyValueIterator.class));
         }
 
         final StreamsConfig streamsConfig = mock(StreamsConfig.class);
@@ -100,11 +105,13 @@ public class ProcessorContextImplTest {
 
         expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock());
         expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock());
+        expect(stateManager.getGlobalStore("GlobalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock());
         expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock());
         expect(stateManager.getGlobalStore(anyString())).andReturn(null);
 
         expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock());
         expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock());
+        expect(stateManager.getStore("LocalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock());
         expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock());
 
         replay(stateManager);
@@ -120,7 +127,11 @@ public class ProcessorContextImplTest {
         );
 
         context.setCurrentNode(new ProcessorNode<String, Long>("fake", null,
-            new HashSet<>(asList("LocalKeyValueStore", "LocalWindowStore", "LocalSessionStore"))));
+            new HashSet<>(asList(
+                "LocalKeyValueStore",
+                "LocalWindowStore",
+                "LocalTimestampedWindowStore",
+                "LocalSessionStore"))));
     }
 
     @Test
@@ -134,10 +145,10 @@ public class ProcessorContextImplTest {
             checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()");
             checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()");
 
-            assertEquals((Long) VAL, store.get(KEY));
+            assertEquals((Long) VALUE, store.get(KEY));
             assertEquals(rangeIter, store.range("one", "two"));
             assertEquals(allIter, store.all());
-            assertEquals(VAL, store.approximateNumEntries());
+            assertEquals(VALUE, store.approximateNumEntries());
         });
     }
 
@@ -153,12 +164,29 @@ public class ProcessorContextImplTest {
             assertEquals(iters.get(0), store.fetchAll(0L, 0L));
             assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
             assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
-            assertEquals((Long) VAL, store.fetch(KEY, 1L));
+            assertEquals((Long) VALUE, store.fetch(KEY, 1L));
             assertEquals(iters.get(2), store.all());
         });
     }
 
     @Test
+    public void globalTimestampedWindowStoreShouldBeReadOnly() { // flush/put go through checkThrowsUnsupportedOperation; reads are compared with the mock's stubbed results
+        doTest("GlobalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            checkThrowsUnsupportedOperation(store::flush, "flush()");
+            checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L), 1L), "put()"); // put with explicit window timestamp
+            checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L)), "put()"); // put without window timestamp
+
+            assertEquals(timestampedIters.get(0), store.fetchAll(0L, 0L));
+            assertEquals(timestampedWindowStoreIter, store.fetch(KEY, 0L, 1L)); // NOTE(review): no assignment to timestampedWindowStoreIter is visible in this diff; if it stays null this compares null with null - confirm
+            assertEquals(timestampedIters.get(1), store.fetch(KEY, KEY, 0L, 1L));
+            assertEquals(VALUE_AND_TIMESTAMP, store.fetch(KEY, 1L));
+            assertEquals(timestampedIters.get(2), store.all());
+        });
+    }
+
+    @Test
     public void globalSessionStoreShouldBeReadOnly() {
         doTest("GlobalSessionStore", (Consumer<SessionStore<String, Long>>) store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -194,10 +222,10 @@ public class ProcessorContextImplTest {
             store.delete("1");
             assertTrue(deleteExecuted);
 
-            assertEquals((Long) VAL, store.get(KEY));
+            assertEquals((Long) VALUE, store.get(KEY));
             assertEquals(rangeIter, store.range("one", "two"));
             assertEquals(allIter, store.all());
-            assertEquals(VAL, store.approximateNumEntries());
+            assertEquals(VALUE, store.approximateNumEntries());
         });
     }
 
@@ -212,18 +240,37 @@ public class ProcessorContextImplTest {
             store.put("1", 1L);
             assertTrue(putExecuted);
 
-            store.put("1", 1L, 1L);
-            assertTrue(put3argExecuted);
-
             assertEquals(iters.get(0), store.fetchAll(0L, 0L));
             assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
             assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
-            assertEquals((Long) VAL, store.fetch(KEY, 1L));
+            assertEquals((Long) VALUE, store.fetch(KEY, 1L));
             assertEquals(iters.get(2), store.all());
         });
     }
 
     @Test
+    public void localTimestampedWindowStoreShouldNotAllowInitOrClose() { // unlike the global-store test, flush/put are invoked directly and must reach the mock
+        doTest("LocalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            store.flush();
+            assertTrue(flushExecuted); // presumably set by the flush stub installed in initStateStoreMock - confirm
+
+            store.put("1", ValueAndTimestamp.make(1L, 1L));
+            assertTrue(putExecuted); // set by the two-argument put answer in timestampedWindowStoreMock()
+
+            store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
+            assertTrue(putWithTimestampExecuted); // set by the three-argument put answer in timestampedWindowStoreMock()
+
+            assertEquals(timestampedIters.get(0), store.fetchAll(0L, 0L));
+            assertEquals(timestampedWindowStoreIter, store.fetch(KEY, 0L, 1L)); // NOTE(review): no assignment to timestampedWindowStoreIter is visible in this diff; if it stays null this compares null with null - confirm
+            assertEquals(timestampedIters.get(1), store.fetch(KEY, KEY, 0L, 1L));
+            assertEquals(VALUE_AND_TIMESTAMP, store.fetch(KEY, 1L));
+            assertEquals(timestampedIters.get(2), store.all());
+        });
+    }
+
+    @Test
     public void localSessionStoreShouldNotAllowInitOrClose() {
         doTest("LocalSessionStore", (Consumer<SessionStore<String, Long>>) store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -250,8 +297,8 @@ public class ProcessorContextImplTest {
 
         initStateStoreMock(keyValueStoreMock);
 
-        expect(keyValueStoreMock.get(KEY)).andReturn(VAL);
-        expect(keyValueStoreMock.approximateNumEntries()).andReturn(VAL);
+        expect(keyValueStoreMock.get(KEY)).andReturn(VALUE);
+        expect(keyValueStoreMock.approximateNumEntries()).andReturn(VALUE);
 
         expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter);
         expect(keyValueStoreMock.all()).andReturn(allIter);
@@ -294,7 +341,7 @@ public class ProcessorContextImplTest {
         expect(windowStore.fetchAll(anyLong(), anyLong())).andReturn(iters.get(0));
         expect(windowStore.fetch(anyString(), anyString(), anyLong(), anyLong())).andReturn(iters.get(1));
         expect(windowStore.fetch(anyString(), anyLong(), anyLong())).andReturn(windowStoreIter);
-        expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL);
+        expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE);
         expect(windowStore.all()).andReturn(iters.get(2));
 
         windowStore.put(anyString(), anyLong());
@@ -303,9 +350,32 @@ public class ProcessorContextImplTest {
             return null;
         });
 
-        windowStore.put(anyString(), anyLong(), anyLong());
+        replay(windowStore);
+
+        return windowStore;
+    }
+
+    @SuppressWarnings("unchecked")
+    private TimestampedWindowStore<String, Long> timestampedWindowStoreMock() {
+        final TimestampedWindowStore<String, Long> windowStore = mock(TimestampedWindowStore.class);
+
+        initStateStoreMock(windowStore);
+
+        expect(windowStore.fetchAll(anyLong(), anyLong())).andReturn(timestampedIters.get(0));
+        expect(windowStore.fetch(anyString(), anyString(), anyLong(), anyLong())).andReturn(timestampedIters.get(1));
+        expect(windowStore.fetch(anyString(), anyLong(), anyLong())).andReturn(timestampedWindowStoreIter);
+        expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE_AND_TIMESTAMP);
+        expect(windowStore.all()).andReturn(timestampedIters.get(2));
+
+        windowStore.put(anyString(), anyObject(ValueAndTimestamp.class));
+        expectLastCall().andAnswer(() -> {
+            putExecuted = true;
+            return null;
+        });
+
+        windowStore.put(anyString(), anyObject(ValueAndTimestamp.class), anyLong());
         expectLastCall().andAnswer(() -> {
-            put3argExecuted = true;
+            putWithTimestampExecuted = true;
             return null;
         });
 
@@ -362,7 +432,6 @@ public class ProcessorContextImplTest {
                 final T store = (T) context.getStateStore(name);
 
                 checker.accept(store);
-
             }
 
             @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 66b27f0..65d9609 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -100,8 +100,8 @@ public class CachingSessionStoreTest {
         final KeyValueIterator<Windowed<Bytes>, byte[]> a = cachingStore.findSessions(keyA, 0, 0);
         final KeyValueIterator<Windowed<Bytes>, byte[]> b = cachingStore.findSessions(keyB, 0, 0);
 
-        verifyWindowedKeyValue(a.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1");
-        verifyWindowedKeyValue(b.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1");
+        verifyWindowedKeyValue(a.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
+        verifyWindowedKeyValue(b.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes());
         assertFalse(a.hasNext());
         assertFalse(b.hasNext());
     }
@@ -115,9 +115,9 @@ public class CachingSessionStoreTest {
         assertEquals(3, cache.size());
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.findSessions(keyA, keyB, 0, 0);
-        verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1");
-        verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1");
-        verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1");
+        verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
+        verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes());
+        verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes());
         assertFalse(all.hasNext());
     }
 
@@ -130,8 +130,8 @@ public class CachingSessionStoreTest {
         assertEquals(3, cache.size());
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.findSessions(keyAA, keyB, 0, 0);
-        verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1");
-        verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1");
+        verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes());
+        verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes());
         assertFalse(some.hasNext());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index da49dde..0dd17b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -244,11 +244,11 @@ public class CachingWindowStoreTest {
         verifyWindowedKeyValue(
             iterator.next(),
             new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-            "a");
+            "a".getBytes());
         verifyWindowedKeyValue(
             iterator.next(),
             new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-            "b");
+            "b".getBytes());
         assertFalse(iterator.hasNext());
         assertEquals(2, cache.size());
     }
@@ -270,7 +270,7 @@ public class CachingWindowStoreTest {
             verifyWindowedKeyValue(
                 iterator.next(),
                 new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-                s);
+                s.getBytes());
         }
         assertFalse(iterator.hasNext());
     }
@@ -290,7 +290,7 @@ public class CachingWindowStoreTest {
             verifyWindowedKeyValue(
                 iterator.next(),
                 new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)),
-                str);
+                str.getBytes());
         }
         assertFalse(iterator.hasNext());
 
@@ -301,7 +301,7 @@ public class CachingWindowStoreTest {
             verifyWindowedKeyValue(
                 iterator1.next(),
                 new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)),
-                str);
+                str.getBytes());
         }
         assertFalse(iterator1.hasNext());
 
@@ -312,7 +312,7 @@ public class CachingWindowStoreTest {
             verifyWindowedKeyValue(
                 iterator2.next(),
                 new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)),
-                str);
+                str.getBytes());
         }
         assertFalse(iterator2.hasNext());
     }
@@ -455,11 +455,11 @@ public class CachingWindowStoreTest {
         verifyWindowedKeyValue(
             fetchRange.next(),
             new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
-            "a");
+            "a".getBytes());
         verifyWindowedKeyValue(
             fetchRange.next(),
             new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP + WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
-            "b");
+            "b".getBytes());
         assertFalse(fetchRange.hasNext());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
similarity index 63%
copy from streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
copy to streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index a36b101..edf210e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -20,9 +20,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.easymock.EasyMock;
@@ -38,12 +38,13 @@ import java.util.Map;
 
 import static java.time.Instant.ofEpochMilli;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 @RunWith(EasyMockRunner.class)
-public class ChangeLoggingWindowBytesStoreTest {
+public class ChangeLoggingTimestampedWindowBytesStoreTest {
 
     private final TaskId taskId = new TaskId(0, 0);
-    private final Map<Object, Object> sent = new HashMap<>();
+    private final Map<Object, ValueAndTimestamp<Object>> sent = new HashMap<>();
     private final NoOpRecordCollector collector = new NoOpRecordCollector() {
         @Override
         public <K, V> void send(final String topic,
@@ -54,23 +55,24 @@ public class ChangeLoggingWindowBytesStoreTest {
                                 final Long timestamp,
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valueSerializer) {
-            sent.put(key, value);
+            sent.put(key, ValueAndTimestamp.make(value, timestamp));
         }
     };
 
-    private final byte[] value1 = {0};
-    private final Bytes bytesKey = Bytes.wrap(value1);
+    private final byte[] value = {0};
+    private final byte[] valueAndTimestamp = {0, 0, 0, 0, 0, 0, 0, 42, 0};
+    private final Bytes bytesKey = Bytes.wrap(value);
 
     @Mock(type = MockType.NICE)
     private WindowStore<Bytes, byte[]> inner;
     @Mock(type = MockType.NICE)
     private ProcessorContextImpl context;
-    private ChangeLoggingWindowBytesStore store;
+    private ChangeLoggingTimestampedWindowBytesStore store;
 
 
     @Before
     public void setUp() {
-        store = new ChangeLoggingWindowBytesStore(inner, false);
+        store = new ChangeLoggingTimestampedWindowBytesStore(inner, false);
     }
 
     private void init() {
@@ -85,20 +87,27 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldLogPuts() {
-        inner.put(bytesKey, value1, 0);
+        inner.put(bytesKey, valueAndTimestamp, 0);
         EasyMock.expectLastCall();
 
         init();
 
-        store.put(bytesKey, value1);
+        store.put(bytesKey, valueAndTimestamp);
 
-        assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)));
+        assertArrayEquals(
+            value,
+            (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)).value());
+        assertEquals(
+            42L,
+            sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)).timestamp());
         EasyMock.verify(inner);
     }
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetching() {
-        EasyMock.expect(inner.fetch(bytesKey, 0, 10)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
+        EasyMock
+            .expect(inner.fetch(bytesKey, 0, 10))
+            .andReturn(KeyValueIterators.emptyWindowStoreIterator());
 
         init();
 
@@ -108,7 +117,9 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
-        EasyMock.expect(inner.fetch(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
+        EasyMock
+            .expect(inner.fetch(bytesKey, bytesKey, 0, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
 
         init();
 
@@ -118,16 +129,26 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldRetainDuplicatesWhenSet() {
-        store = new ChangeLoggingWindowBytesStore(inner, true);
-        inner.put(bytesKey, value1, 0);
+        store = new ChangeLoggingTimestampedWindowBytesStore(inner, true);
+        inner.put(bytesKey, valueAndTimestamp, 0);
         EasyMock.expectLastCall().times(2);
 
         init();
-        store.put(bytesKey, value1);
-        store.put(bytesKey, value1);
-
-        assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)));
-        assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)));
+        store.put(bytesKey, valueAndTimestamp);
+        store.put(bytesKey, valueAndTimestamp);
+
+        assertArrayEquals(
+            value,
+            (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)).value());
+        assertEquals(
+            42L,
+            sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)).timestamp());
+        assertArrayEquals(
+            value,
+            (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)).value());
+        assertEquals(
+            42L,
+            sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)).timestamp());
 
         EasyMock.verify(inner);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index a36b101..d7ad6d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.state.WindowStore;
@@ -58,8 +57,8 @@ public class ChangeLoggingWindowBytesStoreTest {
         }
     };
 
-    private final byte[] value1 = {0};
-    private final Bytes bytesKey = Bytes.wrap(value1);
+    private final byte[] value = {0};
+    private final Bytes bytesKey = Bytes.wrap(value);
 
     @Mock(type = MockType.NICE)
     private WindowStore<Bytes, byte[]> inner;
@@ -85,20 +84,24 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldLogPuts() {
-        inner.put(bytesKey, value1, 0);
+        inner.put(bytesKey, value, 0);
         EasyMock.expectLastCall();
 
         init();
 
-        store.put(bytesKey, value1);
+        store.put(bytesKey, value);
 
-        assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)));
+        assertArrayEquals(
+            value,
+            (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0)));
         EasyMock.verify(inner);
     }
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetching() {
-        EasyMock.expect(inner.fetch(bytesKey, 0, 10)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
+        EasyMock
+            .expect(inner.fetch(bytesKey, 0, 10))
+            .andReturn(KeyValueIterators.emptyWindowStoreIterator());
 
         init();
 
@@ -108,7 +111,9 @@ public class ChangeLoggingWindowBytesStoreTest {
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
-        EasyMock.expect(inner.fetch(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
+        EasyMock
+            .expect(inner.fetch(bytesKey, bytesKey, 0, 1))
+            .andReturn(KeyValueIterators.emptyIterator());
 
         init();
 
@@ -119,15 +124,15 @@ public class ChangeLoggingWindowBytesStoreTest {
     @Test
     public void shouldRetainDuplicatesWhenSet() {
         store = new ChangeLoggingWindowBytesStore(inner, true);
-        inner.put(bytesKey, value1, 0);
+        inner.put(bytesKey, value, 0);
         EasyMock.expectLastCall().times(2);
 
         init();
-        store.put(bytesKey, value1);
-        store.put(bytesKey, value1);
+        store.put(bytesKey, value);
+        store.put(bytesKey, value);
 
-        assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)));
-        assertArrayEquals(value1, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)));
+        assertArrayEquals(value, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1)));
+        assertArrayEquals(value, (byte[]) sent.get(WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2)));
 
         EasyMock.verify(inner);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
new file mode 100644
index 0000000..a3522f3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNull;
+
+public class MeteredTimestampWindowStoreTest {
+    private InternalMockProcessorContext context;
+    @SuppressWarnings("unchecked")
+    private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
+    private final MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
+        innerStoreMock,
+        10L, // any size
+        "scope",
+        new MockTime(),
+        Serdes.String(),
+        new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
+    );
+    private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
+
+    {
+        EasyMock.expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes();
+    }
+
+    @Before
+    public void setUp() {
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+
+        context = new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+    }
+
+    @Test
+    public void shouldCloseUnderlyingStore() {
+        innerStoreMock.close();
+        EasyMock.expectLastCall();
+        EasyMock.replay(innerStoreMock);
+
+        store.init(context, store);
+        store.close();
+        EasyMock.verify(innerStoreMock);
+    }
+
+    @Test
+    public void shouldNotExceptionIfFetchReturnsNull() {
+        EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
+        EasyMock.replay(innerStoreMock);
+
+        store.init(context, store);
+        assertNull(store.fetch("a", 0));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 87a693d..e37affd 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -97,7 +97,8 @@ public final class StreamsTestUtils {
         return results;
     }
 
-    public static <K> void verifyKeyValueList(final List<KeyValue<K, byte[]>> expected, final List<KeyValue<K, byte[]>> actual) {
+    public static <K> void verifyKeyValueList(final List<KeyValue<K, byte[]>> expected,
+                                              final List<KeyValue<K, byte[]>> actual) {
         assertThat(actual.size(), equalTo(expected.size()));
         for (int i = 0; i < actual.size(); i++) {
             final KeyValue<K, byte[]> expectedKv = expected.get(i);
@@ -109,10 +110,10 @@ public final class StreamsTestUtils {
 
     public static void verifyWindowedKeyValue(final KeyValue<Windowed<Bytes>, byte[]> actual,
                                               final Windowed<Bytes> expectedKey,
-                                              final String expectedValue) {
+                                              final byte[] expectedValue) {
         assertThat(actual.key.window(), equalTo(expectedKey.window()));
         assertThat(actual.key.key(), equalTo(expectedKey.key()));
-        assertThat(actual.value, equalTo(expectedValue.getBytes()));
+        assertThat(actual.value, equalTo(expectedValue));
     }
 
     public static Metric getMetricByName(final Map<MetricName, ? extends Metric> metrics,


Mime
View raw message