kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3911: KTable source materialization
Date Thu, 21 Jul 2016 21:45:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 933a7506e -> 1ccab26a3


KAFKA-3911: KTable source materialization

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #1638 from enothereska/KAFKA-3911-ktable-materialization


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ccab26a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ccab26a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ccab26a

Branch: refs/heads/trunk
Commit: 1ccab26a325e6ee23396049a24a5b6eb4b7a1c8e
Parents: 933a750
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Jul 21 14:44:59 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jul 21 14:44:59 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   | 14 +++++++----
 .../streams/kstream/internals/KTableImpl.java   | 11 ++++++---
 .../streams/processor/TopologyBuilder.java      | 16 ++++++++++++-
 .../processor/internals/AbstractTask.java       |  3 ++-
 .../internals/ProcessorStateManager.java        | 25 ++++++++++++++------
 .../processor/internals/ProcessorTopology.java  | 19 +++++++--------
 .../kstream/internals/KTableFilterTest.java     |  2 +-
 .../kstream/internals/KTableForeachTest.java    | 13 +++++++++-
 .../kstream/internals/KTableImplTest.java       |  6 ++---
 .../kstream/internals/KTableMapKeysTest.java    | 12 +++++++++-
 .../kstream/internals/KTableSourceTest.java     |  2 +-
 .../internals/ProcessorStateManagerTest.java    | 12 +++++-----
 .../processor/internals/StandbyTaskTest.java    | 10 ++++++--
 .../processor/internals/StreamTaskTest.java     |  3 ++-
 14 files changed, 104 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 2df1bcb..08e9842 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -144,14 +144,18 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KTable} for the specified topics
      */
     public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) {
-        String source = newName(KStreamImpl.SOURCE_NAME);
-        String name = newName(KTableImpl.SOURCE_NAME);
+        final String source = newName(KStreamImpl.SOURCE_NAME);
+        final String name = newName(KTableImpl.SOURCE_NAME);
+        final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName);
 
         addSource(source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic);
-
-        ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName);
         addProcessor(name, processorSupplier, source);
-        return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName);
+
+        final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName);
+        kTable.materialize((KTableSource) processorSupplier);
+        connectSourceStoreAndTopic(storeName, topic);
+
+        return kTable;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 17f4716..f4d4855 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -364,7 +365,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         if (processorSupplier instanceof KTableSource) {
             KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
-            materialize(source);
+            if (!source.isMaterialized()) {
+                throw new StreamsException("Source is not materialized");
+            }
             return new KTableSourceValueGetterSupplier<>(source.storeName);
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
@@ -378,7 +381,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
-                materialize(source);
+                if (!source.isMaterialized()) {
+                    throw new StreamsException("Source is not materialized");
+                }
                 source.enableSendingOldValues();
             } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
                 ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
@@ -393,7 +398,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
         return sendOldValues;
     }
 
