kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFAK-3522: add API to create timestamped stores (#6601)
Date Wed, 01 May 2019 07:28:28 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c5665e6  KAFAK-3522: add API to create timestamped stores (#6601)
c5665e6 is described below

commit c5665e6945c8e63ddfb1056c4893f16cae1f6f99
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Wed May 1 09:28:10 2019 +0200

    KAFAK-3522: add API to create timestamped stores (#6601)
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../org/apache/kafka/streams/state/Stores.java     | 261 ++++++++++++++++-----
 .../kafka/streams/state/TimestampedBytesStore.java |   3 +
 ...ValueToTimestampedKeyValueByteStoreAdapter.java | 131 +++++++++++
 ...yValueToTimestampedKeyValueIteratorAdapter.java |  58 +++++
 .../state/internals/RocksDBTimestampedStore.java   |  13 +-
 ...ils.java => RocksDBTimestampedWindowStore.java} |  19 +-
 .../internals/RocksDbWindowBytesStoreSupplier.java |  37 ++-
 .../internals/TimestampedKeyValueStoreBuilder.java |   7 +-
 .../internals/TimestampedWindowStoreBuilder.java   |   7 +-
 .../WindowToTimestampedWindowByteStoreAdapter.java | 152 ++++++++++++
 .../org/apache/kafka/streams/state/StoresTest.java | 142 ++++++++---
 .../TimestampedKeyValueStoreBuilderTest.java       |  38 ++-
 .../TimestampedWindowStoreBuilderTest.java         | 183 +++++++++++++++
 .../state/internals/WindowStoreBuilderTest.java    |  34 +--
 14 files changed, 929 insertions(+), 156 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c85fe03..e40251d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplie
 import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 
 import java.time.Duration;
@@ -75,13 +77,18 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
  * }</pre>
  */
 @InterfaceStability.Evolving
-public class Stores {
+public final class Stores {
 
     /**
      * Create a persistent {@link KeyValueBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
+     * If you want to create a {@link TimestampedKeyValueStore} you should use
+     * {@link #persistentTimestampedKeyValueStore(String)} to create a store supplier instead.
+     *
      * @param name  name of the store (cannot be {@code null})
      * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
-     * to build a persistent store
+     * to build a persistent key-value store
      */
     public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
         Objects.requireNonNull(name, "name cannot be null");
@@ -89,7 +96,28 @@ public class Stores {
     }
 
     /**
+     * Create a persistent {@link KeyValueBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
+     * If you want to create a {@link KeyValueStore} you should use
+     * {@link #persistentKeyValueStore(String)} to create a store supplier instead.
+     *
+     * @param name  name of the store (cannot be {@code null})
+     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
+     * to build a persistent key-(timestamp/value) store
+     */
+    public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name) {
+        Objects.requireNonNull(name, "name cannot be null");
+        return new RocksDbKeyValueBytesStoreSupplier(name, true);
+    }
+
+    /**
      * Create an in-memory {@link KeyValueBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}
+     * or {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
+     *
      * @param name  name of the store (cannot be {@code null})
      * @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
      * build an in-memory store
@@ -116,6 +144,10 @@ public class Stores {
 
     /**
      * Create a LRU Map {@link KeyValueBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}
+     * or {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
+     *
      * @param name          name of the store (cannot be {@code null})
      * @param maxCacheSize  maximum number of items in the LRU (cannot be negative)
      * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build
@@ -145,49 +177,13 @@ public class Stores {
     }
 
     /**
-     * Create an in-memory {@link WindowBytesStoreSupplier}.
-     * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     *                              Note that the retention period must be at least long enough to contain the
-     *                              windowed data's entire life cycle, from window-start through window-end,
-     *                              and for the entire grace period.
-     * @param windowSize            size of the windows (cannot be negative)
-     * @return an instance of {@link WindowBytesStoreSupplier}
-     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
-     */
-    public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
-                                                               final Duration retentionPeriod,
-                                                               final Duration windowSize,
-                                                               final boolean retainDuplicates) throws IllegalArgumentException {
-        Objects.requireNonNull(name, "name cannot be null");
-        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
-        final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
-        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
-        final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
-
-        Objects.requireNonNull(name, "name cannot be null");
-        if (retentionMs < 0L) {
-            throw new IllegalArgumentException("retentionPeriod cannot be negative");
-        }
-        if (windowSizeMs < 0L) {
-            throw new IllegalArgumentException("windowSize cannot be negative");
-        }
-        if (windowSizeMs > retentionMs) {
-            throw new IllegalArgumentException("The retention period of the window store "
-                + name + " must be no smaller than its window size. Got size=["
-                + windowSize + "], retention=[" + retentionPeriod + "]");
-        }
-
-        return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates);
-    }
-
-    /**
      * Create a persistent {@link WindowBytesStoreSupplier}.
+     *
      * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative).
-     *                              Note that the retention period must be at least long enough to contain the
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     *                              (note that the retention period must be at least long enough to contain the
      *                              windowed data's entire life cycle, from window-start through window-end,
-     *                              and for the entire grace period.
+     *                              and for the entire grace period)
      * @param numSegments           number of db segments (cannot be zero or negative)
      * @param windowSize            size of the windows that are stored (cannot be negative). Note: the window size
      *                              is not stored with the records, so this value is used to compute the keys that
@@ -214,17 +210,23 @@ public class Stores {
             retentionPeriod,
             windowSize,
             retainDuplicates,
-            legacySegmentInterval
+            legacySegmentInterval,
+            false
         );
     }
 
     /**
      * Create a persistent {@link WindowBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
+     * If you want to create a {@link TimestampedWindowStore} you should use
+     * {@link #persistentTimestampedWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
+     *
      * @param name                  name of the store (cannot be {@code null})
      * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     *                              Note that the retention period must be at least long enough to contain the
+     *                              (note that the retention period must be at least long enough to contain the
      *                              windowed data's entire life cycle, from window-start through window-end,
-     *                              and for the entire grace period.
+     *                              and for the entire grace period)
      * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
@@ -234,6 +236,39 @@ public class Stores {
                                                                  final Duration retentionPeriod,
                                                                  final Duration windowSize,
                                                                  final boolean retainDuplicates) throws IllegalArgumentException {
+        return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, false);
+    }
+
+    /**
+     * Create a persistent {@link WindowBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
+     * If you want to create a {@link WindowStore} you should use
+     * {@link #persistentWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
+     *
+     * @param name                  name of the store (cannot be {@code null})
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     *                              (note that the retention period must be at least long enough to contain the
+     *                              windowed data's entire life cycle, from window-start through window-end,
+     *                              and for the entire grace period)
+     * @param windowSize            size of the windows (cannot be negative)
+     * @param retainDuplicates      whether or not to retain duplicates.
+     * @return an instance of {@link WindowBytesStoreSupplier}
+     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
+     */
+    public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
+                                                                            final Duration retentionPeriod,
+                                                                            final Duration windowSize,
+                                                                            final boolean retainDuplicates) throws IllegalArgumentException {
+        return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, true);
+    }
+
+    private static WindowBytesStoreSupplier persistentWindowStore(final String name,
+                                                                  final Duration retentionPeriod,
+                                                                  final Duration windowSize,
+                                                                  final boolean retainDuplicates,
+                                                                  final boolean timestampedStore) {
         Objects.requireNonNull(name, "name cannot be null");
         final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
         final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
@@ -242,14 +277,15 @@ public class Stores {
 
         final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
 
-        return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval);
+        return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval, timestampedStore);
     }
 
     private static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                                   final long retentionPeriod,
                                                                   final long windowSize,
                                                                   final boolean retainDuplicates,
