kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvcep...@apache.org
Subject [kafka] 01/01: done
Date Wed, 07 Oct 2020 03:18:34 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4452d1b9337249d2cc6c801a9e08675425cdcbcf
Author: John Roesler <vvcephei@apache.org>
AuthorDate: Tue Oct 6 21:33:59 2020 -0500

    done
---
 .../examples/wordcount/WordCountProcessorTest.java |  1 +
 .../wordcount/WordCountTransformerTest.java        |  1 +
 .../internals/AbstractReadOnlyDecorator.java       |  6 ++++
 .../internals/AbstractReadWriteDecorator.java      |  9 ++++-
 .../internals/ChangeLoggingKeyValueBytesStore.java |  3 +-
 .../streams/state/internals/KeyValueSegment.java   |  1 -
 .../streams/state/internals/RocksDBStore.java      |  3 --
 .../state/internals/TimestampedSegment.java        |  1 -
 .../internals/GlobalProcessorContextImplTest.java  |  1 -
 .../state/internals/AbstractKeyValueStoreTest.java |  1 -
 .../state/internals/CachingKeyValueStoreTest.java  | 25 +++++++++++++
 .../state/internals/CachingSessionStoreTest.java   | 28 ++++++++++++++-
 .../state/internals/CachingWindowStoreTest.java    | 27 +++++++++++++-
 .../ChangeLoggingKeyValueBytesStoreTest.java       | 42 +++++++++++++++++++---
 ...geLoggingTimestampedKeyValueBytesStoreTest.java | 42 +++++++++++++++++++---
 ...angeLoggingTimestampedWindowBytesStoreTest.java | 20 +++++++++++
 .../ChangeLoggingWindowBytesStoreTest.java         | 20 +++++++++++
 .../internals/InMemoryKeyValueLoggedStoreTest.java |  1 -
 .../state/internals/InMemoryKeyValueStoreTest.java |  1 -
 .../state/internals/InMemoryLRUCacheStoreTest.java |  1 -
 .../state/internals/MeteredKeyValueStoreTest.java  | 38 ++++++++++++++++++++
 .../state/internals/MeteredSessionStoreTest.java   | 38 ++++++++++++++++++++
 .../MeteredTimestampedKeyValueStoreTest.java       | 38 ++++++++++++++++++++
 .../MeteredTimestampedWindowStoreTest.java         | 41 +++++++++++++++++++++
 .../state/internals/MeteredWindowStoreTest.java    | 40 +++++++++++++++++++++
 .../state/internals/RocksDBKeyValueStoreTest.java  |  1 -
 .../kafka/streams/MockProcessorContextTest.java    |  3 +-
 .../streams/internals/KeyValueStoreFacadeTest.java |  1 +
 .../streams/internals/WindowStoreFacadeTest.java   |  1 +
 .../wordcount/WindowedWordCountProcessorTest.java  |  4 +++
 30 files changed, 414 insertions(+), 25 deletions(-)

diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index bec77e6..ba52990 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.assertTrue;
  * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
  */
 public class WordCountProcessorTest {
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void test() {
         final MockProcessorContext context = new MockProcessorContext();
diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
index 98d5012..6bb8c6c 100644
--- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
+++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertTrue;
  * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Transformer} in the {@link WordCountTransformerDemo}.
  */
 public class WordCountTransformerTest {
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void test() {
         final MockProcessorContext context = new MockProcessorContext();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index 20eb2c0..4424cdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -54,6 +54,12 @@ abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends Wra
     }
 
     @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
     public void close() {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 0bfe452..d077089 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -32,6 +31,8 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
+import java.util.List;
+
 abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
     static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
 
@@ -47,6 +48,12 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
     }
 
     @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
     public void close() {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 055a0f4..d6526a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -53,7 +53,8 @@ public class ChangeLoggingKeyValueBytesStore
                      final StateStore root) {
         super.init(context, root);
         this.context = asInternalProcessorContext(context);
-        maybeSetEvictionListener(); }
+        maybeSetEvictionListener();
+    }
 
     private void maybeSetEvictionListener() {
         // if the inner store is an LRU cache, add the eviction listener to log removed record
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index 55a73c2..b6d6504 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
 import java.io.File;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index f72b3af..3241d07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -27,8 +27,6 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -67,7 +65,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.function.Supplier;
 
 import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
 import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.getMetricsImpl;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
index 59eb469..36fe0e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
 import java.io.File;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 1ae6630..eba6f06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.To;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 4ab15d8..3284b17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index c6fe483..89e2b0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -100,6 +100,31 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         return store;
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final KeyValueStore<Bytes, byte[]> inner = EasyMock.mock(InMemoryKeyValueStore.class);
+        final CachingKeyValueStore outer = new CachingKeyValueStore(inner);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final KeyValueStore<Bytes, byte[]> inner = EasyMock.mock(InMemoryKeyValueStore.class);
+        final CachingKeyValueStore outer = new CachingKeyValueStore(inner);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
     @Test
     public void shouldSetFlushListener() {
         assertTrue(store.setFlushListener(null, true));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index f36629d..05e97a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -89,7 +90,7 @@ public class CachingSessionStoreTest {
     public void before() {
         cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null));
         cachingStore.init((StateStoreContext) context, cachingStore);
     }
@@ -99,6 +100,31 @@ public class CachingSessionStoreTest {
         cachingStore.close();
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final SessionStore<Bytes, byte[]> inner = EasyMock.mock(InMemorySessionStore.class);
+        final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final SessionStore<Bytes, byte[]> inner = EasyMock.mock(InMemorySessionStore.class);
+        final CachingSessionStore outer = new CachingSessionStore(inner, SEGMENT_INTERVAL);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
     @Test
     public void shouldPutFetchFromCache() {
         cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 42b750b..2a04c48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -42,7 +43,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -113,6 +113,31 @@ public class CachingWindowStoreTest {
         cachingStore.close();
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final WindowStore<Bytes, byte[]> inner = EasyMock.mock(WindowStore.class);
+        final CachingWindowStore outer = new CachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final WindowStore<Bytes, byte[]> inner = EasyMock.mock(WindowStore.class);
+        final CachingWindowStore outer = new CachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
+        EasyMock.expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
     @Test
     public void shouldNotReturnDuplicatesInRanges() {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 9106580..c3808b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -21,11 +21,15 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,14 +53,19 @@ public class ChangeLoggingKeyValueBytesStoreTest {
 
     @Before
     public void before() {
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(
+        final InternalMockProcessorContext context = mockContext();
+        context.setTime(0);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private InternalMockProcessorContext mockContext() {
+        return new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
             collector,
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        context.setTime(0);
-        store.init((StateStoreContext) context, store);
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
     }
 
     @After
@@ -64,6 +73,31 @@ public class ChangeLoggingKeyValueBytesStoreTest {
         store.close();
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final InternalMockProcessorContext context = mockContext();
+        final KeyValueStore<Bytes, byte[]> innerMock = EasyMock.mock(InMemoryKeyValueStore.class);
+        final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock);
+        innerMock.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(innerMock);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(innerMock);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final InternalMockProcessorContext context = mockContext();
+        final KeyValueStore<Bytes, byte[]> innerMock = EasyMock.mock(InMemoryKeyValueStore.class);
+        final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock);
+        innerMock.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(innerMock);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(innerMock);
+    }
+
     @Test
     public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
         store.put(hi, there);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
index e05b171..8295f7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
@@ -21,12 +21,16 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,14 +58,19 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
 
     @Before
     public void before() {
-        final InternalMockProcessorContext context = new InternalMockProcessorContext(
+        final InternalMockProcessorContext context = mockContext();
+        context.setTime(0);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private InternalMockProcessorContext mockContext() {
+        return new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
             collector,
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        context.setTime(0);
-        store.init((StateStoreContext) context, store);
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
     }
 
     @After
@@ -69,6 +78,31 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
         store.close();
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final InternalMockProcessorContext context = mockContext();
+        final KeyValueStore<Bytes, byte[]> inner = EasyMock.mock(InMemoryKeyValueStore.class);
+        final StateStore outer = new ChangeLoggingTimestampedKeyValueBytesStore(inner);
+        inner.init((ProcessorContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final InternalMockProcessorContext context = mockContext();
+        final KeyValueStore<Bytes, byte[]> inner = EasyMock.mock(InMemoryKeyValueStore.class);
+        final StateStore outer = new ChangeLoggingTimestampedKeyValueBytesStore(inner);
+        inner.init((StateStoreContext) context, outer);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        EasyMock.verify(inner);
+    }
+
     @Test
     public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
         store.put(hi, rawThere);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index e928678..6608739 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -65,6 +66,25 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
         store.init((StateStoreContext) context, store);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        inner.init((ProcessorContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((ProcessorContext) context, store);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        inner.init((StateStoreContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((StateStoreContext) context, store);
+        EasyMock.verify(inner);
+    }
+
     @Test
     @SuppressWarnings("deprecation")
     public void shouldLogPuts() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 82835bc..c2c31e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -63,6 +64,25 @@ public class ChangeLoggingWindowBytesStoreTest {
         store.init((StateStoreContext) context, store);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        inner.init((ProcessorContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((ProcessorContext) context, store);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        inner.init((StateStoreContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((StateStoreContext) context, store);
+        EasyMock.verify(inner);
+    }
+
     @Test
     @SuppressWarnings("deprecation")
     public void shouldLogPuts() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 6f4104e..f54fca1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index b6c46e2..9730936 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index ae791cb..a044eda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 83fffef..68faa15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -158,6 +159,43 @@ public class MeteredKeyValueStoreTest {
         metered.init((StateStoreContext) context, metered);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final KeyValueStore<Bytes, byte[]> inner = mock(KeyValueStore.class);
+        final MeteredKeyValueStore<String, String> outer = new MeteredKeyValueStore<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            Serdes.String()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final KeyValueStore<Bytes, byte[]> inner = mock(KeyValueStore.class);
+        final MeteredKeyValueStore<String, String> outer = new MeteredKeyValueStore<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            Serdes.String()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 0ff822e..a77dd07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -162,6 +163,43 @@ public class MeteredSessionStoreTest {
         store.init((StateStoreContext) context, store);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class);
+        final MeteredSessionStore<String, String> outer = new MeteredSessionStore<>(
+            inner,
+            STORE_TYPE,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class);
+        final MeteredSessionStore<String, String> outer = new MeteredSessionStore<>(
+            inner,
+            STORE_TYPE,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 3d28266..405ab4f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -162,6 +163,43 @@ public class MeteredTimestampedKeyValueStoreTest {
         metered.init((StateStoreContext) context, metered);
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final KeyValueStore<Bytes, byte[]> inner = mock(InMemoryKeyValueStore.class);
+        final MeteredTimestampedKeyValueStore<String, String> outer = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueAndTimestampSerde<>(Serdes.String())
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final KeyValueStore<Bytes, byte[]> inner = mock(InMemoryKeyValueStore.class);
+        final MeteredTimestampedKeyValueStore<String, String> outer = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueAndTimestampSerde<>(Serdes.String())
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner, context);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 9a9d763..315a1aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -42,6 +43,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.niceMock;
 import static org.easymock.EasyMock.replay;
@@ -95,6 +97,45 @@ public class MeteredTimestampedWindowStoreTest {
         );
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
+        final MeteredTimestampedWindowStore<String, String> outer = new MeteredTimestampedWindowStore<>(
+            inner,
+            WINDOW_SIZE_MS, // any size
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
+        final MeteredTimestampedWindowStore<String, String> outer = new MeteredTimestampedWindowStore<>(
+            inner,
+            WINDOW_SIZE_MS, // any size
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 7301694..18c0fa3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -150,6 +151,45 @@ public class MeteredWindowStoreTest {
         );
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
+        final MeteredWindowStore<String, String> outer = new MeteredWindowStore<>(
+            inner,
+            WINDOW_SIZE_MS, // any size
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new SerdeThatDoesntHandleNull()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((ProcessorContext) context, outer);
+        expectLastCall();
+        replay(inner);
+        outer.init((ProcessorContext) context, outer);
+        verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
+        final MeteredWindowStore<String, String> outer = new MeteredWindowStore<>(
+            inner,
+            WINDOW_SIZE_MS, // any size
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            new SerdeThatDoesntHandleNull()
+        );
+        expect(inner.name()).andStubReturn("store");
+        inner.init((StateStoreContext) context, outer);
+        expectLastCall();
+        replay(inner);
+        outer.init((StateStoreContext) context, outer);
+        verify(inner);
+    }
+
     @Test
     public void shouldPassChangelogTopicNameToStateStoreSerde() {
         context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index c0b5a12..e71809e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index fe901ed..6e2f4ed 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -231,8 +231,7 @@ public class MockProcessorContextTest {
 
         assertFalse(context.committed());
     }
-
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"deprecation", "unchecked"}) // TODO deprecation will be fixed in KAFKA-10437
     @Test
     public void shouldStoreAndReturnStateStores() {
         final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
index b7814c7..4d7a277 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
@@ -53,6 +53,7 @@ public class KeyValueStoreFacadeTest {
         keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldForwardInit() {
         final ProcessorContext context = mock(ProcessorContext.class);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
index 3347ddd..6a2c6bd 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
@@ -47,6 +47,7 @@ public class WindowStoreFacadeTest {
         windowStoreFacade = new WindowStoreFacade<>(mockedWindowTimestampStore);
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldForwardInit() {
         final ProcessorContext context = mock(ProcessorContext.class);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
index 4995a77..00a9f8f 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
@@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThrows;
 
 public class WindowedWordCountProcessorTest {
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldWorkWithInMemoryStore() {
         final MockProcessorContext context = new MockProcessorContext();
@@ -86,6 +87,7 @@ public class WindowedWordCountProcessorTest {
         assertThat(capturedForwards.hasNext(), is(false));
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldWorkWithPersistentStore() throws IOException {
         final Properties properties = new Properties();
@@ -146,6 +148,7 @@ public class WindowedWordCountProcessorTest {
         }
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldFailWithLogging() {
         final MockProcessorContext context = new MockProcessorContext();
@@ -164,6 +167,7 @@ public class WindowedWordCountProcessorTest {
         assertThrows(IllegalArgumentException.class, () -> store.init(context, store));
     }
 
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldFailWithCaching() {
         final MockProcessorContext context = new MockProcessorContext();


Mime
View raw message