-    private void materialize(KTableSource<K, ?> source) {
+    public void materialize(KTableSource<K, ?> source) {
         synchronized (source) {
             if (!source.isMaterialized()) {
                 StateStoreSupplier storeSupplier =

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index a28b270..b8851b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -67,6 +67,7 @@ public class TopologyBuilder {
     private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
     private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
     private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+    private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>();
     private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
     private String applicationId;
 
@@ -536,6 +537,19 @@ public class TopologyBuilder {
         return this;
     }
 
+    protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) {
+        if (sourceStoreToSourceTopic != null) {
+            if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) {
+                throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
+            }
+            sourceStoreToSourceTopic.put(sourceStoreName, topic);
+        } else {
+            throw new TopologyBuilderException("sourceStoreToSourceTopic is null");
+        }
+
+        return this;
+    }
+
     /**
      * Connects a list of processors.
      *
@@ -841,7 +855,7 @@ public class TopologyBuilder {
             }
         }
 
-        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()));
+        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), sourceStoreToSourceTopic);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 70070a9..8a45dd6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -61,7 +61,8 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory);
+            this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic());
+
         } catch (IOException e) {
             throw new ProcessorStateException("Error while creating the state manager", e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 883959e..11c61a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -61,13 +61,15 @@ public class ProcessorStateManager {
     private final Map<TopicPartition, Long> offsetLimits;
     private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby
tasks, keyed by state topic name
+    private final Map<String, String> sourceStoreToSourceTopic;
     private final TaskId taskId;
     private final StateDirectory stateDirectory;
 
     /**
      * @throws IOException if any error happens while creating or locking the state directory
      */
-    public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby, StateDirectory stateDirectory) throws IOException {
+    public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby,
+        StateDirectory stateDirectory, final Map<String, String> sourceStoreToSourceTopic) throws IOException {
         this.applicationId = applicationId;
         this.defaultPartition = taskId.partition;
         this.taskId = taskId;
@@ -84,6 +86,7 @@ public class ProcessorStateManager {
         this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>()
: null;
         this.offsetLimits = new HashMap<>();
         this.baseDir  = stateDirectory.directoryForTask(taskId);
+        this.sourceStoreToSourceTopic = sourceStoreToSourceTopic;
 
         if (!stateDirectory.lock(taskId, 5)) {
             throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
@@ -112,20 +115,28 @@ public class ProcessorStateManager {
      * @throws StreamsException if the store's change log does not contain the partition
      */
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
-        if (store.name().equals(CHECKPOINT_FILE_NAME))
+
+        if (store.name().equals(CHECKPOINT_FILE_NAME)) {
             throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
+        }
 
-        if (this.stores.containsKey(store.name()))
+        if (this.stores.containsKey(store.name())) {
             throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
+        }
 
-        if (loggingEnabled)
+        if (loggingEnabled) {
             this.loggingEnabled.add(store.name());
-
+        }
+        
         // check that the underlying change log topic exist or not
         String topic;
-        if (loggingEnabled)
+        if (loggingEnabled) {
             topic = storeChangelogTopic(this.applicationId, store.name());
-        else topic = store.name();
+        } else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) {
+            topic = sourceStoreToSourceTopic.get(store.name());
+        } else {
+            throw new IllegalArgumentException("Store is neither built from source topic, nor has a changelog.");
+        }
 
         // block until the partition is ready for this state changelog topic or time has elapsed
         int partition = getPartition(topic);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 0316446..221d152 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.Set;
 public class ProcessorTopology {
 
@@ -30,23 +29,17 @@ public class ProcessorTopology {
     private final Map<String, SourceNode> sourceByTopics;
     private final Map<String, SinkNode> sinkByTopics;
     private final List<StateStoreSupplier> stateStoreSuppliers;
-    private final Map<String, String> sinkNameToTopic;
-
+    private final Map<String, String> sourceStoreToSourceTopic;
     public ProcessorTopology(List<ProcessorNode> processorNodes,
                              Map<String, SourceNode> sourceByTopics,
                              Map<String, SinkNode> sinkByTopics,
-                             List<StateStoreSupplier> stateStoreSuppliers) {
+                             List<StateStoreSupplier> stateStoreSuppliers,
+                             Map<String, String> sourceStoreToSourceTopic) {
         this.processorNodes = Collections.unmodifiableList(processorNodes);
         this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
         this.sinkByTopics   = Collections.unmodifiableMap(sinkByTopics);
         this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers);
-
-        // pre-process sink nodes to get reverse mapping
-        sinkNameToTopic = new HashMap<>();
-        for (String topic : sinkByTopics.keySet()) {
-            SinkNode sink = sinkByTopics.get(topic);
-            sinkNameToTopic.put(sink.name(), topic);
-        }
+        this.sourceStoreToSourceTopic = sourceStoreToSourceTopic;
     }
 
     public Set<String> sourceTopics() {
@@ -81,6 +74,10 @@ public class ProcessorTopology {
         return stateStoreSuppliers;
     }
 
+    public Map<String, String> sourceStoreToSourceTopic() {
+        return sourceStoreToSourceTopic;
+    }
+
     private String childrenToString(List<ProcessorNode<?, ?>> children) {
         if (children == null || children.isEmpty()) {
             return "";

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index d8dee30..6837c56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -84,7 +84,7 @@ public class KTableFilterTest {
         table2.toStream().process(proc2);
         table3.toStream().process(proc3);
 
-        driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index e0cb190..6fbce82 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -24,23 +24,34 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import java.util.List;
 import java.util.Locale;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.io.File;
+import java.io.IOException;
+
 
 import static org.junit.Assert.assertEquals;
 
 public class KTableForeachTest {
 
     final private String topicName = "topic";
+    private File stateDir = null;
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
 
     private KStreamTestDriver driver;
 
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
+
     @After
     public void cleanup() {
         if (driver != null) {
@@ -81,7 +92,7 @@ public class KTableForeachTest {
         table.foreach(action);
 
         // Then
-        driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder, stateDir);
         for (KeyValue<Integer, String> record: inputRecords) {
             driver.process(topicName, record.key, record.value);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 6794bb4..617a2a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -105,7 +105,7 @@ public class KTableImplTest {
         MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
         table4.toStream().process(proc4);
 
-        driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "02");
@@ -273,8 +273,8 @@ public class KTableImplTest {
         driver = new KStreamTestDriver(builder, stateDir, null, null);
         driver.setTime(0L);
 
-        // no state stores should be created
-        assertEquals(0, driver.allStateStores().size());
+        // two state stores should be created
+        assertEquals(2, driver.allStateStores().size());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 7666438..78cff18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -26,10 +26,14 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,6 +43,7 @@ public class KTableMapKeysTest {
 
     final private Serde<String> stringSerde = new Serdes.StringSerde();
     final private Serde<Integer>  integerSerde = new Serdes.IntegerSerde();
+    private File stateDir = null;
     private KStreamTestDriver driver = null;
 
 
@@ -50,6 +55,11 @@ public class KTableMapKeysTest {
         driver = null;
     }
 
+    @Before
+     public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
+
     @Test
     public void testMapKeysConvertingToStream() {
         final KStreamBuilder builder = new KStreamBuilder();
@@ -82,7 +92,7 @@ public class KTableMapKeysTest {
 
         convertedStream.process(processor);
 
-        driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder, stateDir);
 
         for (int i = 0;  i < originalKeys.length; i++) {
             driver.process(topic1, originalKeys[i], values[i]);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index ad3f02c..cd1262b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -68,7 +68,7 @@ public class KTableSourceTest {
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
 
-        driver = new KStreamTestDriver(builder);
+        driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", 1);
         driver.process(topic1, "B", 2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index fbc92d2..32dce6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -209,7 +209,7 @@ public class ProcessorStateManagerTest {
     public void testNoTopic() throws IOException {
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName,
false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0,
1), noPartitions, new MockRestoreConsumer(), false, stateDirectory);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0,
1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null);
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
         } finally {
@@ -237,7 +237,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore",
true); // persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId,
noPartitions, restoreConsumer, false, stateDirectory);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId,
noPartitions, restoreConsumer, false, stateDirectory, null);
         try {
             restoreConsumer.reset();
 
@@ -286,7 +286,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName,
false); // non persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0,
2), noPartitions, restoreConsumer, false, stateDirectory);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0,
2), noPartitions, restoreConsumer, false, stateDirectory, null);
         try {
             restoreConsumer.reset();
 
@@ -359,7 +359,7 @@ public class ProcessorStateManagerTest {
         // if there is an source partition, inherit the partition id
         Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3,
1));
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId,
sourcePartitions, restoreConsumer, true, stateDirectory); // standby
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId,
sourcePartitions, restoreConsumer, true, stateDirectory, null); // standby
         try {
             restoreConsumer.reset();
 
@@ -393,7 +393,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName,
false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0,
1), noPartitions, restoreConsumer, false, stateDirectory);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0,
1), noPartitions, restoreConsumer, false, stateDirectory, null);
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -431,7 +431,7 @@ public class ProcessorStateManagerTest {
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName,
true);
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName,
false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId,
noPartitions, restoreConsumer, false, stateDirectory);
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId,
noPartitions, restoreConsumer, false, stateDirectory, null);
         try {
             // make sure the checkpoint file is deleted
             assertFalse(checkpointFile.exists());

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index f3339a8..11058c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -77,7 +77,8 @@ public class StandbyTaskTest {
             Utils.<StateStoreSupplier>mkList(
                     new MockStateStoreSupplier(storeName1, false),
                     new MockStateStoreSupplier(storeName2, true)
-            )
+            ),
+            Collections.<String, String>emptyMap()
     );
 
     private final TopicPartition ktable = new TopicPartition("ktable1", 0);
@@ -88,7 +89,12 @@ public class StandbyTaskTest {
             Collections.<String, SinkNode>emptyMap(),
             Utils.<StateStoreSupplier>mkList(
                     new MockStateStoreSupplier(ktable.topic(), true, false)
-            )
+            ),
+            new HashMap<String, String>() {
+            {
+                put("ktable1", ktable.topic());
+            }
+        }
     );
     private File baseDir;
     private StateDirectory stateDirectory;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ccab26a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index fdcf6b8..32d6aa4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -75,7 +75,8 @@ public class StreamTaskTest {
                 }
             },
             Collections.<String, SinkNode>emptyMap(),
-            Collections.<StateStoreSupplier>emptyList()
+            Collections.<StateStoreSupplier>emptyList(),
+            Collections.<String, String>emptyMap()
     );
     private File baseDir;
     private StateDirectory stateDirectory;


Mime
View raw message