-                                                                  final long segmentInterval) {
+                                                                  final long segmentInterval,
+                                                                  final boolean timestampedStore) {
         Objects.requireNonNull(name, "name cannot be null");
         if (retentionPeriod < 0L) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
@@ -262,8 +298,8 @@ public class Stores {
         }
         if (windowSize > retentionPeriod) {
             throw new IllegalArgumentException("The retention period of the window store "
-                                                   + name + " must be no smaller than its window size. Got size=["
-                                                   + windowSize + "], retention=[" + retentionPeriod + "]");
+                + name + " must be no smaller than its window size. Got size=["
+                + windowSize + "], retention=[" + retentionPeriod + "]");
         }
 
         return new RocksDbWindowBytesStoreSupplier(
@@ -272,7 +308,49 @@ public class Stores {
             segmentInterval,
             windowSize,
             retainDuplicates,
-            false);
+            timestampedStore);
+    }
+
+    /**
+     * Create an in-memory {@link WindowBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)} or
+     * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
+     *
+     * @param name                  name of the store (cannot be {@code null})
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     *                              Note that the retention period must be at least long enough to contain the
+     *                              windowed data's entire life cycle, from window-start through window-end,
+     *                              and for the entire grace period.
+     * @param windowSize            size of the windows (cannot be negative)
+     * @return an instance of {@link WindowBytesStoreSupplier}
+     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
+     */
+    public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
+                                                               final Duration retentionPeriod,
+                                                               final Duration windowSize,
+                                                               final boolean retainDuplicates) throws IllegalArgumentException {
+        Objects.requireNonNull(name, "name cannot be null");
+
+        final String repartitionPeriodErrorMessagePrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, repartitionPeriodErrorMessagePrefix);
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+
+        final String windowSizeErrorMessagePrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, windowSizeErrorMessagePrefix);
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
+
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                + name + " must be no smaller than its window size. Got size=["
+                + windowSize + "], retention=[" + retentionPeriod + "]");
+        }
+
+        return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates);
     }
 
     /**
@@ -297,11 +375,12 @@ public class Stores {
 
     /**
      * Create a persistent {@link SessionBytesStoreSupplier}.
+     *
      * @param name              name of the store (cannot be {@code null})
      * @param retentionPeriodMs length ot time to retain data in the store (cannot be negative)
-     *                          Note that the retention period must be at least long enough to contain the
+     *                          (note that the retention period must be at least long enough to contain the
      *                          windowed data's entire life cycle, from window-start through window-end,
-     *                          and for the entire grace period.
+     *                          and for the entire grace period)
      * @return an instance of a {@link  SessionBytesStoreSupplier}
      * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
      */
