kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-7245: Deprecate WindowStore#put(key, value) (#7105)
Date Mon, 07 Oct 2019 21:51:55 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new bd9a177  KAFKA-7245: Deprecate WindowStore#put(key, value) (#7105)
bd9a177 is described below

commit bd9a1772d5364bc0f07c3d8e018e38029070dea9
Author: Omkar Mestry <30625612+omanges@users.noreply.github.com>
AuthorDate: Tue Oct 8 03:20:46 2019 +0530

    KAFKA-7245: Deprecate WindowStore#put(key, value) (#7105)
    
    Implements KIP-474.
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../processor/internals/ProcessorContextImpl.java        |  2 ++
 .../java/org/apache/kafka/streams/state/WindowStore.java |  6 ++++++
 .../streams/state/internals/CachingWindowStore.java      |  1 +
 .../state/internals/ChangeLoggingWindowBytesStore.java   |  1 +
 .../streams/state/internals/InMemoryWindowStore.java     |  1 +
 .../streams/state/internals/MeteredWindowStore.java      |  1 +
 .../streams/state/internals/RocksDBWindowStore.java      |  1 +
 .../state/internals/TimestampedWindowStoreBuilder.java   |  1 +
 .../WindowToTimestampedWindowByteStoreAdapter.java       |  1 +
 .../org/apache/kafka/streams/perf/SimpleBenchmark.java   |  2 +-
 .../processor/internals/ProcessorContextImplTest.java    |  9 +++++++--
 .../streams/state/internals/CachingWindowStoreTest.java  | 16 +++++++++++++++-
 .../ChangeLoggingTimestampedWindowBytesStoreTest.java    |  2 ++
 .../internals/ChangeLoggingWindowBytesStoreTest.java     |  2 ++
 .../streams/state/internals/InMemoryWindowStoreTest.java |  1 +
 .../internals/MeteredTimestampedWindowStoreTest.java     |  2 ++
 .../streams/state/internals/MeteredWindowStoreTest.java  |  1 +
 .../streams/state/internals/RocksDBWindowStoreTest.java  |  4 ++++
 .../streams/state/internals/WindowBytesStoreTest.java    |  8 ++++++++
 .../kafka/streams/internals/WindowStoreFacade.java       |  1 +
 .../kafka/streams/internals/WindowStoreFacadeTest.java   |  1 +
 21 files changed, 60 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index cd2b179..dc8f85a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -321,6 +321,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             super(inner);
         }
 
+        @Deprecated
         @Override
         public void put(final K key,
                         final V value) {
@@ -520,6 +521,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             super(inner);
         }
 
+        @Deprecated
         @Override
         public void put(final K key,
                         final V value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 83a0ee1..f5e69bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -46,7 +46,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @param value The value to update, it can be null;
      *              if the serialized bytes are also null it is interpreted as deletes
      * @throws NullPointerException if the given key is {@code null}
+     *
+     * @deprecated as timestamp is not provided for the key-value pair, this causes inconsistency
+     * to identify the window frame to which the key belongs.
+     * Use {@link #put(Object, Object, long)} instead.
+     *
      */
+    @Deprecated
     void put(K key, V value);
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index b64c0eb..4bb8116 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -131,6 +131,7 @@ class CachingWindowStore
         return true;
     }
 
+    @Deprecated
     @Override
     public synchronized void put(final Bytes key,
                                  final byte[] value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index c58e9f0..8a9b91a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -94,6 +94,7 @@ class ChangeLoggingWindowBytesStore
         return wrapped().fetchAll(timeFrom, timeTo);
     }
 
+    @Deprecated
     @Override
     public void put(final Bytes key, final byte[] value) {
         // Note: It's incorrect to bypass the wrapped store here by delegating to another method,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 49dc566..43877ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -115,6 +115,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         open = true;
     }
 
+    @Deprecated
     @Override
     public void put(final Bytes key, final byte[] value) {
         put(key, value, context.timestamp());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 149f9f8..b841296 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -170,6 +170,7 @@ public class MeteredWindowStore<K, V>
         return false;
     }
 
+    @Deprecated
     @Override
     public void put(final K key,
                     final V value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 49eefa1..c45fe53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -48,6 +48,7 @@ public class RocksDBWindowStore
         super.init(context, root);
     }
 
+    @Deprecated
     @Override
     public void put(final Bytes key, final byte[] value) {
         put(key, value, context.timestamp());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index 808d31e..d545975 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -105,6 +105,7 @@ public class TimestampedWindowStoreBuilder<K, V>
             wrapped.init(context, root);
         }
 
+        @Deprecated
         @Override
         public void put(final Bytes key,
                         final byte[] value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index 7bd8665..7bf8a0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -39,6 +39,7 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
         this.store = store;
     }
 
+    @Deprecated
     @Override
     public void put(final Bytes key,
                     final byte[] valueWithTimestamp) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index be64523..4a14b87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -506,7 +506,7 @@ public class SimpleBenchmark {
                         }
                         iter.close();
 
-                        store.put(key, value);
+                        store.put(key, value, timestamp);
                     }
                 };
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index fe4d948..e36b51d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -178,6 +178,7 @@ public class ProcessorContextImplTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void globalWindowStoreShouldBeReadOnly() {
         doTest("GlobalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -194,7 +195,9 @@ public class ProcessorContextImplTest {
         });
     }
 
