kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Remove unnecessary store info from TopologyBuilder
Date Fri, 13 Jan 2017 19:28:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fc0d612b0 -> e90db3e3a


MINOR: Remove unnecessary store info from TopologyBuilder

This PR is extracted from https://github.com/apache/kafka/pull/2333 as an incremental fix to ease the reviewing:

1. Removed `storeToProcessorNodeMap` from ProcessorTopology since it was previously used to set the context's current record, and can now be replaced with the dirty entry in the named cache.

2. Replaced `sourceStoreToSourceTopic` from ProcessorTopology with `storeToChangelogTopic` map, which includes the corresponding changelog topic name for all stores that are changelog enabled.

3. Modified `ProcessorStateManager` to rely on `storeToChangelogTopic` when retrieving the changelog topic; this makes the second parameter `loggingEnabled` in `register` no longer needed, so we can deprecate the old API in favor of a new one.

4. Also fixed a minor issue in `KStreamBuilder`: if the storeName is not provided in the `table(..)` function, do not create the underlying materialized store. Modified the unit tests to cover this case.

5. Fixed a bunch of other unit tests failures that are exposed by this refactoring, in which we are not setting the applicationId correctly when constructing the mocking processor topology.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Ewen Cheslack-Postava

Closes #2338 from guozhangwang/KMinor-refactor-state-to-changelogtopic


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e90db3e3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e90db3e3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e90db3e3

Branch: refs/heads/trunk
Commit: e90db3e3af0718a80d729809186d45076e71a241
Parents: fc0d612
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Jan 13 11:28:40 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jan 13 11:28:40 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   | 24 +++---
 .../streams/processor/TopologyBuilder.java      | 38 +++++----
 .../processor/internals/AbstractTask.java       |  2 +-
 .../internals/GlobalStateManagerImpl.java       |  2 +-
 .../internals/GlobalStateUpdateTask.java        |  2 +-
 .../internals/ProcessorStateManager.java        | 85 ++++++++------------
 .../processor/internals/ProcessorTopology.java  | 26 +++---
 .../streams/kstream/KStreamBuilderTest.java     | 51 ++++--------
 .../streams/processor/TopologyBuilderTest.java  |  1 -
 .../processor/internals/AbstractTaskTest.java   |  1 -
 .../internals/GlobalStateManagerImplTest.java   |  1 -
 .../internals/GlobalStateTaskTest.java          |  1 -
 .../internals/ProcessorStateManagerTest.java    | 52 ++++++++----
 .../processor/internals/StandbyTaskTest.java    | 19 +++--
 .../processor/internals/StreamTaskTest.java     |  7 +-
 .../processor/internals/StreamThreadTest.java   | 10 +--
 .../apache/kafka/test/KStreamTestDriver.java    | 15 +---
 .../kafka/test/ProcessorTopologyTestDriver.java |  2 +-
 18 files changed, 160 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index c1a74e7..aecd8ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -261,16 +261,20 @@ public class KStreamBuilder extends TopologyBuilder {
         addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic);
         addProcessor(name, processorSupplier, source);
 
-        final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName);
-        StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName,
-            keySerde,
-            valSerde,
-            false,
-            Collections.<String, String>emptyMap(),
-            true);
-
-        addStateStore(storeSupplier, name);
-        connectSourceStoreAndTopic(storeName, topic);
+        final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName);
+
+        // only materialize the KTable into a state store if the storeName is not null
+        if (storeName != null) {
+            StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName,
+                    keySerde,
+                    valSerde,
+                    false,
+                    Collections.<String, String>emptyMap(),
+                    true);
+
+            addStateStore(storeSupplier, name);
+            connectSourceStoreAndTopic(storeName, topic);
+        }
 
         return kTable;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index e8f0994..b25fcad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -97,9 +97,9 @@ public class TopologyBuilder {
     // are connected to these state stores
     private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
 
-    // map from state store names that are directly associated with source processors to their subscribed topics,
+    // map from state store names to this state store's corresponding changelog topic if possible,
     // this is used in the extended KStreamBuilder.
-    private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>();
+    private final Map<String, String> storeToChangelogTopic = new HashMap<>();
 
     // all global topics
     private final Set<String> globalTopics = new HashSet<>();
@@ -784,11 +784,15 @@ public class TopologyBuilder {
         return this;
     }
 
+    /**
+     * This is used only for KStreamBuilder: when adding a KTable from a source topic,
+     * we need to add the topic as the KTable's materialized state store's changelog.
+     */
     protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) {
-        if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) {
+        if (storeToChangelogTopic.containsKey(sourceStoreName)) {
             throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
         }
-        sourceStoreToSourceTopic.put(sourceStoreName, topic);
+        storeToChangelogTopic.put(sourceStoreName, topic);
         return this;
     }
 