@@ -317,6 +396,7 @@ public class Stores {
 
     /**
      * Create a persistent {@link SessionBytesStoreSupplier}.
+     *
      * @param name              name of the store (cannot be {@code null})
      * @param retentionPeriod   length ot time to retain data in the store (cannot be negative)
      *                          Note that the retention period must be at least long enough to contain the
@@ -331,12 +411,58 @@ public class Stores {
         return persistentSessionStore(name, ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix));
     }
 
+    /**
+     * Creates a {@link StoreBuilder} that can be used to build a {@link KeyValueStore}.
+     * <p>
+     * The provided supplier should <strong>not</strong> be a supplier for
+     * {@link TimestampedKeyValueStore TimestampedKeyValueStores}.
+     *
+     * @param supplier      a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
+     * @param keySerde      the key serde to use
+     * @param valueSerde    the value serde to use; if the serialized bytes is {@code null} for put operations,
+     *                      it is treated as delete
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
+     */
+    public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
+                                                                                final Serde<K> keySerde,
+                                                                                final Serde<V> valueSerde) {
+        Objects.requireNonNull(supplier, "supplier cannot be null");
+        return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+    }
+
+    /**
+     * Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedKeyValueStore}.
+     * <p>
+     * The provided supplier should <strong>not</strong> be a supplier for
+     * {@link KeyValueStore KeyValueStores}. In that case, passed-in timestamps will be dropped and not stored in the
+     * key-value store. On read, a dummy timestamp will be returned instead of a valid one.
+     *
+     * @param supplier      a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
+     * @param keySerde      the key serde to use
+     * @param valueSerde    the value serde to use; if the serialized bytes is {@code null} for put operations,
+     *                      it is treated as delete
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return an instance of a {@link StoreBuilder} that can build a {@link TimestampedKeyValueStore}
+     */
+    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
+                                                                                                      final Serde<K> keySerde,
+                                                                                                      final Serde<V> valueSerde) {
+        Objects.requireNonNull(supplier, "supplier cannot be null");
+        return new TimestampedKeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+    }
 
     /**
      * Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
+     * <p>
+     * The provided supplier should <strong>not</strong> be a supplier for
+     * {@link TimestampedWindowStore TimestampedWindowStores}.
+     *
      * @param supplier      a {@link WindowBytesStoreSupplier} (cannot be {@code null})
      * @param keySerde      the key serde to use
-     * @param valueSerde    the value serde to use; if the serialized bytes is null for put operations,
+     * @param valueSerde    the value serde to use; if the serialized bytes is {@code null} for put operations,
      *                      it is treated as delete
      * @param <K>           key type
      * @param <V>           value type
@@ -350,37 +476,42 @@ public class Stores {
     }
 
     /**
-     * Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
-     * @param supplier      a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
+     * Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedWindowStore}.
+     * <p>
+     * The provided supplier should <strong>not</strong> be a supplier for
+     * {@link WindowStore WindowStores}. In that case, passed-in timestamps will be dropped and not stored in the
+     * window store. On read, a dummy timestamp will be returned instead of a valid one.
+     *
+     * @param supplier      a {@link WindowBytesStoreSupplier} (cannot be {@code null})
      * @param keySerde      the key serde to use
-     * @param valueSerde    the value serde to use; if the serialized bytes is null for put operations,
+     * @param valueSerde    the value serde to use; if the serialized bytes is {@code null} for put operations,
      *                      it is treated as delete
      * @param <K>           key type
      * @param <V>           value type
-     * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
+     * @return an instance of {@link StoreBuilder} that can build a {@link TimestampedWindowStore}
      */
-    public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
-                                                                                final Serde<K> keySerde,
-                                                                                final Serde<V> valueSerde) {
+    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
+                                                                                                  final Serde<K> keySerde,
+                                                                                                  final Serde<V> valueSerde) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
-        return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+        return new TimestampedWindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
     }
 
     /**
      * Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
+     *
      * @param supplier      a {@link SessionBytesStoreSupplier} (cannot be {@code null})
      * @param keySerde      the key serde to use
-     * @param valueSerde    the value serde to use; if the serialized bytes is null for put operations,
+     * @param valueSerde    the value serde to use; if the serialized bytes is {@code null} for put operations,
      *                      it is treated as delete
      * @param <K>           key type
      * @param <V>           value type
      * @return an instance of {@link StoreBuilder} than can build a {@link SessionStore}
-     * */
+     */
     public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                               final Serde<K> keySerde,
                                                                               final Serde<V> valueSerde) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
     }
