kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store (#6293)
Date Tue, 26 Feb 2019 03:59:43 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 21b7963  KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store (#6293)
21b7963 is described below

commit 21b79635474209fcd67d3bf33a70c25f6827c6ef
Author: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
AuthorDate: Mon Feb 25 19:59:14 2019 -0800

    KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store (#6293)
    
    First PR in a series to inline the generic parameters of the following bytes stores:
    
    [x] InMemoryKeyValueStore
    [ ] RocksDBWindowStore
    [ ] RocksDBSessionStore
    [ ] MemoryLRUCache
    [ ] MemoryNavigableLRUCache
    [ ] (awaiting merge) InMemoryWindowStore
    
    A number of tests took advantage of the generic InMemoryKeyValueStore and had to be reworked
    somewhat -- this PR covers everything related to the in-memory key-value store.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>,
    John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
---
 .../org/apache/kafka/streams/state/Stores.java     |  2 +-
 .../DelegatingPeekingKeyValueIterator.java         |  2 +-
 .../state/internals/InMemoryKeyValueStore.java     | 71 ++++++++-------------
 .../kstream/internals/KTableReduceTest.java        |  7 +--
 .../internals/KeyValueStoreMaterializerTest.java   |  3 +-
 .../state/internals/CachingKeyValueStoreTest.java  |  4 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java       |  2 +-
 .../DelegatingPeekingKeyValueIteratorTest.java     |  7 ++-
 .../state/internals/FilteredCacheIteratorTest.java |  4 +-
 ...dSortedCacheKeyValueBytesStoreIteratorTest.java |  4 +-
 .../kafka/test/GenericInMemoryKeyValueStore.java}  | 73 ++++++++--------------
 .../kafka/streams/MockProcessorContextTest.java    | 15 ++++-
 12 files changed, 82 insertions(+), 112 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index d8b19fd..46a9d45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -104,7 +104,7 @@ public class Stores {
 
             @Override
             public KeyValueStore<Bytes, byte[]> get() {
-                return new InMemoryKeyValueStore<>(name, Serdes.Bytes(), Serdes.ByteArray());
+                return new InMemoryKeyValueStore(name);
             }
 
             @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
index 673a7c9..20a434a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java
@@ -25,7 +25,7 @@ import java.util.NoSuchElementException;
 /**
  * Optimized {@link KeyValueIterator} used when the same element could be peeked multiple
times.
  */
-class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>,
PeekingKeyValueIterator<K, V> {
+public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K,
V>, PeekingKeyValueIterator<K, V> {
     private final KeyValueIterator<K, V> underlying;
     private final String storeName;
     private KeyValue<K, V> next;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index d6dd42a..cc28d64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -16,39 +16,27 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import java.util.List;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
-public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
+public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     private final String name;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-    private final NavigableMap<K, V> map;
+    private final NavigableMap<Bytes, byte[]> map;
     private volatile boolean open = false;
 
-    private StateSerdes<K, V> serdes;
-
-    public InMemoryKeyValueStore(final String name,
-                                 final Serde<K> keySerde,
-                                 final Serde<V> valueSerde) {
+    public InMemoryKeyValueStore(final String name) {
         this.name = name;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
 
-        // TODO: when we have serde associated with class types, we can
-        // improve this situation by passing the comparator here.
         this.map = new TreeMap<>();
     }
 
@@ -61,20 +49,15 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
     @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        // construct the serde
-        this.serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         if (root != null) {
             // register the store
             context.register(root, (key, value) -> {
                 // this is a delete
                 if (value == null) {
-                    delete(serdes.keyFrom(key));
+                    delete(Bytes.wrap(key));
                 } else {
-                    put(serdes.keyFrom(key), serdes.valueFrom(value));
+                    put(Bytes.wrap(key), value);
                 }
             });
         }
@@ -93,13 +76,12 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
     }
 
     @Override
-    public synchronized V get(final K key) {
+    public synchronized byte[] get(final Bytes key) {
         return this.map.get(key);
     }
 
     @Override
-    public synchronized void put(final K key,
-                                 final V value) {
+    public synchronized void put(final Bytes key, final byte[] value) {
         if (value == null) {
             this.map.remove(key);
         } else {
@@ -108,9 +90,8 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
     }
 
     @Override
-    public synchronized V putIfAbsent(final K key,
-                                      final V value) {
-        final V originalValue = get(key);
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        final byte[] originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
         }
@@ -118,29 +99,29 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
     }
 
     @Override
-    public synchronized void putAll(final List<KeyValue<K, V>> entries) {
-        for (final KeyValue<K, V> entry : entries) {
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries)
{
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
             put(entry.key, entry.value);
         }
     }
 
     @Override
-    public synchronized V delete(final K key) {
+    public synchronized byte[] delete(final Bytes key) {
         return this.map.remove(key);
     }
 
     @Override
-    public synchronized KeyValueIterator<K, V> range(final K from,
-                                                     final K to) {
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                              final Bytes to) {
         return new DelegatingPeekingKeyValueIterator<>(
             name,
-            new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
+            new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator()));
     }
 
     @Override
-    public synchronized KeyValueIterator<K, V> all() {
-        final TreeMap<K, V> copy = new TreeMap<>(this.map);
-        return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator()));
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final TreeMap<Bytes, byte[]> copy = new TreeMap<>(this.map);
+        return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator(copy.entrySet().iterator()));
     }
 
     @Override
@@ -159,10 +140,10 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
         this.open = false;
     }
 
-    private static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K,
V> {
-        private final Iterator<Map.Entry<K, V>> iter;
+    private static class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]>
{
+        private final Iterator<Map.Entry<Bytes, byte[]>> iter;
 
-        private InMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>> iter)
{
+        private InMemoryKeyValueIterator(final Iterator<Map.Entry<Bytes, byte[]>>
iter) {
             this.iter = iter;
         }
 
@@ -172,8 +153,8 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
         }
 
         @Override
-        public KeyValue<K, V> next() {
-            final Map.Entry<K, V> entry = iter.next();
+        public KeyValue<Bytes, byte[]> next() {
+            final Map.Entry<Bytes, byte[]> entry = iter.next();
             return new KeyValue<>(entry.getKey(), entry.getValue());
         }
 
@@ -188,7 +169,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
         }
 
         @Override
-        public K peekNextKey() {
+        public Bytes peekNextKey() {
             throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index 05b74dc..afb2cc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -19,7 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.GenericInMemoryKeyValueStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.Test;
 
@@ -44,9 +45,7 @@ public class KTableReduceTest {
                 this::differenceNotNullArgs
             ).get();
 
-
-        final InMemoryKeyValueStore<String, Set<String>> myStore =
-            new InMemoryKeyValueStore<>("myStore", null, null);
+        final KeyValueStore<String, Set<String>> myStore = new GenericInMemoryKeyValueStore<>("myStore");
 
         context.register(myStore, null);
         reduceProcessor.init(context);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index bb1dec2..30080c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
@@ -107,7 +106,7 @@ public class KeyValueStoreMaterializerTest {
     @Test
     public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() {
         final KeyValueBytesStoreSupplier supplier = EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class);
-        final InMemoryKeyValueStore<Bytes, byte[]> store = new InMemoryKeyValueStore<>("name",
Serdes.Bytes(), Serdes.ByteArray());
+        final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name");
         EasyMock.expect(supplier.name()).andReturn("name").anyTimes();
         EasyMock.expect(supplier.get()).andReturn(store);
         EasyMock.replay(supplier);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index b1a6401..6c2b7cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -58,7 +58,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest
{
     private final int maxCacheSizeBytes = 150;
     private InternalMockProcessorContext context;
     private CachingKeyValueStore<String, String> store;
-    private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
+    private InMemoryKeyValueStore underlyingStore;
     private ThreadCache cache;
     private CacheFlushListenerStub<String, String> cacheFlushListener;
     private String topic;
@@ -66,7 +66,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest
{
     @Before
     public void setUp() {
         final String storeName = "store";
-        underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray());
+        underlyingStore = new InMemoryKeyValueStore(storeName);
         cacheFlushListener = new CacheFlushListenerStub<>();
         store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
         store.setFlushListener(cacheFlushListener, false);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 5fdfd46..5645b8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -43,7 +43,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public class ChangeLoggingKeyValueBytesStoreTest {
 
     private InternalMockProcessorContext context;
-    private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv",
Serdes.Bytes(), Serdes.ByteArray());
+    private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore("kv");
     private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner);
     private final Map<Object, Object> sent = new HashMap<>();
     private final Bytes hi = Bytes.wrap("hi".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
index 8b6fc95..593b265 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -16,8 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.GenericInMemoryKeyValueStore;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -29,11 +30,11 @@ import static org.junit.Assert.assertTrue;
 public class DelegatingPeekingKeyValueIteratorTest {
 
     private final String name = "name";
-    private InMemoryKeyValueStore<String, String> store;
+    private KeyValueStore<String, String> store;
 
     @Before
     public void setUp() {
-        store = new InMemoryKeyValueStore<>(name, Serdes.String(), Serdes.String());
+        store = new GenericInMemoryKeyValueStore<>(name);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
index 4a0796d..bf54786 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java
@@ -20,6 +20,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.GenericInMemoryKeyValueStore;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -47,7 +49,7 @@ public class FilteredCacheIteratorTest {
     };
 
     @SuppressWarnings("unchecked")
-    private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("name",
null, null);
+    private final KeyValueStore<Bytes, LRUCacheEntry> store = new GenericInMemoryKeyValueStore<>("my-store");
     private final KeyValue<Bytes, LRUCacheEntry> firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()),
                                                                             new LRUCacheEntry("1".getBytes()));
     private final List<KeyValue<Bytes, LRUCacheEntry>> entries = asList(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
index d7f164c..4028b0c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
@@ -39,7 +39,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest {
 
     @Before
     public void setUp() throws Exception {
-        store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(), Serdes.ByteArray());
+        store = new InMemoryKeyValueStore(namespace);
         cache = new ThreadCache(new LogContext("testCache "), 10000L, new MockStreamsMetrics(new
Metrics()));
     }
 
@@ -146,7 +146,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest {
 
     @Test
     public void shouldPeekNextKey() throws Exception {
-        final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one",
Serdes.Bytes(), Serdes.ByteArray());
+        final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore("one");
         final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L,
new MockStreamsMetrics(new Metrics()));
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
         for (int i = 0; i < bytes.length - 1; i += 2) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
similarity index 64%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
copy to streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
index d6dd42a..e9d20f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
@@ -14,41 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.test;
 
-import org.apache.kafka.common.serialization.Serde;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
+/**
+ * This class is a generic version of the in-memory key-value store that is useful for testing
when you
+ *  need a basic KeyValueStore for arbitrary types and don't have/want to write a serde
+ */
+public class GenericInMemoryKeyValueStore<K extends Comparable, V> implements KeyValueStore<K,
V> {
 
-public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private final String name;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
     private final NavigableMap<K, V> map;
     private volatile boolean open = false;
 
-    private StateSerdes<K, V> serdes;
-
-    public InMemoryKeyValueStore(final String name,
-                                 final Serde<K> keySerde,
-                                 final Serde<V> valueSerde) {
+    public GenericInMemoryKeyValueStore(final String name) {
         this.name = name;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
 
-        // TODO: when we have serde associated with class types, we can
-        // improve this situation by passing the comparator here.
         this.map = new TreeMap<>();
     }
 
@@ -59,24 +52,10 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void init(final ProcessorContext context,
-                     final StateStore root) {
-        // construct the serde
-        this.serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
+    /* This is a "dummy" store used for testing and does not support restoring from changelog
since we allow it to be serde-ignorant */
+    public void init(final ProcessorContext context, final StateStore root) {
         if (root != null) {
-            // register the store
-            context.register(root, (key, value) -> {
-                // this is a delete
-                if (value == null) {
-                    delete(serdes.keyFrom(key));
-                } else {
-                    put(serdes.keyFrom(key), serdes.valueFrom(value));
-                }
-            });
+            context.register(root, null);
         }
 
         this.open = true;
@@ -99,7 +78,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
 
     @Override
     public synchronized void put(final K key,
-                                 final V value) {
+        final V value) {
         if (value == null) {
             this.map.remove(key);
         } else {
@@ -109,7 +88,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
 
     @Override
     public synchronized V putIfAbsent(final K key,
-                                      final V value) {
+        final V value) {
         final V originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
@@ -131,16 +110,16 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
 
     @Override
     public synchronized KeyValueIterator<K, V> range(final K from,
-                                                     final K to) {
+        final K to) {
         return new DelegatingPeekingKeyValueIterator<>(
             name,
-            new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
+            new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
     }
 
     @Override
     public synchronized KeyValueIterator<K, V> all() {
         final TreeMap<K, V> copy = new TreeMap<>(this.map);
-        return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator()));
+        return new DelegatingPeekingKeyValueIterator<>(name, new GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
     }
 
     @Override
@@ -159,10 +138,10 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
         this.open = false;
     }
 
-    private static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K,
V> {
-        private final Iterator<Map.Entry<K, V>> iter;
+    private static class GenericInMemoryKeyValueIterator<K, V> implements KeyValueIterator<K,
V> {
+        private final Iterator<Entry<K, V>> iter;
 
-        private InMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>> iter)
{
+        private GenericInMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>>
iter) {
             this.iter = iter;
         }
 
@@ -192,4 +171,4 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
             throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
         }
     }
-}
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
index 41b62f9..32c479c 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
@@ -27,7 +27,9 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
 import java.io.File;
@@ -230,6 +232,7 @@ public class MockProcessorContextTest {
         assertFalse(context.committed());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldStoreAndReturnStateStores() {
         final AbstractProcessor<String, Long> processor = new AbstractProcessor<String,
Long>() {
@@ -243,10 +246,16 @@ public class MockProcessorContextTest {
         };
 
         final MockProcessorContext context = new MockProcessorContext();
-        final KeyValueStore<String, Long> store = new InMemoryKeyValueStore<>("my-state",
Serdes.String(), Serdes.Long());
-        context.register(store, null);
+
+        final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("my-state"),
+                Serdes.String(),
+                Serdes.Long()).withLoggingDisabled();
+
+        final KeyValueStore<String, Long> store = (KeyValueStore<String, Long>)
storeBuilder.build();
 
         store.init(context, store);
+
         processor.init(context);
 
         processor.process("foo", 5L);


Mime
View raw message