kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: initialize Serdes with ProcessorContext
Date Wed, 25 Nov 2015 23:21:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 69a1cced4 -> 5e8958a85


MINOR: initialize Serdes with ProcessorContext

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #589 from ymatsuda/init_serdes_with_procctx


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5e8958a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5e8958a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5e8958a8

Branch: refs/heads/trunk
Commit: 5e8958a856a5b4ccbdcb610473cab4c2eeddbac5
Parents: 69a1cce
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Wed Nov 25 15:21:17 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 25 15:21:17 2015 -0800

----------------------------------------------------------------------
 .../kafka/streams/examples/ProcessorJob.java    |  2 +-
 .../streams/state/MeteredKeyValueStore.java     |  1 +
 .../state/RocksDBKeyValueStoreSupplier.java     |  2 +
 .../org/apache/kafka/streams/state/Serdes.java  | 61 +++++++-------------
 .../org/apache/kafka/streams/state/Stores.java  |  5 +-
 .../internals/ProcessorTopologyTest.java        |  2 +-
 .../state/AbstractKeyValueStoreTest.java        | 12 ++--
 .../state/InMemoryKeyValueStoreTest.java        |  6 +-
 .../state/InMemoryLRUCacheStoreTest.java        |  7 ++-
 .../streams/state/KeyValueStoreTestDriver.java  |  9 ---
 .../streams/state/RocksDBKeyValueStoreTest.java |  6 +-
 11 files changed, 40 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 3274aae..882c7ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -104,7 +104,7 @@ public class ProcessorJob {
         builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
 
         builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
-        builder.addStateStore(Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build());
+        builder.addStateStore(Stores.create("local-state").withStringKeys().withIntegerValues().inMemory().build());
         builder.connectProcessorAndStateStores("local-state", "PROCESS");
 
         builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(),
"PROCESS");

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index c1ccbd4..b68f763 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -91,6 +91,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K,
V> {
         this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name",
name);
         this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name",
name);
 
+        serialization.init(context);
         this.context = context;
         this.partition = context.id().partition;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