-}
-
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java
index 5b5fbc5..e609b70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java
@@ -22,6 +22,9 @@ import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP;
 
 public interface TimestampedBytesStore {
     static byte[] convertToTimestampedFormat(final byte[] plainValue) {
+        if (plainValue == null) {
+            return null;
+        }
         return ByteBuffer
             .allocate(8 + plainValue.length)
             .putLong(NO_TIMESTAMP)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
new file mode 100644
index 0000000..62cfac3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.List;
+
+import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
+import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore} and {@link KeyValueStore}.
+ * <p>
+ * If a user provides a supplier for plain {@code KeyValueStores} via
+ * {@link org.apache.kafka.streams.kstream.Materialized#as(KeyValueBytesStoreSupplier)} this adapter is used to
+ * translate between the old and new {@code byte[]} format of the value.
+ *
+ * @see KeyValueToTimestampedKeyValueIteratorAdapter
+ */
+public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueStore<Bytes, byte[]> {
+    final KeyValueStore<Bytes, byte[]> store;
+
+    KeyValueToTimestampedKeyValueByteStoreAdapter(final KeyValueStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueWithTimestamp) {
+        store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] valueWithTimestamp) {
+        return convertToTimestampedFormat(store.putIfAbsent(
+            key,
+            valueWithTimestamp == null ? null : rawValue(valueWithTimestamp)));
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            final byte[] valueWithTimestamp = entry.value;
+            store.put(entry.key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+        }
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        return convertToTimestampedFormat(store.delete(key));
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
+    public void flush() {
+        store.flush();
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        return convertToTimestampedFormat(store.get(key));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                 final Bytes to) {
+        return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.range(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.all());
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return store.approximateNumEntries();
+    }
+
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueIteratorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueIteratorAdapter.java
new file mode 100644
index 0000000..7bdcb5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueIteratorAdapter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore} and
+ * {@link org.apache.kafka.streams.state.KeyValueStore}.
+ *
+ * @see KeyValueToTimestampedKeyValueByteStoreAdapter
+ */
+class KeyValueToTimestampedKeyValueIteratorAdapter<K> implements KeyValueIterator<K, byte[]> {
+    private final KeyValueIterator<K, byte[]> innerIterator;
+
+    KeyValueToTimestampedKeyValueIteratorAdapter(final KeyValueIterator<K, byte[]> innerIterator) {
+        this.innerIterator = innerIterator;
+    }
+
+    @Override
+    public void close() {
+        innerIterator.close();
+    }
+
+    @Override
+    public K peekNextKey() {
+        return innerIterator.peekNextKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return innerIterator.hasNext();
+    }
+
+    @Override
+    public KeyValue<K, byte[]> next() {
+        final KeyValue<K, byte[]> plainKeyValue = innerIterator.next();
+        return KeyValue.pair(plainKeyValue.key, convertToTimestampedFormat(plainKeyValue.value));
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index f52033b..5466ce8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
@@ -42,12 +43,12 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 
 import static java.util.Arrays.asList;
-import static org.apache.kafka.streams.state.internals.StoreProxyUtils.getValueWithUnknownTimestamp;
+import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
 
 /**
  * A persistent key-(value-timestamp) store based on RocksDB.
  */
-public class RocksDBTimestampedStore extends RocksDBStore {
+public class RocksDBTimestampedStore extends RocksDBStore implements TimestampedBytesStore {
     private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStore.class);
 
     RocksDBTimestampedStore(final String name) {
@@ -160,7 +161,7 @@ public class RocksDBTimestampedStore extends RocksDBStore {
 
             final byte[] plainValue = db.get(oldColumnFamily, key);
             if (plainValue != null) {
-                final byte[] valueWithUnknownTimestamp = getValueWithUnknownTimestamp(plainValue);
+                final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
                 // this does only work, because the changelog topic contains correct data already
                 // for other format changes, we cannot take this short cut and can only migrate data
                 // from old to new store on put()
@@ -180,7 +181,7 @@ public class RocksDBTimestampedStore extends RocksDBStore {
 
             final byte[] plainValue = db.get(oldColumnFamily, key);
             if (plainValue != null) {
-                return getValueWithUnknownTimestamp(plainValue);
+                return convertToTimestampedFormat(plainValue);
             }
 
             return null;
@@ -319,12 +320,12 @@ public class RocksDBTimestampedStore extends RocksDBStore {
                 }
             } else {
                 if (nextWithTimestamp == null) {
-                    next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
+                    next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
                     nextNoTimestamp = null;
                     iterNoTimestamp.next();
                 } else {
                     if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {
-                        next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
+                        next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
                         nextNoTimestamp = null;
                         iterNoTimestamp.next();
                     } else {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStore.java
similarity index 67%
rename from streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java
rename to streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStore.java
index e78b382..b96748e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStore.java
@@ -16,21 +16,14 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import java.nio.ByteBuffer;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
 
-import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP;
+class RocksDBTimestampedWindowStore extends RocksDBWindowStore implements TimestampedBytesStore {
 
-class StoreProxyUtils {
-
-    static byte[] getValueWithUnknownTimestamp(final byte[] rawValue) {
-        if (rawValue == null) {
-            return null;
-        }
-        return ByteBuffer
-            .allocate(8 + rawValue.length)
-            .putLong(NO_TIMESTAMP)
-            .put(rawValue)
-            .array();
+    RocksDBTimestampedWindowStore(final SegmentedBytesStore bytesStore,
+                                  final boolean retainDuplicates,
+                                  final long windowSize) {
+        super(bytesStore, retainDuplicates, windowSize);
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index b2e8c11..79f1ee3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -49,28 +49,27 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
 
     @Override
     public WindowStore<Bytes, byte[]> get() {
-        final SegmentedBytesStore segmentedBytesStore;
         if (!returnTimestampedStore) {
-            segmentedBytesStore = new RocksDBSegmentedBytesStore(
-                name,
-                metricsScope(),
-                retentionPeriod,
-                segmentInterval,
-                new WindowKeySchema()
-            );
+            return new RocksDBWindowStore(
+                new RocksDBSegmentedBytesStore(
+                    name,
+                    metricsScope(),
+                    retentionPeriod,
+                    segmentInterval,
+                    new WindowKeySchema()),
+                retainDuplicates,
+                windowSize);
         } else {
-            segmentedBytesStore = new RocksDBTimestampedSegmentedBytesStore(
-                name,
-                metricsScope(),
-                retentionPeriod,
-                segmentInterval,
-                new WindowKeySchema()
-            );
+            return new RocksDBTimestampedWindowStore(
+                new RocksDBTimestampedSegmentedBytesStore(
+                    name,
+                    metricsScope(),
+                    retentionPeriod,
+                    segmentInterval,
+                    new WindowKeySchema()),
+                retainDuplicates,
+                windowSize);
         }
-        return new RocksDBWindowStore(
-            segmentedBytesStore,
-            retainDuplicates,
-            windowSize);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index 5a0bf22..f43e4e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
@@ -46,8 +47,12 @@ public class TimestampedKeyValueStoreBuilder<K, V>
 
     @Override
     public TimestampedKeyValueStore<K, V> build() {
+        KeyValueStore<Bytes, byte[]> store = storeSupplier.get();
+        if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
+            store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store);
+        }
         return new MeteredTimestampedKeyValueStore<>(
-            maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+            maybeWrapCaching(maybeWrapLogging(store)),
             storeSupplier.metricsScope(),
             time,
             keySerde,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index dcb0d44..2c7c950 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@@ -42,8 +43,12 @@ public class TimestampedWindowStoreBuilder<K, V>
 
     @Override
     public TimestampedWindowStore<K, V> build() {
+        WindowStore<Bytes, byte[]> store = storeSupplier.get();
+        if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
+            store = new WindowToTimestampedWindowByteStoreAdapter(store);
+        }
         return new MeteredTimestampedWindowStore<>(
-            maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+            maybeWrapCaching(maybeWrapLogging(store)),
             storeSupplier.windowSize(),
             storeSupplier.metricsScope(),
             time,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
new file mode 100644
index 0000000..7bd8665
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+
+import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
+import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
+
+class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, byte[]> {
+    final WindowStore<Bytes, byte[]> store;
+
+    WindowToTimestampedWindowByteStoreAdapter(final WindowStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueWithTimestamp) {
+        store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueWithTimestamp,
+                    final long windowStartTimestamp) {
+        store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp), windowStartTimestamp);
+    }
+
+    @Override
+    public byte[] fetch(final Bytes key,
+                        final long time) {
+        return convertToTimestampedFormat(store.fetch(key, time));
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public WindowStoreIterator<byte[]> fetch(final Bytes key,
+                                             final long timeFrom,
+                                             final long timeTo) {
+        return new WindowToTimestampedWindowIteratorAdapter(store.fetch(key, timeFrom, timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> fetch(final Bytes key,
+                                             final Instant from,
+                                             final Instant to) {
+        return new WindowToTimestampedWindowIteratorAdapter(store.fetch(key, from, to));
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+                                                           final Bytes to,
+                                                           final long timeFrom,
+                                                           final long timeTo) {
+        return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetch(from, to, timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+                                                           final Bytes to,
+                                                           final Instant fromTime,
+                                                           final Instant toTime) {
+        return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetch(from, to, fromTime, toTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+        return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.all());
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
+                                                              final long timeTo) {
+        return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetchAll(timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from,
+                                                              final Instant to) {
+        return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetchAll(from, to));
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
+    public void flush() {
+        store.flush();
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+
+    private static class WindowToTimestampedWindowIteratorAdapter
+        extends KeyValueToTimestampedKeyValueIteratorAdapter<Long>
+        implements WindowStoreIterator<byte[]> {
+
+        WindowToTimestampedWindowIteratorAdapter(final KeyValueIterator<Long, byte[]> innerIterator) {
+            super(innerIterator);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 4819ac1..e520df4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -36,75 +36,112 @@ import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 public class StoresTest {
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowIfPersistentKeyValueStoreStoreNameIsNull() {
-        Stores.persistentKeyValueStore(null);
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentKeyValueStore(null));
+        assertEquals("name cannot be null", e.getMessage());
+    }
+
+    @Test
+    public void shouldThrowIfPersistentTimestampedKeyValueStoreStoreNameIsNull() {
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentTimestampedKeyValueStore(null));
+        assertEquals("name cannot be null", e.getMessage());
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
-        //noinspection ResultOfMethodCallIgnored
-        Stores.inMemoryKeyValueStore(null);
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.inMemoryKeyValueStore(null));
+        assertEquals("name cannot be null", e.getMessage());
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowIfILruMapStoreNameIsNull() {
-        Stores.lruMap(null, 0);
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.lruMap(null, 0));
+        assertEquals("name cannot be null", e.getMessage());
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void shouldThrowIfILruMapStoreCapacityIsNegative() {
-        Stores.lruMap("anyName", -1);
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.lruMap("anyName", -1));
+        assertEquals("maxCacheSize cannot be negative", e.getMessage());
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
-        Stores.persistentWindowStore(null, ZERO, ZERO, false);
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentWindowStore(null, ZERO, ZERO, false));
+        assertEquals("name cannot be null", e.getMessage());
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
+    public void shouldThrowIfIPersistentTimestampedWindowStoreStoreNameIsNull() {
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentTimestampedWindowStore(null, ZERO, ZERO, false));
+        assertEquals("name cannot be null", e.getMessage());
+    }
+
+    @Test
     public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
-        Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false);
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false));
+        assertEquals("retentionPeriod cannot be negative", e.getMessage());
+    }
+
+    @Test
+    public void shouldThrowIfIPersistentTimestampedWindowStoreRetentionPeriodIsNegative() {
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentTimestampedWindowStore("anyName", ofMillis(-1L), ZERO, false));
+        assertEquals("retentionPeriod cannot be negative", e.getMessage());
     }
 
     @Deprecated
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() {
-        Stores.persistentWindowStore("anyName", 0L, 1, 0L, false);
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", 0L, 1, 0L, false));
+        assertEquals("numSegments cannot be smaller than 2", e.getMessage());
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
-        Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false);
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false));
+        assertEquals("windowSize cannot be negative", e.getMessage());
     }
 
-    @Test(expected = NullPointerException.class)
-    public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() {
-        Stores.persistentSessionStore(null, ofMillis(0));
+    @Test
+    public void shouldThrowIfIPersistentTimestampedWindowStoreIfWindowSizeIsNegative() {
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentTimestampedWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false));
+        assertEquals("windowSize cannot be negative", e.getMessage());
+    }
 
+    @Test
+    public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() {
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentSessionStore(null, ofMillis(0)));
+        assertEquals("name cannot be null", e.getMessage());
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void shouldThrowIfIPersistentSessionStoreRetentionPeriodIsNegative() {
-        Stores.persistentSessionStore("anyName", ofMillis(-1));
+        final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentSessionStore("anyName", ofMillis(-1)));
+        assertEquals("retentionPeriod cannot be negative", e.getMessage());
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowIfSupplierIsNullForWindowStoreBuilder() {
-        Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
+        assertEquals("supplier cannot be null", e.getMessage());
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowIfSupplierIsNullForKeyValueStoreBuilder() {
-        Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
+        assertEquals("supplier cannot be null", e.getMessage());
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test
     public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() {
-        Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
+        final Exception e = assertThrows(NullPointerException.class, () -> Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
+        assertEquals("supplier cannot be null", e.getMessage());
     }
 
     @Test
@@ -125,6 +162,11 @@ public class StoresTest {
     }
 
     @Test
+    public void shouldCreateRocksDbTimestampedStore() {
+        assertThat(Stores.persistentTimestampedKeyValueStore("store").get(), instanceOf(RocksDBTimestampedStore.class));
+    }
+
+    @Test
     public void shouldCreateRocksDbWindowStore() {
         final WindowStore store = Stores.persistentWindowStore("store", ofMillis(1L), ofMillis(1L), false).get();
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
@@ -133,11 +175,49 @@ public class StoresTest {
     }
 
     @Test
+    public void shouldCreateRocksDbTimestampedWindowStore() {
+        final WindowStore store = Stores.persistentTimestampedWindowStore("store", ofMillis(1L), ofMillis(1L), false).get();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertThat(store, instanceOf(RocksDBWindowStore.class));
+        assertThat(wrapped, instanceOf(RocksDBTimestampedSegmentedBytesStore.class));
+    }
+
+    @Test
     public void shouldCreateRocksDbSessionStore() {
         assertThat(Stores.persistentSessionStore("store", ofMillis(1)).get(), instanceOf(RocksDBSessionStore.class));
     }
 
     @Test
+    public void shouldBuildKeyValueStore() {
+        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore("name"),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
+        assertThat(store, not(nullValue()));
+    }
+
+    @Test
+    public void shouldBuildTimestampedKeyValueStore() {
+        final TimestampedKeyValueStore<String, String> store = Stores.timestampedKeyValueStoreBuilder(
+            Stores.persistentTimestampedKeyValueStore("name"),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
+        assertThat(store, not(nullValue()));
+    }
+
+    @Test
+    public void shouldBuildTimestampedKeyValueStoreThatWrapsKeyValueStore() {
+        final TimestampedKeyValueStore<String, String> store = Stores.timestampedKeyValueStoreBuilder(
+            Stores.persistentKeyValueStore("name"),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
+        assertThat(store, not(nullValue()));
+    }
+
+    @Test
     public void shouldBuildWindowStore() {
         final WindowStore<String, String> store = Stores.windowStoreBuilder(
             Stores.persistentWindowStore("store", ofMillis(3L), ofMillis(3L), true),
@@ -148,9 +228,9 @@ public class StoresTest {
     }
 
     @Test
-    public void shouldBuildKeyValueStore() {
-        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
-            Stores.persistentKeyValueStore("name"),
+    public void shouldBuildTimestampedWindowStore() {
+        final TimestampedWindowStore<String, String> store = Stores.timestampedWindowStoreBuilder(
+            Stores.persistentTimestampedWindowStore("store", ofMillis(3L), ofMillis(3L), true),
             Serdes.String(),
             Serdes.String()
         ).build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
index e6dbc66..7b0eb6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.easymock.MockType;
@@ -35,6 +34,9 @@ import org.junit.runner.RunWith;
 
 import java.util.Collections;
 
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
@@ -49,9 +51,9 @@ public class TimestampedKeyValueStoreBuilderTest {
 
     @Before
     public void setUp() {
-        EasyMock.expect(supplier.get()).andReturn(inner);
-        EasyMock.expect(supplier.name()).andReturn("name");
-        EasyMock.replay(supplier);
+        expect(supplier.get()).andReturn(inner);
+        expect(supplier.name()).andReturn("name");
+        replay(supplier);
         builder = new TimestampedKeyValueStoreBuilder<>(
             supplier,
             Serdes.String(),
@@ -114,6 +116,34 @@ public class TimestampedKeyValueStoreBuilderTest {
         assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
     }
 
+    @Test
+    public void shouldNotWrapTimestampedByteStore() {
+        reset(supplier);
+        expect(supplier.get()).andReturn(new RocksDBTimestampedStore("name"));
+        expect(supplier.name()).andReturn("name");
+        replay(supplier);
+
+        final TimestampedKeyValueStore<String, String> store = builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+        assertThat(((WrappedStateStore) store).wrapped(), instanceOf(RocksDBTimestampedStore.class));
+    }
+
+    @Test
+    public void shouldWrapPlainKeyValueStoreAsTimestampStore() {
+        reset(supplier);
+        expect(supplier.get()).andReturn(new RocksDBStore("name"));
+        expect(supplier.name()).andReturn("name");
+        replay(supplier);
+
+        final TimestampedKeyValueStore<String, String> store = builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+        assertThat(((WrappedStateStore) store).wrapped(), instanceOf(KeyValueToTimestampedKeyValueByteStoreAdapter.class));
+    }
+
     @SuppressWarnings("all")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerIfInnerIsNull() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
new file mode 100644
index 0000000..7be31ea
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class TimestampedWindowStoreBuilderTest {
+
+    @Mock(type = MockType.NICE)
+    private WindowBytesStoreSupplier supplier;
+    @Mock(type = MockType.NICE)
+    private WindowStore<Bytes, byte[]> inner;
+    private TimestampedWindowStoreBuilder<String, String> builder;
+
+    @Before
+    public void setUp() {
+        expect(supplier.get()).andReturn(inner);
+        expect(supplier.name()).andReturn("name");
+        replay(supplier);
+
+        builder = new TimestampedWindowStoreBuilder<>(
+            supplier,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime());
+    }
+
+    @Test
+    public void shouldHaveMeteredStoreAsOuterStore() {
+        final TimestampedWindowStore<String, String> store = builder.build();
+        assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreByDefault() {
+        final TimestampedWindowStore<String, String> store = builder.build();
+        final StateStore next = ((WrappedStateStore) store).wrapped();
+        assertThat(next, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class));
+    }
+
+    @Test
+    public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+        final TimestampedWindowStore<String, String> store = builder.withLoggingDisabled().build();
+        final StateStore next = ((WrappedStateStore) store).wrapped();
+        assertThat(next, CoreMatchers.equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingStoreWhenEnabled() {
+        final TimestampedWindowStore<String, String> store = builder.withCachingEnabled().build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
+        assertThat(wrapped, instanceOf(CachingWindowStore.class));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+        final TimestampedWindowStore<String, String> store = builder
+                .withLoggingEnabled(Collections.emptyMap())
+                .build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
+        assertThat(wrapped, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class));
+        assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner));
+    }
+
+    @Test
+    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+        final TimestampedWindowStore<String, String> store = builder
+                .withLoggingEnabled(Collections.emptyMap())
+                .withCachingEnabled()
+                .build();
+        final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
+        final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped();
+        assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
+        assertThat(caching, instanceOf(CachingWindowStore.class));
+        assertThat(changeLogging, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class));
+        assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
+    }
+
+    @Test
+    public void shouldNotWrapTimestampedByteStore() {
+        reset(supplier);
+        expect(supplier.get()).andReturn(new RocksDBTimestampedWindowStore(
+            new RocksDBTimestampedSegmentedBytesStore(
+                "name",
+                "metric-scope",
+                10L,
+                5L,
+                new WindowKeySchema()),
+            false,
+            1L));
+        expect(supplier.name()).andReturn("name");
+        replay(supplier);
+
+        final TimestampedWindowStore<String, String> store = builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+        assertThat(((WrappedStateStore) store).wrapped(), instanceOf(RocksDBTimestampedWindowStore.class));
+    }
+
+    @Test
+    public void shouldWrapPlainKeyValueStoreAsTimestampStore() {
+        reset(supplier);
+        expect(supplier.get()).andReturn(new RocksDBWindowStore(
+            new RocksDBSegmentedBytesStore(
+                "name",
+                "metric-scope",
+                10L,
+                5L,
+                new WindowKeySchema()),
+            false,
+            1L));
+        expect(supplier.name()).andReturn("name");
+        replay(supplier);
+
+        final TimestampedWindowStore<String, String> store = builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+        assertThat(((WrappedStateStore) store).wrapped(), instanceOf(WindowToTimestampedWindowByteStoreAdapter.class));
+    }
+
+    @SuppressWarnings("all")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfInnerIsNull() {
+        new TimestampedWindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfKeySerdeIsNull() {
+        new TimestampedWindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfValueSerdeIsNull() {
+        new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerIfTimeIsNull() {
+        new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+    }
+
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
index 022f6dd..bf29d4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
-import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.easymock.MockType;
@@ -34,6 +33,8 @@ import org.junit.runner.RunWith;
 
 import java.util.Collections;
 
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
@@ -47,16 +48,16 @@ public class WindowStoreBuilderTest {
     private WindowStoreBuilder<String, String> builder;
 
     @Before
-    public void setUp() throws Exception {
-        EasyMock.expect(supplier.get()).andReturn(inner);
-        EasyMock.expect(supplier.name()).andReturn("name");
-        EasyMock.replay(supplier);
-
-        builder = new WindowStoreBuilder<>(supplier,
-                                           Serdes.String(),
-                                           Serdes.String(),
-                                           new MockTime());
-
+    public void setUp() { // stub the supplier mock, then create the builder with String serdes and a MockTime
+        expect(supplier.get()).andReturn(inner);
+        expect(supplier.name()).andReturn("name");
+        replay(supplier); // the stubs above take effect once the mock is in replay state
+
+        builder = new WindowStoreBuilder<>(
+            supplier,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime());
     }
 
     @Test
@@ -76,7 +77,7 @@ public class WindowStoreBuilderTest {
     public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
         final WindowStore<String, String> store = builder.withLoggingDisabled().build();
         final StateStore next = ((WrappedStateStore) store).wrapped();
-        assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(next, CoreMatchers.equalTo(inner));
     }
 
     @Test
@@ -90,18 +91,18 @@ public class WindowStoreBuilderTest {
     @Test
     public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
         final WindowStore<String, String> store = builder
-                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withLoggingEnabled(Collections.emptyMap()) // explicit <String, String> type witness dropped
                 .build();
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertThat(store, instanceOf(MeteredWindowStore.class));
         assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class));
-        assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner)); // the change-logging layer wraps the supplied inner store
     }
 
     @Test
     public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
         final WindowStore<String, String> store = builder
-                .withLoggingEnabled(Collections.<String, String>emptyMap())
+                .withLoggingEnabled(Collections.emptyMap())
                 .withCachingEnabled()
                 .build();
         final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
@@ -109,9 +110,10 @@ public class WindowStoreBuilderTest {
         assertThat(store, instanceOf(MeteredWindowStore.class));
         assertThat(caching, instanceOf(CachingWindowStore.class));
         assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class));
-        assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner));
+        assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
     }
 
+    @SuppressWarnings("all")
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerIfInnerIsNull() {
         new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());


Mime
View raw message