kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvcep...@apache.org
Subject [kafka] 03/05: asdf
Date Wed, 07 Oct 2020 02:34:40 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-5-state-store-wrappers
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit af662e15bdea5688b49d489ca8c2ce087dcaa2d6
Author: John Roesler <vvcephei@apache.org>
AuthorDate: Tue Oct 6 09:36:39 2020 -0500

    asdf
---
 .../internals/AbstractReadOnlyDecorator.java       |  2 ++
 .../internals/AbstractReadWriteDecorator.java      |  2 ++
 .../processor/internals/ProcessorContextUtils.java |  7 +++++
 .../state/internals/CachingWindowStore.java        |  1 +
 .../internals/ChangeLoggingKeyValueBytesStore.java | 28 +++++------------
 .../internals/ChangeLoggingSessionBytesStore.java  | 17 +++-------
 .../internals/ChangeLoggingWindowBytesStore.java   | 17 +++-------
 .../state/internals/InMemoryKeyValueStore.java     |  1 +
 .../InMemoryTimeOrderedKeyValueBuffer.java         | 36 ++++++++++++++++++++++
 .../streams/state/internals/KeyValueSegment.java   |  1 -
 ...ValueToTimestampedKeyValueByteStoreAdapter.java |  1 +
 .../streams/state/internals/MemoryLRUCache.java    | 12 ++++++++
 .../state/internals/MeteredKeyValueStore.java      |  2 ++
 .../state/internals/MeteredSessionStore.java       |  1 +
 .../internals/MeteredTimestampedKeyValueStore.java |  1 +
 .../internals/MeteredTimestampedWindowStore.java   |  1 +
 .../state/internals/MeteredWindowStore.java        |  4 +++
 .../streams/state/internals/RocksDBStore.java      | 29 ++++++++++++-----
 .../state/internals/RocksDBWindowStore.java        |  1 +
 .../internals/TimestampedKeyValueStoreBuilder.java |  1 +
 .../internals/TimestampedWindowStoreBuilder.java   |  1 +
 .../WindowToTimestampedWindowByteStoreAdapter.java |  1 +
 .../ChangeLoggingSessionBytesStoreTest.java        | 17 ++++++++++
 23 files changed, 132 insertions(+), 52 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index 74227af..20eb2c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
@@ -45,6 +46,7 @@ abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends Wra
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 41026c4..0bfe452 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
@@ -38,6 +39,7 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
         super(inner);
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index a769157..9c84aab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -48,6 +48,13 @@ public final class ProcessorContextUtils {
         return (StreamsMetricsImpl) context.metrics();
     }
 
+    /**
+     * Should be removed as part of KAFKA-10217
+     */
+    public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) {
+        return (StreamsMetricsImpl) context.metrics();
+    }
+
     public static String changelogFor(final ProcessorContext context, final String storeName) {
         return context instanceof InternalProcessorContext
             ? ((InternalProcessorContext) context).changelogFor(storeName)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index aea32f5..c750f3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -69,6 +69,7 @@ class CachingWindowStore
         this.maxObservedTimestamp = new AtomicLong(RecordQueue.UNKNOWN);
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         initInternal(asInternalProcessorContext(context));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index d5205ed..055a0f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -27,6 +27,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.List;
 
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
+
 public class ChangeLoggingKeyValueBytesStore
     extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
     implements KeyValueStore<Bytes, byte[]> {
@@ -37,37 +39,23 @@ public class ChangeLoggingKeyValueBytesStore
         super(inner);
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
         super.init(context, root);
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        this.context = (InternalProcessorContext) context;
-
-        // if the inner store is an LRU cache, add the eviction listener to log removed record
-        if (wrapped() instanceof MemoryLRUCache) {
-            ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
-                // pass null to indicate removal
-                log(key, null);
-            });
-        }
+        this.context = asInternalProcessorContext(context);
+        maybeSetEvictionListener();
     }
 
     @Override
     public void init(final StateStoreContext context,
                      final StateStore root) {
         super.init(context, root);
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        this.context = (InternalProcessorContext) context;
+        this.context = asInternalProcessorContext(context);
+        maybeSetEvictionListener(); }
 