index fe8f00a..f1fbd9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
@@ -118,6 +118,8 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier
{
         }
 
         public void init(ProcessorContext context) {
+            serdes.init(context);
+
             this.context = context;
             this.partition = context.id().partition;
             this.dbName = this.topic + "." + this.partition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index 31bd439..f41d928 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
 
 final class Serdes<K, V> {
 
@@ -57,19 +57,21 @@ final class Serdes<K, V> {
     }
 
     private final String topic;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
+    private Serializer<K> keySerializer;
+    private Serializer<V> valueSerializer;
+    private Deserializer<K> keyDeserializer;
+    private Deserializer<V> valueDeserializer;
 
     /**
-     * Create a context for serialization using the specified serializers and deserializers.
+     * Create a context for serialization using the specified serializers and deserializers,
or if any of them are null the
+     * corresponding {@link ProcessorContext}'s serializer or deserializer, which
+     * <em>must</em> match the key and value types used as parameters for this
object.
      *
      * @param topic the name of the topic
-     * @param keySerializer the serializer for keys; may not be null
-     * @param keyDeserializer the deserializer for keys; may not be null
-     * @param valueSerializer the serializer for values; may not be null
-     * @param valueDeserializer the deserializer for values; may not be null
+     * @param keySerializer the serializer for keys; may be null
+     * @param keyDeserializer the deserializer for keys; may be null
+     * @param valueSerializer the serializer for values; may be null
+     * @param valueDeserializer the deserializer for values; may be null
      */
     public Serdes(String topic,
             Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
@@ -82,45 +84,22 @@ final class Serdes<K, V> {
     }
 
     /**
-     * Create a context for serialization using the specified serializers and deserializers,
or if any of them are null the
-     * corresponding {@link StreamingConfig}'s serializer or deserializer, which
+     * Create a context for serialization using the {@link ProcessorContext}'s serializers
and deserializers, which
      * <em>must</em> match the key and value types used as parameters for this
object.
      *
      * @param topic the name of the topic
-     * @param keySerializer the serializer for keys; may be null if the {@link StreamingConfig#keySerializer()
default
-     *            key serializer} should be used
-     * @param keyDeserializer the deserializer for keys; may be null if the {@link StreamingConfig#keyDeserializer()
default
-     *            key deserializer} should be used
-     * @param valueSerializer the serializer for values; may be null if the {@link StreamingConfig#valueSerializer()
default
-     *            value serializer} should be used
-     * @param valueDeserializer the deserializer for values; may be null if the {@link StreamingConfig#valueDeserializer()
-     *            default value deserializer} should be used
-     * @param config the streaming config
      */
     @SuppressWarnings("unchecked")
-    public Serdes(String topic,
-            Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
-            Serializer<V> valueSerializer, Deserializer<V> valueDeserializer,
-            StreamingConfig config) {
-        this.topic = topic;
-
-        this.keySerializer = keySerializer != null ? keySerializer : config.keySerializer();
-        this.keyDeserializer = keyDeserializer != null ? keyDeserializer : config.keyDeserializer();
-        this.valueSerializer = valueSerializer != null ? valueSerializer : config.valueSerializer();
-        this.valueDeserializer = valueDeserializer != null ? valueDeserializer : config.valueDeserializer();
+    public Serdes(String topic) {
+        this(topic, null, null, null, null);
     }
 
-    /**
-     * Create a context for serialization using the {@link StreamingConfig}'s serializers
and deserializers, which
-     * <em>must</em> match the key and value types used as parameters for this
object.
-     *
-     * @param topic the name of the topic
-     * @param config the streaming config
-     */
     @SuppressWarnings("unchecked")
-    public Serdes(String topic,
-                  StreamingConfig config) {
-        this(topic, null, null, null, null, config);
+    public void init(ProcessorContext context) {
+        keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer();
+        keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>)
context.keyDeserializer();
+        valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>)
context.valueSerializer();
+        valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>)
context.valueDeserializer();
     }
 
     public Deserializer<K> keyDeserializer() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c5f040f..5452040 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 /**
@@ -40,7 +39,7 @@ public class Stores {
      * @param name the name of the store
      * @return the factory that can be used to specify other options or configurations for
the store; never null
      */
-    public static StoreFactory create(final String name, final StreamingConfig config) {
+    public static StoreFactory create(final String name) {
         return new StoreFactory() {
             @Override
             public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer,
final Deserializer<K> keyDeserializer) {
@@ -49,7 +48,7 @@ public class Stores {
                     public <V> KeyValueFactory<K, V> withValues(final Serializer<V>
valueSerializer,
                                                                 final Deserializer<V>
valueDeserializer) {
                         final Serdes<K, V> serdes =
-                                new Serdes<>(name, keySerializer, keyDeserializer,
valueSerializer, valueDeserializer, config);
+                                new Serdes<>(name, keySerializer, keyDeserializer,
valueSerializer, valueDeserializer);
                         return new KeyValueFactory<K, V>() {
                             @Override
                             public InMemoryKeyValueFactory<K, V> inMemory() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 54096b2..2f359bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -204,7 +204,7 @@ public class ProcessorTopologyTest {
         return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC)
                                     .addProcessor("processor", define(new StatefulProcessor(storeName)),
"source")
                                     .addStateStore(
-                                            Stores.create(storeName, config).withStringKeys().withStringValues().inMemory().build(),
+                                            Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
                                             "processor"
                                     )
                                     .addSink("counts", OUTPUT_TOPIC_1, "processor");

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
index 209f3c9..d40f308 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
@@ -21,14 +21,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
-import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.junit.Test;
 
 public abstract class AbstractKeyValueStoreTest {
 
-    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(StreamingConfig
config,
-                                                                      ProcessorContext context,
+    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext
context,
                                                                       Class<K> keyClass,
Class<V> valueClass,
                                                                       boolean useContextSerdes);
 
@@ -36,7 +34,7 @@ public abstract class AbstractKeyValueStoreTest {
     public void testPutGetRange() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(),
driver.context(), Integer.class, String.class, false);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(),
Integer.class, String.class, false);
         try {
 
             // Verify that the store reads and writes correctly ...
@@ -102,7 +100,7 @@ public abstract class AbstractKeyValueStoreTest {
     public void testPutGetRangeWithDefaultSerdes() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class,
String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(),
driver.context(), Integer.class, String.class, true);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(),
Integer.class, String.class, true);
         try {
 
             // Verify that the store reads and writes correctly ...
@@ -152,7 +150,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(),
driver.context(), Integer.class, String.class, false);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(),
Integer.class, String.class, false);
         try {
             // Verify that the store's contents were properly restored ...
             assertEquals(0, driver.checkForRestoredEntries(store));
@@ -178,7 +176,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(),
driver.context(), Integer.class, String.class, true);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(),
Integer.class, String.class, true);
         try {
             // Verify that the store's contents were properly restored ...
             assertEquals(0, driver.checkForRestoredEntries(store));

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
index b3fe98c..2b90d0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
@@ -27,7 +26,6 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
{
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            StreamingConfig config,
             ProcessorContext context,
             Class<K> keyClass, Class<V> valueClass,
             boolean useContextSerdes) {
@@ -38,9 +36,9 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
{
             Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
             Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
             Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer,
valDeser).inMemory().build();
+            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer,
valDeser).inMemory().build();
         } else {
-            supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).inMemory().build();
+            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
         }
 
         KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
index dddb9c7..81adfad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
@@ -26,11 +26,12 @@ import org.junit.Test;
 
 public class InMemoryLRUCacheStoreTest {
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testPutGetRange() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
+        StateStoreSupplier supplier = Stores.create("my-store")
                                                      .withIntegerKeys().withStringValues()
                                                      .inMemory().maxEntries(3)
                                                      .build();
@@ -82,7 +83,7 @@ public class InMemoryLRUCacheStoreTest {
         Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
         Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
         Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
-        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
+        StateStoreSupplier supplier = Stores.create("my-store")
                                                      .withKeys(keySer, keyDeser)
                                                      .withValues(valSer, valDeser)
                                                      .inMemory().maxEntries(3)
@@ -138,7 +139,7 @@ public class InMemoryLRUCacheStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
+        StateStoreSupplier supplier = Stores.create("my-store")
                                                      .withIntegerKeys().withStringValues()
                                                      .inMemory().maxEntries(3)
                                                      .build();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 8bab1c9..28cc3af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -362,15 +362,6 @@ public class KeyValueStoreTestDriver<K, V> {
     }
 
     /**
-     * Get the streaming config that should be supplied to a {@link Serdes}'s constructor.
-     *
-     * @return the streaming config; never null
-     */
-    public StreamingConfig config() {
-        return config;
-    }
-
-    /**
      * Get the context that should be supplied to a {@link KeyValueStore}'s constructor.
This context records any messages
      * written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)}
and
      * {@link #flushedEntryRemoved(Object)} methods.

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e8958a8/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
index 37a12f9..20e92ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
@@ -27,7 +26,6 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest
{
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(
-            StreamingConfig config,
             ProcessorContext context,
             Class<K> keyClass,
             Class<V> valueClass,
@@ -39,9 +37,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest
{
             Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
             Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
             Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer,
valDeser).localDatabase().build();
+            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer,
valDeser).localDatabase().build();
         } else {
-            supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).localDatabase().build();
+            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
         }
 
         KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();


Mime
View raw message