@@ -1026,7 +1030,6 @@ public class TopologyBuilder {
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
         Map<String, SinkNode> topicSinkMap = new HashMap<>();
         Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
-        Map<StateStore, ProcessorNode> storeToProcessorNodeMap = new HashMap<>();
 
         // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
         for (NodeFactory factory : nodeFactories.values()) {
@@ -1041,9 +1044,22 @@ public class TopologyBuilder {
                     }
                     for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
                         if (!stateStoreMap.containsKey(stateStoreName)) {
-                            final StateStore stateStore = getStateStore(stateStoreName);
+                            StateStore stateStore;
+
+                            if (stateFactories.containsKey(stateStoreName)) {
+                                final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
+                                stateStore = supplier.get();
+
+                                // remember the changelog topic if this state store is change-logging enabled
+                                if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
+                                    final String changelogTopic = ProcessorStateManager.storeChangelogTopic(this.applicationId, stateStoreName);
+                                    storeToChangelogTopic.put(stateStoreName, changelogTopic);
+                                }
+                            } else {
+                                stateStore = globalStateStores.get(stateStoreName);
+                            }
+
                             stateStoreMap.put(stateStoreName, stateStore);
-                            storeToProcessorNodeMap.put(stateStore, node);
                         }
                     }
                 } else if (factory instanceof SourceNodeFactory) {
@@ -1077,13 +1093,7 @@ public class TopologyBuilder {
             }
         }
 
-        return new ProcessorTopology(processorNodes,
-                                     topicSourceMap,
-                                     topicSinkMap,
-                                     new ArrayList<>(stateStoreMap.values()),
-                                     sourceStoreToSourceTopic,
-                                     storeToProcessorNodeMap,
-                                     new ArrayList<>(globalStateStores.values()));
+        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index bed3311..0730c68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -70,7 +70,7 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic(), topology.storeToProcessorNodeMap());
+            this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
 
         } catch (IOException e) {
             throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 927f62b..7534993 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -144,7 +144,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     }
 
     private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