+    private void maybeSetEvictionListener() {
         // if the inner store is an LRU cache, add the eviction listener to log removed record
         if (wrapped() instanceof MemoryLRUCache) {
             ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 648a47e..0d2133d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -25,6 +25,8 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
+
 /**
  * Simple wrapper around a {@link SessionStore} to support writing
  * updates to a changelog
@@ -39,26 +41,17 @@ class ChangeLoggingSessionBytesStore
         super(bytesStore);
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         super.init(context, root);
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        this.context = (InternalProcessorContext) context;
+        this.context = asInternalProcessorContext(context);
     }
 
     @Override
     public void init(final StateStoreContext context, final StateStore root) {
         super.init(context, root);
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        this.context = (InternalProcessorContext) context;
+        this.context = asInternalProcessorContext(context);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 6eee6c3..47f088e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
+
 /**
  * Simple wrapper around a {@link WindowStore} to support writing
  * updates to a changelog
@@ -44,27 +46,18 @@ class ChangeLoggingWindowBytesStore
         this.retainDuplicates = retainDuplicates;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        this.context = (InternalProcessorContext) context;
+        this.context = asInternalProcessorContext(context);
         super.init(context, root);
     }
 
     @Override
     public void init(final StateStoreContext context,
                      final StateStore root) {
-        if (!(context instanceof InternalProcessorContext)) {
-            throw new IllegalArgumentException(
-                "Change logging requires internal features of KafkaStreams and must be disabled for unit tests."
-            );
-        }
-        this.context = (InternalProcessorContext) context;
+        this.context = asInternalProcessorContext(context);
         super.init(context, root);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index b02459d..31041b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -50,6 +50,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
         return name;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index e3d3ba6..d27d8ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -29,7 +29,9 @@ import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
@@ -193,6 +195,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         this.valueSerde = this.valueSerde == null ? FullChangeSerde.wrap(valueSerde) : this.valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         taskId = context.taskId().toString();
@@ -223,6 +226,39 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     }
 
     @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.context = ProcessorContextUtils.asInternalProcessorContext(context);
+        init(root);
+    }
+
+    private void init(final StateStore root) {
+        taskId = this.context.taskId().toString();
+        streamsMetrics = this.context.metrics();
+
+        threadId = Thread.currentThread().getName();
+        bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor(
+            threadId,
+            taskId,
+            METRIC_SCOPE,
+            storeName,
+            streamsMetrics
+        );
+        bufferCountSensor = StateStoreMetrics.suppressionBufferCountSensor(
+            threadId,
+            taskId,
+            METRIC_SCOPE,
+            storeName,
+            streamsMetrics
+        );
+
+        this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
+        changelogTopic = ProcessorStateManager.storeChangelogTopic(this.context.applicationId(), storeName);
+        updateBufferMetrics();
+        open = true;
+        partition = this.context.taskId().partition;
+    }
+
+    @Override
     public boolean isOpen() {
         return open;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index 21fbc45..a64df19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -48,7 +48,6 @@ class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment
     public void openDB(final ProcessorContext context) {
         super.openDB(context);
         // skip the registering step
-        internalProcessorContext = context;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index bb2ef26..6bb0950 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -82,6 +82,7 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
         return store.name();
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 32a91cd..236fedc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -73,6 +74,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
         return this.name;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
 
@@ -85,6 +87,16 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        // register the store
+        context.register(root, (key, value) -> {
+            restoring = true;
+            put(Bytes.wrap(key), value);
+            restoring = false;
+        });
+    }
+
+    @Override
     public boolean persistent() {
         return false;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 8ca7686..2c45358 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -85,6 +85,7 @@ public class MeteredKeyValueStore<K, V>
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
@@ -129,6 +130,7 @@ public class MeteredKeyValueStore<K, V>
         e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
     }
 
+    @Deprecated
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 96fed94..8b9256d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -70,6 +70,7 @@ public class MeteredSessionStore<K, V>
         this.time = time;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index e4e6d08..bd9cb92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -51,6 +51,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
         super(inner, metricScope, time, keySerde, valueSerde);
     }
 
+    @Deprecated
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index 0d83aba..fcc4bbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -49,6 +49,7 @@ class MeteredTimestampedWindowStore<K, V>
         super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
     }
 
+    @Deprecated
     @SuppressWarnings("unchecked")
     @Override
     void initStoreSerde(final ProcessorContext context) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index c4cd1f7..d47233b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -71,6 +71,7 @@ public class MeteredWindowStore<K, V>
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
@@ -110,6 +111,7 @@ public class MeteredWindowStore<K, V>
         e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
     }
 
+    @Deprecated
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
@@ -298,6 +300,8 @@ public class MeteredWindowStore<K, V>
     }
 
     private void maybeRecordE2ELatency() {
+        // Context is null if the provided context isn't an implementation of InternalProcessorContext.
+        // In that case, we _can't_ get the current timestamp, so we don't record anything.
         if (e2eLatencySensor.shouldRecord() && context != null) {
             final long currentTime = time.milliseconds();
             final long e2eLatency =  currentTime - context.timestamp();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3442d83..f72b3af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -65,8 +67,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.getMetricsImpl;
 
 /**
  * A persistent key-value store based on RocksDB.
@@ -102,7 +106,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
 
     private final RocksDBMetricsRecorder metricsRecorder;
 
-    ProcessorContext internalProcessorContext;
     // visible for testing
     volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null;
 
@@ -122,7 +125,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     }
 
     @SuppressWarnings("unchecked")
-    void openDB(final ProcessorContext context) {
+    void openDB(final Map<String, Object> configs, final File stateDir) {
         // initialize the default rocksdb options
 
         final DBOptions dbOptions = new DBOptions();
@@ -161,7 +164,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         fOptions = new FlushOptions();
         fOptions.setWaitForFlush(true);
 
-        final Map<String, Object> configs = context.appConfigs();
         final Class<RocksDBConfigSetter> configSetterClass =
             (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
 
@@ -170,7 +172,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
             configSetter.setConfig(name, userSpecifiedOptions, configs);
         }
 
-        dbDir = new File(new File(context.stateDir(), parentDir), name);
+        dbDir = new File(new File(stateDir, parentDir), name);
 
         try {
             Files.createDirectories(dbDir.getParentFile().toPath());
@@ -232,13 +234,26 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         }
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
         // open the DB dir
-        internalProcessorContext = context;
-        metricsRecorder.init((StreamsMetricsImpl) context.metrics(), context.taskId());
-        openDB(context);
+        metricsRecorder.init(getMetricsImpl(context), context.taskId());
+        openDB(context.appConfigs(), context.stateDir());
+        batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
+
+        // value getter should always read directly from rocksDB
+        // since it is only for values that are already flushed
+        context.register(root, batchingStateRestoreCallback);
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        // open the DB dir
+        metricsRecorder.init(getMetricsImpl(context), context.taskId());
+        openDB(context.appConfigs(), context.stateDir());
         batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
 
         // value getter should always read directly from rocksDB
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 7c779a6..b3ba652 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -44,6 +44,7 @@ public class RocksDBWindowStore
         this.windowSize = windowSize;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index 522472d..fc8e75a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -95,6 +95,7 @@ public class TimestampedKeyValueStoreBuilder<K, V>
             this.wrapped = wrapped;
         }
 
+        @Deprecated
         @Override
         public void init(final ProcessorContext context,
                          final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index 454b123..d81958e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -109,6 +109,7 @@ public class TimestampedWindowStoreBuilder<K, V>
             this.wrapped = wrapped;
         }
 
+        @Deprecated
         @Override
         public void init(final ProcessorContext context,
                          final StateStore root) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index bfeb33e..586e87d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -161,6 +161,7 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
         return store.name();
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 8753342..6f85116 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
@@ -63,6 +64,22 @@ public class ChangeLoggingSessionBytesStoreTest {
         store.init((StateStoreContext) context, store);
     }
 
+    @Test void shouldDelegateDeprecatedInit() {
+        inner.init((ProcessorContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((ProcessorContext) context, store);
+        EasyMock.verify(inner);
+    }
+
+    @Test void shouldDelegateInit() {
+        inner.init((StateStoreContext) context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner);
+        store.init((StateStoreContext) context, store);
+        EasyMock.verify(inner);
+    }
+
     @Test
     public void shouldLogPuts() {
         inner.put(key1, value1);


Mime
View raw message