kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-2706: make state stores first class citizens in the processor topology
Date Mon, 02 Nov 2015 21:19:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6383593fe -> 758272267


http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 8ec73fd..c5f040f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -26,7 +26,8 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 /**
  * Factory for creating key-value stores.
@@ -34,13 +35,12 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 public class Stores {
 
     /**
-     * Begin to create a new {@link org.apache.kafka.streams.processor.StateStore} instance.
-     * 
+     * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
+     *
      * @param name the name of the store
-     * @param context the processor context
      * @return the factory that can be used to specify other options or configurations for the store; never null
      */
-    public static StoreFactory create(final String name, final ProcessorContext context) {
+    public static StoreFactory create(final String name, final StreamingConfig config) {
         return new StoreFactory() {
             @Override
             public <K> ValueFactory<K> withKeys(final Serializer<K> keySerializer, final Deserializer<K> keyDeserializer) {
@@ -48,8 +48,8 @@ public class Stores {
                     @Override
                     public <V> KeyValueFactory<K, V> withValues(final Serializer<V> valueSerializer,
                                                                 final Deserializer<V> valueDeserializer) {
-                        final Serdes<K, V> serdes = new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
-                                context);
+                        final Serdes<K, V> serdes =
+                                new Serdes<>(name, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, config);
                         return new KeyValueFactory<K, V>() {
                             @Override
                             public InMemoryKeyValueFactory<K, V> inMemory() {
@@ -64,11 +64,11 @@ public class Stores {
                                     }
 
                                     @Override
-                                    public KeyValueStore<K, V> build() {
+                                    public StateStoreSupplier build() {
                                         if (capacity < Integer.MAX_VALUE) {
-                                            return InMemoryLRUCacheStore.create(name, capacity, context, serdes, null);
+                                            return new InMemoryLRUCacheStoreSupplier<>(name, capacity, serdes, null);
                                         }
-                                        return new InMemoryKeyValueStore<>(name, context, serdes, null);
+                                        return new InMemoryKeyValueStoreSupplier<>(name, serdes, null);
                                     }
                                 };
                             }
@@ -77,8 +77,8 @@ public class Stores {
                             public LocalDatabaseKeyValueFactory<K, V> localDatabase() {
                                 return new LocalDatabaseKeyValueFactory<K, V>() {
                                     @Override
-                                    public KeyValueStore<K, V> build() {
-                                        return new RocksDBKeyValueStore<>(name, context, serdes, null);
+                                    public StateStoreSupplier build() {
+                                        return new RocksDBKeyValueStoreSupplier<>(name, serdes, null);
                                     }
                                 };
                             }
@@ -92,7 +92,7 @@ public class Stores {
     public static abstract class StoreFactory {
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s.
-         * 
+         *
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<String> withStringKeys() {
@@ -101,7 +101,7 @@ public class Stores {
 
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Integer}s.
-         * 
+         *
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<Integer> withIntegerKeys() {
@@ -110,7 +110,7 @@ public class Stores {
 
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Long}s.
-         * 
+         *
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<Long> withLongKeys() {
@@ -119,7 +119,7 @@ public class Stores {
 
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be byte arrays.
-         * 
+         *
          * @return the interface used to specify the type of values; never null
          */
         public ValueFactory<byte[]> withByteArrayKeys() {
@@ -129,7 +129,7 @@ public class Stores {
         /**
          * Begin to create a {@link KeyValueStore} by specifying the keys will be either {@link String}, {@link Integer},
          * {@link Long}, or {@code byte[]}.
-         * 
+         *
          * @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serializers and
          *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
          *            {@code byte[].class})
@@ -141,7 +141,7 @@ public class Stores {
 
         /**
          * Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
-         * 
+         *
          * @param keySerializer the serializer for keys; may not be null
          * @param keyDeserializer the deserializer for keys; may not be null
          * @return the interface used to specify the type of values; never null
@@ -151,13 +151,13 @@ public class Stores {
 
     /**
      * The factory for creating off-heap key-value stores.
-     * 
+     *
      * @param <K> the type of keys
      */
     public static abstract class ValueFactory<K> {
         /**
          * Use {@link String} values.
-         * 
+         *
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, String> withStringValues() {
@@ -166,7 +166,7 @@ public class Stores {
 
         /**
          * Use {@link Integer} values.
-         * 
+         *
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, Integer> withIntegerValues() {
@@ -175,7 +175,7 @@ public class Stores {
 
         /**
          * Use {@link Long} values.
-         * 
+         *
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, Long> withLongValues() {
@@ -184,7 +184,7 @@ public class Stores {
 
         /**
          * Use byte arrays for values.
-         * 
+         *
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public KeyValueFactory<K, byte[]> withByteArrayValues() {
@@ -194,7 +194,7 @@ public class Stores {
         /**
          * Use values of the specified type, which must be either {@link String}, {@link Integer}, {@link Long}, or {@code byte[]}
          * .
-         * 
+         *
          * @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serializers and
          *            deserializers (e.g., {@code String.class}, {@code Integer.class}, {@code Long.class}, or
          *            {@code byte[].class})
@@ -206,7 +206,7 @@ public class Stores {
 
         /**
          * Use the specified serializer and deserializer for the values.
-         * 
+         *
          * @param valueSerializer the serializer for value; may not be null
          * @param valueDeserializer the deserializer for values; may not be null
          * @return the interface used to specify the remaining key-value store options; never null
@@ -224,7 +224,7 @@ public class Stores {
         /**
          * Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
          * read to restore the entries if they are lost.
-         * 
+         *
          * @return the factory to create in-memory key-value stores; never null
          */
         InMemoryKeyValueFactory<K, V> inMemory();
@@ -232,7 +232,7 @@ public class Stores {
         /**
          * Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka
          * topic that can be read to restore the entries if they are lost.
-         * 
+         *
          * @return the factory to create in-memory key-value stores; never null
          */
         LocalDatabaseKeyValueFactory<K, V> localDatabase();
@@ -248,7 +248,7 @@ public class Stores {
         /**
          * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
          * equivalent to not placing a limit on the number of entries.
-         * 
+         *
          * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
          * @return this factory
          * @throws IllegalArgumentException if the capacity is not positive
@@ -256,10 +256,10 @@ public class Stores {
         InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
 
         /**
-         * Return the new key-value store.
-         * @return the key-value store; never null
+         * Return the instance of StateStoreSupplier of new key-value store.
+         * @return the state store supplier; never null
          */
-        KeyValueStore<K, V> build();
+        StateStoreSupplier build();
     }
 
     /**
@@ -270,9 +270,9 @@ public class Stores {
      */
     public static interface LocalDatabaseKeyValueFactory<K, V> {
         /**
-         * Return the new key-value store.
+         * Return the instance of StateStoreSupplier of new key-value store.
          * @return the key-value store; never null
          */
-        KeyValueStore<K, V> build();
+        StateStoreSupplier build();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index b77c253..de1328e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -106,6 +107,55 @@ public class TopologyBuilderTest {
         assertEquals(3, builder.sourceTopics().size());
     }
 
+    @Test(expected = TopologyException.class)
+    public void testAddStateStoreWithNonExistingProcessor() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddStateStoreWithSource() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source-1", "topic-1");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddStateStoreWithSink() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSink("sink-1", "topic-1");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void testAddStateStoreWithDuplicates() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addStateStore(new MockStateStoreSupplier("store", false));
+        builder.addStateStore(new MockStateStoreSupplier("store", false));
+    }
+
+    @Test
+    public void testAddStateStore() {
+        final TopologyBuilder builder = new TopologyBuilder();
+        List<StateStoreSupplier> suppliers;
+
+        StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
+        builder.addStateStore(supplier);
+        suppliers = builder.build().stateStoreSuppliers();
+        assertEquals(0, suppliers.size());
+
+        builder.addSource("source-1", "topic-1");
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+        builder.connectProcessorAndStateStores("processor-1", "store-1");
+        suppliers = builder.build().stateStoreSuppliers();
+        assertEquals(1, suppliers.size());
+        assertEquals(supplier.name(), suppliers.get(0).name());
+    }
+
     @Test
     public void testTopicGroups() {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -138,6 +188,35 @@ public class TopologyBuilderTest {
         assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
     }
 
+    @Test
+    public void testTopicGroupsByStateStore() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source-1", "topic-1", "topic-1x");
+        builder.addSource("source-2", "topic-2");
+        builder.addSource("source-3", "topic-3");
+        builder.addSource("source-4", "topic-4");
+        builder.addSource("source-5", "topic-5");
+
+        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
+        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
+        builder.addStateStore(new MockStateStoreSupplier("strore-1", false), "processor-1", "processor-2");
+
+        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
+        builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
+        builder.addStateStore(new MockStateStoreSupplier("strore-2", false), "processor-3", "processor-4");
+
+        Map<Integer, Set<String>> topicGroups = builder.topicGroups();
+
+        Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
+        expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
+        expectedTopicGroups.put(1, set("topic-3", "topic-4"));
+        expectedTopicGroups.put(2, set("topic-5"));
+
+        assertEquals(3, topicGroups.size());
+        assertEquals(expectedTopicGroups, topicGroups);
+    }
+
     private <T> Set<T> set(T... items) {
         Set<T> set = new HashSet<>();
         for (T item : items) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index eb33dc3..c447f99 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -24,14 +24,11 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 
 import java.io.File;
@@ -53,45 +50,6 @@ import static org.junit.Assert.assertFalse;
 
 public class ProcessorStateManagerTest {
 
-    private static class MockStateStore implements StateStore {
-        private final String name;
-        private final boolean persistent;
-
-        public boolean flushed = false;
-        public boolean closed = false;
-        public final ArrayList<Integer> keys = new ArrayList<>();
-
-        public MockStateStore(String name, boolean persistent) {
-            this.name = name;
-            this.persistent = persistent;
-        }
-        @Override
-        public String name() {
-            return name;
-        }
-        @Override
-        public void flush() {
-            flushed = true;
-        }
-        @Override
-        public void close() {
-            closed = true;
-        }
-        @Override
-        public boolean persistent() {
-            return persistent;
-        }
-
-        public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
-            private final Deserializer<Integer> deserializer = new IntegerDeserializer();
-
-            @Override
-            public void restore(byte[] key, byte[] value) {
-                keys.add(deserializer.deserialize("", key));
-            }
-        };
-    }
-
     private class MockRestoreConsumer  extends MockConsumer<byte[], byte[]> {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
@@ -255,7 +213,7 @@ public class ProcessorStateManagerTest {
     public void testNoTopic() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
-            MockStateStore mockStateStore = new MockStateStore("mockStore", false);
+            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
             try {
@@ -283,7 +241,7 @@ public class ProcessorStateManagerTest {
             ));
             restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
 
-            MockStateStore persistentStore = new MockStateStore("persistentStore", true); // persistent store
+            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
             try {
@@ -331,7 +289,7 @@ public class ProcessorStateManagerTest {
             ));
             restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
 
-            MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", false); // non persistent store
+            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
             try {
@@ -371,7 +329,7 @@ public class ProcessorStateManagerTest {
                     new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
             ));
 
-            MockStateStore mockStateStore = new MockStateStore("mockStore", false);
+            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
             try {
@@ -411,8 +369,8 @@ public class ProcessorStateManagerTest {
             ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L);
             ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L);
 
-            MockStateStore persistentStore = new MockStateStore("persistentStore", true);
-            MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", false);
+            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
+            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
             try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 5f8ca46..803d4b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -83,7 +83,7 @@ public class ProcessorTopologyTest {
         }
         driver = null;
     }
-    
+
     @Test
     public void testTopologyMetadata() {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -203,6 +203,10 @@ public class ProcessorTopologyTest {
     protected TopologyBuilder createStatefulTopology(String storeName) {
         return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
                                     .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
+                                    .addStateStore(
+                                            Stores.create(storeName, config).withStringKeys().withStringValues().inMemory().build(),
+                                            "processor"
+                                    )
                                     .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
@@ -262,9 +266,10 @@ public class ProcessorTopologyTest {
         }
 
         @Override
+        @SuppressWarnings("unchecked")
         public void init(ProcessorContext context) {
             super.init(context);
-            store = Stores.create(storeName, context).withStringKeys().withStringValues().inMemory().build();
+            store = (KeyValueStore<String, String>) context.getStateStore(storeName);
         }
 
         @Override
@@ -281,7 +286,7 @@ public class ProcessorTopologyTest {
             }
             context().forward(Long.toString(streamTime), count);
         }
-        
+
         @Override
         public void close() {
             store.close();
@@ -303,4 +308,4 @@ public class ProcessorTopologyTest {
             return timestamp;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index b1403bd..2c7aaeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -30,7 +30,7 @@ public class PunctuationQueueTest {
     @Test
     public void testPunctuationInterval() {
         TestProcessor processor = new TestProcessor();
-        ProcessorNode<String, String> node = new ProcessorNode<>("test", processor);
+        ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
         PunctuationQueue queue = new PunctuationQueue();
 
         PunctuationSchedule sched = new PunctuationSchedule(node, 100L);

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0b828f7..d80e98c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.MockSourceNode;
 import org.junit.Test;
@@ -37,6 +38,7 @@ import org.junit.Before;
 import java.io.File;
 import java.nio.file.Files;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Properties;
@@ -57,13 +59,15 @@ public class StreamTaskTest {
     private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final ProcessorTopology topology = new ProcessorTopology(
-        Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
-        new HashMap<String, SourceNode>() {
-            {
-                put("topic1", source1);
-                put("topic2", source2);
-            }
-        });
+            Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
+            new HashMap<String, SourceNode>() {
+                {
+                    put("topic1", source1);
+                    put("topic2", source2);
+                }
+            },
+            Collections.<StateStoreSupplier>emptyList()
+    );
 
     private StreamingConfig createConfig(final File baseDir) throws Exception {
         return new StreamingConfig(new Properties() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
index d8f06ea..209f3c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
@@ -21,20 +21,22 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.junit.Test;
 
 public abstract class AbstractKeyValueStoreTest {
 
-    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
+    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(StreamingConfig config,
+                                                                      ProcessorContext context,
                                                                       Class<K> keyClass, Class<V> valueClass,
                                                                       boolean useContextSerdes);
-    
+
     @Test
     public void testPutGetRange() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
         try {
 
             // Verify that the store reads and writes correctly ...
@@ -100,7 +102,7 @@ public abstract class AbstractKeyValueStoreTest {
     public void testPutGetRangeWithDefaultSerdes() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
         try {
 
             // Verify that the store reads and writes correctly ...
@@ -150,7 +152,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, false);
         try {
             // Verify that the store's contents were properly restored ...
             assertEquals(0, driver.checkForRestoredEntries(store));
@@ -176,7 +178,7 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.config(), driver.context(), Integer.class, String.class, true);
         try {
             // Verify that the store's contents were properly restored ...
             assertEquals(0, driver.checkForRestoredEntries(store));
@@ -188,4 +190,4 @@ public abstract class AbstractKeyValueStoreTest {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
index bee9967..b3fe98c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
@@ -18,21 +18,33 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass,
-                                                             boolean useContextSerdes) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+            StreamingConfig config,
+            ProcessorContext context,
+            Class<K> keyClass, Class<V> valueClass,
+            boolean useContextSerdes) {
+
+        StateStoreSupplier supplier;
         if (useContextSerdes) {
             Serializer<K> keySer = (Serializer<K>) context.keySerializer();
             Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
             Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
             Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            return Stores.create("my-store", context).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+            supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+        } else {
+            supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).inMemory().build();
         }
-        return Stores.create("my-store", context).withKeys(keyClass).withValues(valueClass).inMemory().build();
+
+        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
index 6b96d3a..dddb9c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertNull;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.junit.Test;
 
 public class InMemoryLRUCacheStoreTest {
@@ -29,10 +30,12 @@ public class InMemoryLRUCacheStoreTest {
     public void testPutGetRange() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
                                                      .withIntegerKeys().withStringValues()
                                                      .inMemory().maxEntries(3)
                                                      .build();
+        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+        store.init(driver.context());
 
         // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
         store.put(0, "zero");
@@ -79,11 +82,13 @@ public class InMemoryLRUCacheStoreTest {
         Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
         Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
         Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
-        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
                                                      .withKeys(keySer, keyDeser)
                                                      .withValues(valSer, valDeser)
                                                      .inMemory().maxEntries(3)
                                                      .build();
+        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+        store.init(driver.context());
 
         // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
         store.put(0, "zero");
@@ -133,10 +138,12 @@ public class InMemoryLRUCacheStoreTest {
 
         // Create the store, which should register with the context and automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+        StateStoreSupplier supplier = Stores.create("my-store", driver.config())
                                                      .withIntegerKeys().withStringValues()
                                                      .inMemory().maxEntries(3)
                                                      .build();
+        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+        store.init(driver.context());
 
         // Verify that the store's contents were properly restored ...
         assertEquals(0, driver.checkForRestoredEntries(store));
@@ -145,4 +152,4 @@ public class InMemoryLRUCacheStoreTest {
         assertEquals(3, driver.sizeOf(store));
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 7e1512a..8bab1c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -30,6 +31,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.MockTimestampExtractor;
 
 import java.io.File;
 import java.util.HashMap;
@@ -38,6 +40,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
 import java.util.Set;
 
 /**
@@ -218,6 +221,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final Map<K, V> flushedEntries = new HashMap<>();
     private final Set<K> flushedRemovals = new HashSet<>();
     private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
+    private final StreamingConfig config;
     private final MockProcessorContext context;
     private final Map<String, StateStore> storeMap = new HashMap<>();
     private final StreamingMetrics metrics = new StreamingMetrics() {
@@ -243,6 +247,15 @@ public class KeyValueStoreTestDriver<K, V> {
                 recordFlushed(record.key(), record.value());
             }
         };
+        Properties props = new Properties();
+        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
+        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
+        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
+        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
+        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
+        this.config = new StreamingConfig(props);
+
         this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
                 serdes.valueDeserializer(), recordCollector) {
             @Override
@@ -349,6 +362,15 @@ public class KeyValueStoreTestDriver<K, V> {
     }
 
     /**
+     * Get the streaming config that should be supplied to {@link Stores#create(String, StreamingConfig)} when building a store.
+     *
+     * @return the streaming config; never null
+     */
+    public StreamingConfig config() {
+        return config;
+    }
+
+    /**
      * Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages
      * written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
      * {@link #flushedEntryRemoved(Object)} methods.

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
index 9ac1740..37a12f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
@@ -18,21 +18,35 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
     @Override
-    protected <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass,
-                                                             boolean useContextSerdes) {
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+            StreamingConfig config,
+            ProcessorContext context,
+            Class<K> keyClass,
+            Class<V> valueClass,
+            boolean useContextSerdes) {
+
+        StateStoreSupplier supplier;
         if (useContextSerdes) {
             Serializer<K> keySer = (Serializer<K>) context.keySerializer();
             Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
             Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
             Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            return Stores.create("my-store", context).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+            supplier = Stores.create("my-store", config).withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+        } else {
+            supplier = Stores.create("my-store", config).withKeys(keyClass).withValues(valueClass).localDatabase().build();
         }
-        return Stores.create("my-store", context).withKeys(keyClass).withValues(valueClass).localDatabase().build();
+
+        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
+
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
new file mode 100644
index 0000000..16635b7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+import java.util.ArrayList;
+
+public class MockStateStoreSupplier implements StateStoreSupplier {
+    private final String name;
+    private final boolean persistent;
+
+    public MockStateStoreSupplier(String name, boolean persistent) {
+        this.name = name;
+        this.persistent = persistent;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public StateStore get() {
+        return new MockStateStore(name, persistent);
+    }
+
+    public static class MockStateStore implements StateStore {
+        private final String name;
+        private final boolean persistent;
+
+        public boolean initialized = false;
+        public boolean flushed = false;
+        public boolean closed = false;
+        public final ArrayList<Integer> keys = new ArrayList<>();
+
+        public MockStateStore(String name, boolean persistent) {
+            this.name = name;
+            this.persistent = persistent;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            context.register(this, stateRestoreCallback);
+            initialized = true;
+        }
+
+        @Override
+        public void flush() {
+            flushed = true;
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+
+        @Override
+        public boolean persistent() {
+            return persistent;
+        }
+
+        public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
+            private final Deserializer<Integer> deserializer = new IntegerDeserializer();
+
+            @Override
+            public void restore(byte[] key, byte[] value) {
+                keys.add(deserializer.deserialize("", key));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/75827226/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 0c4b1a2..fc83762 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -266,7 +267,7 @@ public class ProcessorTopologyTestDriver {
      * @see #getKeyValueStore(String)
      */
     public StateStore getStateStore(String name) {
-        return task.context().getStateStore(name);
+        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
     }
 
     /**


Mime
View raw message