-        final String sourceTopic = topology.sourceStoreToSourceTopic().get(store.name());
+        final String sourceTopic = topology.storeToChangelogTopic().get(store.name());
         final List<PartitionInfo> partitionInfos = consumer.partitionsFor(sourceTopic);
         if (partitionInfos == null || partitionInfos.isEmpty()) {
             throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 9723f3c..40f2a3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -59,7 +59,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
     @SuppressWarnings("unchecked")
     public Map<TopicPartition, Long> initialize() {
         final Set<String> storeNames = stateMgr.initialize(processorContext);
-        final Map<String, String> storeNameToTopic = topology.sourceStoreToSourceTopic();
+        final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
             final SourceNode source = topology.source(sourceTopic);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index dca8192..a21c3e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -38,11 +38,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static java.util.Collections.singleton;
 
@@ -53,52 +51,51 @@ public class ProcessorStateManager implements StateManager {
     public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
     public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
-    private final String logPrefix;
-    private final String applicationId;
-    private final int defaultPartition;
-    private final Map<String, TopicPartition> partitionForTopic;
     private final File baseDir;
+    private final TaskId taskId;
+    private final String logPrefix;
+    private final boolean isStandby;
+    private final StateDirectory stateDirectory;
     private final Map<String, StateStore> stores;
     private final Map<String, StateStore> globalStores;
-    private final Set<String> loggingEnabled;
     private final Consumer<byte[], byte[]> restoreConsumer;
+    private final Map<TopicPartition, Long> offsetLimits;
     private final Map<TopicPartition, Long> restoredOffsets;
     private final Map<TopicPartition, Long> checkpointedOffsets;
-    private final Map<TopicPartition, Long> offsetLimits;
-    private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
-    private final Map<String, String> sourceStoreToSourceTopic;
-    private final TaskId taskId;
-    private final StateDirectory stateDirectory;
-    private final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap;
+    private final Map<String, String> storeToChangelogTopic;
+
+    // TODO: this map does not work with customized grouper where multiple partitions
+    // of the same topic can be assigned to the same task.
+    private final Map<String, TopicPartition> partitionForTopic;
 
     /**
      * @throws LockException if the state directory cannot be locked because another thread holds the lock
      *                       (this might be recoverable by retrying)
      * @throws IOException if any severe error happens while creating or locking the state directory
      */
-    public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby,
-                                 StateDirectory stateDirectory, final Map<String, String> sourceStoreToSourceTopic,
-                                 final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap) throws LockException, IOException {
-        this.applicationId = applicationId;
-        this.defaultPartition = taskId.partition;
+    public ProcessorStateManager(final String applicationId,
+                                 final TaskId taskId,
+                                 final Collection<TopicPartition> sources,
+                                 final Consumer<byte[], byte[]> restoreConsumer,
+                                 final boolean isStandby,
+                                 final StateDirectory stateDirectory,
+                                 final Map<String, String> storeToChangelogTopic) throws LockException, IOException {
         this.taskId = taskId;
         this.stateDirectory = stateDirectory;
-        this.stateStoreProcessorNodeMap = stateStoreProcessorNodeMap;
+        this.baseDir  = stateDirectory.directoryForTask(taskId);
         this.partitionForTopic = new HashMap<>();
         for (TopicPartition source : sources) {
             this.partitionForTopic.put(source.topic(), source);
         }
         this.stores = new LinkedHashMap<>();
         this.globalStores = new HashMap<>();
-        this.loggingEnabled = new HashSet<>();
         this.restoreConsumer = restoreConsumer;
+        this.offsetLimits = new HashMap<>();
         this.restoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
         this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
-        this.offsetLimits = new HashMap<>();
-        this.baseDir  = stateDirectory.directoryForTask(taskId);
-        this.sourceStoreToSourceTopic = sourceStoreToSourceTopic;
+        this.storeToChangelogTopic = storeToChangelogTopic;
 
         this.logPrefix = String.format("task [%s]", taskId);
 
@@ -126,6 +123,9 @@ public class ProcessorStateManager implements StateManager {
     /**
      * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
      * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
+     *
+     * // TODO: parameter loggingEnabled can be removed now
+     *
      * @throws StreamsException if the store's change log does not contain the partition
      */
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
@@ -139,17 +139,8 @@ public class ProcessorStateManager implements StateManager {
             throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", logPrefix, store.name()));
         }
 
-        if (loggingEnabled) {
-            this.loggingEnabled.add(store.name());
-        }
-        
         // check that the underlying change log topic exist or not
-        String topic = null;
-        if (loggingEnabled) {
-            topic = storeChangelogTopic(this.applicationId, store.name());
-        } else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) {
-            topic = sourceStoreToSourceTopic.get(store.name());
-        }
+        String topic = storeToChangelogTopic.get(store.name());
 
         if (topic == null) {
             this.stores.put(store.name(), store);
@@ -283,7 +274,6 @@ public class ProcessorStateManager implements StateManager {
         List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
 
         // restore states from changelog records
-
         StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
 
         long lastOffset = -1L;
@@ -304,6 +294,7 @@ public class ProcessorStateManager implements StateManager {
             }
             count++;
         }
+
         // record the restored offset for its change log partition
         restoredOffsets.put(storePartition, lastOffset + 1);
 
@@ -328,10 +319,6 @@ public class ProcessorStateManager implements StateManager {
         if (!this.stores.isEmpty()) {
             log.debug("{} Flushing all stores registered in the state manager", logPrefix);
             for (StateStore store : this.stores.values()) {
-                final ProcessorNode processorNode = stateStoreProcessorNodeMap.get(store);
-                if (processorNode != null) {
-                    context.setCurrentNode(processorNode);
-                }
                 try {
                     log.trace("{} Flushing store={}", logPrefix, store.name());
                     store.flush();
@@ -364,24 +351,22 @@ public class ProcessorStateManager implements StateManager {
                 if (ackedOffsets != null) {
                     Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
                     for (String storeName : stores.keySet()) {
-                        TopicPartition part;
-                        if (loggingEnabled.contains(storeName))
-                            part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName));
-                        else
-                            part = new TopicPartition(storeName, getPartition(storeName));
+                        // only checkpoint the offset to the offsets file if
+                        // it is persistent AND changelog enabled
+                        if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
+                            String changelogTopic = storeToChangelogTopic.get(storeName);
+                            TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
 
-                        // only checkpoint the offset to the offsets file if it is persistent;
-                        if (stores.get(storeName).persistent()) {
-                            Long offset = ackedOffsets.get(part);
+                            Long offset = ackedOffsets.get(topicPartition);
 
                             if (offset != null) {
                                 // store the last offset + 1 (the log position after restoration)
-                                checkpointOffsets.put(part, offset + 1);
+                                checkpointOffsets.put(topicPartition, offset + 1);
                             } else {
                                 // if no record was produced. we need to check the restored offset.
-                                offset = restoredOffsets.get(part);
+                                offset = restoredOffsets.get(topicPartition);
                                 if (offset != null)
-                                    checkpointOffsets.put(part, offset);
+                                    checkpointOffsets.put(topicPartition, offset);
                             }
                         }
                     }
@@ -400,7 +385,7 @@ public class ProcessorStateManager implements StateManager {
     private int getPartition(String topic) {
         TopicPartition partition = partitionForTopic.get(topic);
 
-        return partition == null ? defaultPartition : partition.partition();
+        return partition == null ? taskId.partition : partition.partition();
     }
 
     void registerGlobalStateStores(final List<StateStore> stateStores) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 1eff351..10042fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -16,6 +16,7 @@
  */
 
 package org.apache.kafka.streams.processor.internals;
+
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.Collections;
@@ -23,29 +24,27 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 public class ProcessorTopology {
 
     private final List<ProcessorNode> processorNodes;
-    private final Map<String, SourceNode> sourceByTopics;
-    private final Map<String, SinkNode> sinkByTopics;
     private final List<StateStore> stateStores;
-    private final Map<String, String> sourceStoreToSourceTopic;
-    private final Map<StateStore, ProcessorNode> storeToProcessorNodeMap;
     private final List<StateStore> globalStateStores;
+    private final Map<String, SourceNode> sourceByTopics;
+    private final Map<String, SinkNode> sinkByTopics;
+    private final Map<String, String> storeToChangelogTopic;
 
     public ProcessorTopology(final List<ProcessorNode> processorNodes,
                              final Map<String, SourceNode> sourceByTopics,
                              final Map<String, SinkNode> sinkByTopics,
                              final List<StateStore> stateStores,
-                             final Map<String, String> sourceStoreToSourceTopic,
-                             final Map<StateStore, ProcessorNode> storeToProcessorNodeMap,
+                             final Map<String, String> storeToChangelogTopic,
                              final List<StateStore> globalStateStores) {
         this.processorNodes = Collections.unmodifiableList(processorNodes);
         this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
         this.sinkByTopics   = Collections.unmodifiableMap(sinkByTopics);
-        this.stateStores = Collections.unmodifiableList(stateStores);
-        this.sourceStoreToSourceTopic = sourceStoreToSourceTopic;
-        this.storeToProcessorNodeMap = Collections.unmodifiableMap(storeToProcessorNodeMap);
+        this.stateStores    = Collections.unmodifiableList(stateStores);
+        this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic);
         this.globalStateStores = Collections.unmodifiableList(globalStateStores);
     }
 
@@ -81,15 +80,10 @@ public class ProcessorTopology {
         return stateStores;
     }
 
-    public Map<String, String> sourceStoreToSourceTopic() {
-        return sourceStoreToSourceTopic;
-    }
-
-    public Map<StateStore, ProcessorNode> storeToProcessorNodeMap() {
-        return storeToProcessorNodeMap;
+    public Map<String, String> storeToChangelogTopic() {
+        return storeToChangelogTopic;
     }
 
-
     public List<StateStore> globalStateStores() {
         return globalStateStores;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 91b2127..c32082c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockKeyValueMapper;
@@ -110,20 +109,6 @@ public class KStreamBuilderTest {
         final KStream<String, String> source1 = builder.stream(topic1);
         final KStream<String, String> source2 = builder.stream(topic2);
         final KStream<String, String> source3 = builder.stream(topic3);
-
-        final KStream<String, String> merged = builder.merge(source1, source2, source3);
-        merged.groupByKey().count("my-table");
-        final Map<String, Set<String>> actual = builder.stateStoreNameToSourceTopics();
-        assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
-    }
-
-    @Test
-    public void shouldHaveCorrectSourceTopicsForTableFromMergedStreamWithProcessors() throws Exception {
-        final String topic1 = "topic-1";
-        final String topic2 = "topic-2";
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KStream<String, String> source1 = builder.stream(topic1);
-        final KStream<String, String> source2 = builder.stream(topic2);
         final KStream<String, String> processedSource1 =
                 source1.mapValues(new ValueMapper<String, String>() {
                     @Override
@@ -143,10 +128,10 @@ public class KStreamBuilderTest {
             }
         });
 
-        final KStream<String, String> merged = builder.merge(processedSource1, processedSource2);
+        final KStream<String, String> merged = builder.merge(processedSource1, processedSource2, source3);
         merged.groupByKey().count("my-table");
         final Map<String, Set<String>> actual = builder.stateStoreNameToSourceTopics();
-        assertEquals(Utils.mkSet("topic-1", "topic-2"), actual.get("my-table"));
+        assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
     }
 
     @Test(expected = TopologyBuilderException.class)
@@ -160,21 +145,19 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldNotGroupGlobalTableWithOtherStreams() throws Exception {
+    public void shouldNotMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
-        final GlobalKTable<String, String> globalTable = builder.globalTable("table", "globalTable");
-        final KStream<String, String> stream = builder.stream("t1");
-        final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
-            @Override
-            public String apply(final String key, final String value) {
-                return value;
-            }
-        };
-        stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
-        builder.stream("t2");
         builder.setApplicationId("app-id");
-        final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
-        assertEquals(Utils.mkSet("KTABLE-SOURCE-0000000001", "KSTREAM-SOURCE-0000000000"), nodeGroups.get(0));
+
+        builder.table("topic1", "table1");
+        builder.table("topic2", null);
+
+        ProcessorTopology topology = builder.build(null);
+
+        assertEquals(1, topology.stateStores().size());
+        assertEquals("table1", topology.stateStores().get(0).name());
+        assertEquals(1, topology.storeToChangelogTopic().size());
+        assertEquals("topic1", topology.storeToChangelogTopic().get("table1"));
     }
 
     @Test
@@ -182,11 +165,10 @@ public class KStreamBuilderTest {
         final KStreamBuilder builder = new KStreamBuilder();
         builder.globalTable("table", "globalTable");
         final ProcessorTopology topology = builder.buildGlobalStateTopology();
-        final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap = topology.storeToProcessorNodeMap();
-        assertEquals(1, stateStoreProcessorNodeMap.size());
-        final StateStore store = stateStoreProcessorNodeMap.keySet().iterator().next();
+        final List<StateStore> stateStores = topology.globalStateStores();
+        final StateStore store = stateStores.iterator().next();
+        assertEquals(1, stateStores.size());
         assertEquals("globalTable", store.name());
-        assertEquals("KTABLE-SOURCE-0000000001", stateStoreProcessorNodeMap.get(store).name());
     }
 
     @Test
@@ -251,5 +233,4 @@ public class KStreamBuilderTest {
         assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
         assertEquals(Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index b545005..4712320 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -547,7 +547,6 @@ public class TopologyBuilderTest {
 
             final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, LocalMockProcessorSupplier.STORE_NAME);
             driver.process("topic", null, null);
-
         } catch (final StreamsException e) {
             final Throwable cause = e.getCause();
             if (cause != null

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 9816b6b..16967bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -67,7 +67,6 @@ public class AbstractTaskTest {
                                                       Collections.<String, SinkNode>emptyMap(),
                                                       Collections.<StateStore>emptyList(),
                                                       Collections.<String, String>emptyMap(),
-                                                      Collections.<StateStore, ProcessorNode>emptyMap(),
                                                       Collections.<StateStore>emptyList()),
                                 consumer,
                                 consumer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 2d10c13..db51cef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -85,7 +85,6 @@ public class GlobalStateManagerImplTest {
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList(),
                                                                  storeToTopic,
-                                                                 storeToProcessorNode,
                                                                  Arrays.<StateStore>asList(store1, store2));
 
         context = new NoOpProcessorContext();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index afb8e76..df0b73c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -74,7 +74,6 @@ public class GlobalStateTaskTest {
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList(),
                                                                  storeToTopic,
-                                                                 Collections.<StateStore, ProcessorNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
         context = new NoOpProcessorContext();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 7023712..de54723 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -64,11 +63,11 @@ public class ProcessorStateManagerTest {
     public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
-        public TopicPartition assignedPartition = null;
-        public TopicPartition seekPartition = null;
-        public long seekOffset = -1L;
-        public boolean seekToBeginingCalled = false;
-        public boolean seekToEndCalled = false;
+        private TopicPartition assignedPartition = null;
+        private TopicPartition seekPartition = null;
+        private long seekOffset = -1L;
+        private boolean seekToBeginingCalled = false;
+        private boolean seekToEndCalled = false;
         private long endOffset = 0L;
         private long currentOffset = 0L;
 
@@ -193,7 +192,6 @@ public class ProcessorStateManagerTest {
 
     private final Set<TopicPartition> noPartitions = Collections.emptySet();
     private final String applicationId = "test-application";
-    private final String stateDir = "test";
     private final String persistentStoreName = "persistentStore";
     private final String nonPersistentStoreName = "nonPersistentStore";
     private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
@@ -214,7 +212,11 @@ public class ProcessorStateManagerTest {
     public void testNoTopic() throws IOException {
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap<String, String>() {
+            {
+                put(nonPersistentStoreName, nonPersistentStoreName);
+            }
+        });
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
         } finally {
@@ -242,7 +244,12 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
+            {
+                put(persistentStoreName, persistentStoreTopicName);
+                put(nonPersistentStoreName, nonPersistentStoreName);
+            }
+        });
         try {
             restoreConsumer.reset();
 
@@ -291,7 +298,12 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
+            {
+                put(persistentStoreName, persistentStoreTopicName);
+                put(nonPersistentStoreName, nonPersistentStoreTopicName);
+            }
+        });
         try {
             restoreConsumer.reset();
 
@@ -332,6 +344,11 @@ public class ProcessorStateManagerTest {
         String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
         String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
 
+        Map<String, String> storeToChangelogTopic = new HashMap<>();
+        storeToChangelogTopic.put(storeName1, storeTopicName1);
+        storeToChangelogTopic.put(storeName2, storeTopicName2);
+        storeToChangelogTopic.put(storeName3, storeTopicName3);
+
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
         checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
 
@@ -364,7 +381,7 @@ public class ProcessorStateManagerTest {
         // if there is an source partition, inherit the partition id
         Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap()); // standby
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby
         try {
             restoreConsumer.reset();
 
@@ -398,7 +415,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -436,7 +453,12 @@ public class ProcessorStateManagerTest {
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
+            {
+                put(persistentStoreName, persistentStoreTopicName);
+                put(nonPersistentStoreName, nonPersistentStoreTopicName);
+            }
+        });
         try {
             // make sure the checkpoint file is deleted
             assertFalse(checkpointFile.exists());
@@ -469,7 +491,7 @@ public class ProcessorStateManagerTest {
     @Test
     public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap());
         stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
@@ -490,7 +512,7 @@ public class ProcessorStateManagerTest {
 
 
         final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.<StateStore, ProcessorNode>emptyMap());
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
 
         restoreConsumer.reset();
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index fdf0115..2d32e78 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -80,8 +80,12 @@ public class StandbyTaskTest {
                     new MockStateStoreSupplier(storeName1, false).get(),
                     new MockStateStoreSupplier(storeName2, true).get()
             ),
-            Collections.<String, String>emptyMap(),
-            Collections.<StateStore, ProcessorNode>emptyMap(),
+            new HashMap<String, String>() {
+                {
+                    put(storeName1, storeChangelogTopicName1);
+                    put(storeName2, storeChangelogTopicName2);
+                }
+            },
             Collections.<StateStore>emptyList());
 
     private final TopicPartition ktable = new TopicPartition("ktable1", 0);
@@ -94,11 +98,10 @@ public class StandbyTaskTest {
                     new MockStateStoreSupplier(ktable.topic(), true, false).get()
             ),
             new HashMap<String, String>() {
-            {
-                put("ktable1", ktable.topic());
-            }
-        },
-            Collections.<StateStore, ProcessorNode>emptyMap(),
+                {
+                    put("ktable1", ktable.topic());
+                }
+            },
             Collections.<StateStore>emptyList());
     private File baseDir;
     private StateDirectory stateDirectory;
@@ -320,7 +323,7 @@ public class StandbyTaskTest {
                 new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
         final KStreamBuilder builder = new KStreamBuilder();
         builder.stream("topic").groupByKey().count("my-store");
-        final ProcessorTopology topology = builder.build(0);
+        final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
         StreamsConfig config = createConfig(baseDir);
         new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config,
             new MockStreamsMetrics(new Metrics()), stateDirectory);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4b9f92f..4e95911 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -91,7 +91,6 @@ public class StreamTaskTest {
             Collections.<String, SinkNode>emptyMap(),
             Collections.<StateStore>emptyList(),
             Collections.<String, String>emptyMap(),
-            Collections.<StateStore, ProcessorNode>emptyMap(),
             Collections.<StateStore>emptyList());
     private File baseDir;
     private StateDirectory stateDirectory;
@@ -350,10 +349,10 @@ public class StreamTaskTest {
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore, ProcessorNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
 
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory,  new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
+            topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
         final int offset = 20;
         streamTask.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -400,7 +399,6 @@ public class StreamTaskTest {
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore, ProcessorNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer,
@@ -424,7 +422,6 @@ public class StreamTaskTest {
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore, ProcessorNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
         final AtomicBoolean flushed = new AtomicBoolean(false);
         final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8f03e4f..20d428b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -706,7 +706,7 @@ public class StreamThreadTest {
     @Test
     public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId("appId")
+        builder.setApplicationId(applicationId)
                 .addSource("name", "topic")
                 .addSink("out", "output");
 
@@ -729,7 +729,7 @@ public class StreamThreadTest {
     @Test
     public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId("appId");
+        builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey().count("count-one");
         builder.stream("t2").groupByKey().count("count-two");
         final StreamsConfig config = new StreamsConfig(configProps());
@@ -780,7 +780,7 @@ public class StreamThreadTest {
     @Test
     public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId("appId");
+        builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey().count("count-one");
         builder.stream("t2").groupByKey().count("count-two");
         final StreamsConfig config = new StreamsConfig(configProps());
@@ -845,7 +845,7 @@ public class StreamThreadTest {
     @Test
     public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId("appId");
+        builder.setApplicationId(applicationId);
         builder.stream(Pattern.compile("t.*")).to("out");
         final StreamsConfig config = new StreamsConfig(configProps());
         final MockClientSupplier clientSupplier = new MockClientSupplier();
@@ -857,7 +857,7 @@ public class StreamThreadTest {
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 final ProcessorTopology topology = builder.build(id.topicGroupId);
-                final TestStreamTask task = new TestStreamTask(id, "appId", partitions, topology, consumer,
+                final TestStreamTask task = new TestStreamTask(id, applicationId, partitions, topology, consumer,
                     producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 createdTasks.put(partitions, task);
                 return task;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index b38daf1..d51384c 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -240,20 +240,9 @@ public class KStreamTestDriver {
     }
 
     public void flushState() {
-        final ProcessorNode current = currNode;
-        try {
-            for (StateStore stateStore : context.allStateStores().values()) {
-                final ProcessorNode processorNode = topology.storeToProcessorNodeMap().get(stateStore);
-                if (processorNode != null) {
-                    currNode = processorNode;
-                }
-                stateStore.flush();
-            }
-        } finally {
-            currNode = current;
-
+        for (StateStore stateStore : context.allStateStores().values()) {
+            stateStore.flush();
         }
-
     }
 
     public void setCurrentNode(final ProcessorNode currentNode) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e90db3e3/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 0850b60..89ca0df 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -162,7 +162,7 @@ public class ProcessorTopologyTestDriver {
      */
     public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
         id = new TaskId(0, 0);
-        topology = builder.setApplicationId("ProcessorTopologyTestDriver").build(null);
+        topology = builder.setApplicationId(applicationId).build(null);
         globalTopology  = builder.buildGlobalStateTopology();
 
         // Set up the consumer and producer ...


Mime
View raw message