+
     @Test
+    @SuppressWarnings("deprecation")
     public void globalTimestampedWindowStoreShouldBeReadOnly() {
         doTest("GlobalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -282,6 +285,7 @@ public class ProcessorContextImplTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void localWindowStoreShouldNotAllowInitOrClose() {
         doTest("LocalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -301,6 +305,7 @@ public class ProcessorContextImplTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void localTimestampedWindowStoreShouldNotAllowInitOrClose() {
         doTest("LocalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -427,7 +432,7 @@ public class ProcessorContextImplTest {
         return timestampedKeyValueStoreMock;
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     private WindowStore<String, Long> windowStoreMock() {
         final WindowStore<String, Long> windowStore = mock(WindowStore.class);
 
@@ -450,7 +455,7 @@ public class ProcessorContextImplTest {
         return windowStore;
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     private TimestampedWindowStore<String, Long> timestampedWindowStoreMock() {
         final TimestampedWindowStore<String, Long> windowStore = mock(TimestampedWindowStore.class);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index b9db53f..64ab25a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -128,10 +128,12 @@ public class CachingWindowStoreTest {
             .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                 private WindowStore<String, String> store;
                 private int numRecordsProcessed;
+                private ProcessorContext context;
 
                 @SuppressWarnings("unchecked")
                 @Override
                 public void init(final ProcessorContext processorContext) {
+                    this.context = processorContext;
                     this.store = (WindowStore<String, String>) processorContext.getStateStore("store-name");
                     int count = 0;
 
@@ -155,7 +157,7 @@ public class CachingWindowStoreTest {
                     }
                     assertThat(count, equalTo(numRecordsProcessed));
 
-                    store.put(value, value);
+                    store.put(value, value, context.timestamp());
 
                     numRecordsProcessed++;
 
@@ -206,6 +208,7 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldPutFetchFromCache() {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
         cachingStore.put(bytesKey("b"), bytesValue("b"));
@@ -244,6 +247,7 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldPutFetchRangeFromCache() {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
         cachingStore.put(bytesKey("b"), bytesValue("b"));
@@ -263,6 +267,7 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldGetAllFromCache() {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
         cachingStore.put(bytesKey("b"), bytesValue("b"));
@@ -285,6 +290,7 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldFetchAllWithinTimestampRange() {
         final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
         for (int i = 0; i < array.length; i++) {
@@ -342,6 +348,7 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldForwardDirtyItemsWhenFlushCalled() {
         final Windowed<String> windowedKey =
             new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
@@ -358,6 +365,7 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldForwardOldValuesWhenEnabled() {
         cachingStore.setFlushListener(cacheListener, true);
         final Windowed<String> windowedKey =
@@ -386,6 +394,7 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldForwardOldValuesWhenDisabled() {
         final Windowed<String> windowedKey =
             new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
@@ -473,6 +482,7 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldClearNamespaceCacheOnClose() {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
         assertEquals(1, cache.size());
@@ -493,6 +503,7 @@ public class CachingWindowStoreTest {
     }
 
     @Test(expected = InvalidStateStoreException.class)
+    @SuppressWarnings("deprecation")
     public void shouldThrowIfTryingToWriteToClosedCachingStore() {
         cachingStore.close();
         cachingStore.put(bytesKey("a"), bytesValue("a"));
@@ -569,11 +580,13 @@ public class CachingWindowStoreTest {
     }
 
     @Test(expected = NullPointerException.class)
+    @SuppressWarnings("deprecation")
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
         cachingStore.put(null, bytesValue("anyValue"));
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
         cachingStore.put(bytesKey("a"), null);
     }
@@ -616,6 +629,7 @@ public class CachingWindowStoreTest {
             bytesValue(value));
     }
 
+    @SuppressWarnings("deprecation")
     private int addItemsToCache() {
         int cachedSize = 0;
         int i = 0;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index edf210e..957a616 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -86,6 +86,7 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldLogPuts() {
         inner.put(bytesKey, valueAndTimestamp, 0);
         EasyMock.expectLastCall();
@@ -128,6 +129,7 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldRetainDuplicatesWhenSet() {
         store = new ChangeLoggingTimestampedWindowBytesStore(inner, true);
         inner.put(bytesKey, valueAndTimestamp, 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index d7ad6d2..4503ad1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -83,6 +83,7 @@ public class ChangeLoggingWindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldLogPuts() {
         inner.put(bytesKey, value, 0);
         EasyMock.expectLastCall();
@@ -122,6 +123,7 @@ public class ChangeLoggingWindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldRetainDuplicatesWhenSet() {
         store = new ChangeLoggingWindowBytesStore(inner, true);
         inner.put(bytesKey, value, 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index 41ed073..9d5fc8c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -125,6 +125,7 @@ public class InMemoryWindowStoreTest extends WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testExpiration() {
 
         long currentTime = 0;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 67f1358..2000709 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -94,6 +94,7 @@ public class MeteredTimestampedWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
         EasyMock.expect(innerStoreMock.name()).andStubReturn("mocked-store");
         EasyMock.replay(innerStoreMock);
@@ -118,6 +119,7 @@ public class MeteredTimestampedWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
         EasyMock.expect(innerStoreMock.name()).andStubReturn("mocked-store");
         EasyMock.replay(innerStoreMock);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 6e65f0e..1e6b7d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -114,6 +114,7 @@ public class MeteredWindowStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldRecordPutLatency() {
         final byte[] bytes = "a".getBytes();
         innerStoreMock.put(eq(Bytes.wrap(bytes)), anyObject(), eq(context.timestamp()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 91f6aab..983bfa0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -77,6 +77,7 @@ public class RocksDBWindowStoreTest extends WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldOnlyIterateOpenSegments() {
         long currentTime = 0;
         setCurrentTime(currentTime);
@@ -104,6 +105,7 @@ public class RocksDBWindowStoreTest extends WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testRolling() {
 
         // to validate segments
@@ -379,6 +381,7 @@ public class RocksDBWindowStoreTest extends WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testSegmentMaintenance() {
 
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(),
@@ -505,6 +508,7 @@ public class RocksDBWindowStoreTest extends WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testRestore() throws Exception {
         final long startTime = SEGMENT_INTERVAL * 2;
         final long increment = SEGMENT_INTERVAL / 2;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowBytesStoreTest.java
index 489c825..8faa68f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowBytesStoreTest.java
@@ -663,6 +663,7 @@ public abstract class WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testPutSameKeyTimestamp() {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
         windowStore.init(context, windowStore);
@@ -771,6 +772,7 @@ public abstract class WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testDeleteAndUpdate() {
 
         final long currentTime = 0;
@@ -792,6 +794,7 @@ public abstract class WindowBytesStoreTest {
     }
 
     @Test(expected = NullPointerException.class)
+    @SuppressWarnings("deprecation")
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
         windowStore.put(null, "anyValue");
     }
@@ -933,6 +936,7 @@ public abstract class WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testWindowIteratorPeek() {
         final long currentTime = 0;
         setCurrentTime(currentTime);
@@ -963,6 +967,7 @@ public abstract class WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldNotThrowConcurrentModificationException() {
         long currentTime = 0;
         setCurrentTime(currentTime);
@@ -991,6 +996,7 @@ public abstract class WindowBytesStoreTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testFetchDuplicates() {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
         windowStore.init(context, windowStore);
@@ -1020,6 +1026,7 @@ public abstract class WindowBytesStoreTest {
     }
 
 
+    @SuppressWarnings("deprecation")
     private void putFirstBatch(final WindowStore<Integer, String> store,
         @SuppressWarnings("SameParameterValue") final long startTime,
         final InternalMockProcessorContext context) {
@@ -1035,6 +1042,7 @@ public abstract class WindowBytesStoreTest {
         store.put(5, "five");
     }
 
+    @SuppressWarnings("deprecation")
     private void putSecondBatch(final WindowStore<Integer, String> store,
         @SuppressWarnings("SameParameterValue") final long startTime,
         final InternalMockProcessorContext context) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
index f6f8f33..e49584f 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
@@ -36,6 +36,7 @@ public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> imp
         inner.init(context, root);
     }
 
+    @Deprecated
     @Override
     public void put(final K key,
                     final V value) {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
index 972b308..3347ddd 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
@@ -60,6 +60,7 @@ public class WindowStoreFacadeTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void shouldPutWithUnknownTimestamp() {
         mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP));
         expectLastCall();


Mime
View raw message