kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-6150: KIP-204 part III; Purge repartition topics with the admin client
Date Mon, 04 Dec 2017 18:21:48 GMT
KAFKA-6150: KIP-204 part III; Purge repartition topics with the admin client

1. Add the repartition topics information into ProcessorTopology: personally I do not like leaking this information into the topology, but there seems to be no other simple way around it.
2. StreamTask: added one more function to expose the consumed offsets from repartition topics only.
3. TaskManager: use the AdminClient to send the gathered offsets for deletion only if a) the previous call has completed (the client intentionally ignores and logs any errors), or b) no request has ever been sent before. See the sketch after this list.

NOTE that this code depends on the assumption that purge is only called right after a commit has succeeded, so we can presume all consumed offsets have been committed.

4. MINOR: Added a few more constructors for ProcessorTopology for cleaner unit tests.
5. MINOR: Extracted MockStateStore out of the deprecated class.
6. MINOR: Made a pass over some unit test classes for cleanups.
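
For reference, the gating logic in item 3 boils down to a fire-and-forget pattern against the public AdminClient API (`deleteRecords` / `RecordsToDelete.beforeOffset`, both touched by this diff). This is a minimal standalone sketch, not the TaskManager code itself: the class name, topic name, and offsets are hypothetical, and error handling is reduced to a log line.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.DeleteRecordsResult;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    public class RepartitionTopicPurger {

        private final AdminClient adminClient;
        private DeleteRecordsResult deleteRecordsResult;  // in-flight request, if any

        RepartitionTopicPurger(final AdminClient adminClient) {
            this.adminClient = adminClient;
        }

        /**
         * Call right after a successful commit, so that every offset handed in
         * here is already committed (the same assumption the NOTE above makes).
         */
        void maybePurge(final Map<TopicPartition, Long> committedOffsets) {
            // only send a new request once the previous one has completed;
            // failures are logged and otherwise ignored, since a later request
            // with newer offsets supersedes them anyway
            if (deleteRecordsResult == null || deleteRecordsResult.all().isDone()) {
                if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
                    System.err.println("Previous delete-records request failed; retrying with newer offsets");
                }
                final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
                for (final Map.Entry<TopicPartition, Long> entry : committedOffsets.entrySet()) {
                    // delete all records below the committed offset
                    recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
                }
                deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
            }
        }

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
            try (AdminClient admin = AdminClient.create(props)) {
                final RepartitionTopicPurger purger = new RepartitionTopicPurger(admin);
                final Map<TopicPartition, Long> offsets = new HashMap<>();
                // hypothetical repartition topic partition and committed offset
                offsets.put(new TopicPartition("my-app-KSTREAM-AGGREGATE-repartition", 0), 100L);
                purger.maybePurge(offsets);
            }
        }
    }

The actual implementation in TaskManager.maybePurgeCommitedRecords (see the diff below) follows the same shape, with the offsets gathered per task from the repartition topics only.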

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #4270 from guozhangwang/K6150-purge-repartition-topics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4b8a29f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4b8a29f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4b8a29f1

Branch: refs/heads/trunk
Commit: 4b8a29f12a142d02be0b64eac71975b6a129d04a
Parents: 39c438e
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Dec 4 10:21:42 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Dec 4 10:21:42 2017 -0800

----------------------------------------------------------------------
 .../clients/admin/DeleteRecordsResult.java      |   2 +-
 .../kafka/clients/admin/RecordsToDelete.java    |  15 +
 .../kafka/common/internals/KafkaFutureImpl.java |   5 +
 .../internals/AssignedStreamsTasks.java         |  14 +
 .../internals/InternalTopologyBuilder.java      |  65 +-
 .../processor/internals/ProcessorTopology.java  | 103 +++-
 .../streams/processor/internals/StreamTask.java |  42 +-
 .../processor/internals/StreamThread.java       |   4 +
 .../processor/internals/TaskManager.java        |  34 +-
 .../org/apache/kafka/streams/TopologyTest.java  |   4 +-
 .../internals/AbstractProcessorContextTest.java |   4 +-
 .../processor/internals/AbstractTaskTest.java   |  12 +-
 .../internals/GlobalStateManagerImplTest.java   |  12 +-
 .../internals/GlobalStateTaskTest.java          |  98 ++-
 .../internals/ProcessorStateManagerTest.java    |  46 +-
 .../processor/internals/StandbyTaskTest.java    | 109 ++--
 .../processor/internals/StreamTaskTest.java     | 615 +++++++------------
 .../processor/internals/StreamThreadTest.java   |   2 +-
 .../processor/internals/TaskManagerTest.java    |  62 +-
 .../org/apache/kafka/test/MockStateStore.java   |  84 +++
 .../kafka/test/MockStateStoreSupplier.java      |  64 --
 21 files changed, 725 insertions(+), 671 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
index 44c5252..8a7d39e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
@@ -33,7 +33,7 @@ public class DeleteRecordsResult {
 
     private final Map<TopicPartition, KafkaFuture<DeletedRecords>> futures;
 
-    DeleteRecordsResult(Map<TopicPartition, KafkaFuture<DeletedRecords>> futures) {
+    public DeleteRecordsResult(Map<TopicPartition, KafkaFuture<DeletedRecords>> futures) {
         this.futures = futures;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java b/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
index 1981929..53a6dfb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
@@ -52,6 +52,21 @@ public class RecordsToDelete {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        RecordsToDelete that = (RecordsToDelete) o;
+
+        return this.offset == that.offset;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) offset;
+    }
+
+    @Override
     public String toString() {
         return "(beforeOffset = " + offset + ")";
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index 9ca019b..e2dbdf9 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -261,4 +261,9 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
     public synchronized boolean isDone() {
         return done;
     }
+
+    @Override
+    public String toString() {
+        return String.format("KafkaFuture{value=%s,exception=%s,done=%b}", value, exception, done);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 5ef404f..7b05f64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -69,6 +70,19 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
     }
 
     /**
+     * Returns a map of offsets up to which the records can be deleted; this function should only be called
+     * after the commit call to make sure all consumed offsets are actually committed as well
+     */
+    Map<TopicPartition, Long> recordsToDelete() {
+        final Map<TopicPartition, Long> recordsToDelete = new HashMap<>();
+        for (final StreamTask task : running.values()) {
+            recordsToDelete.putAll(task.purgableOffsets());
+        }
+
+        return recordsToDelete;
+    }
+
+    /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     int process() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 881ecd1..90d46aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -858,8 +858,6 @@ public class InternalTopologyBuilder {
                 nodeGroup.addAll(value);
             }
             nodeGroup.removeAll(globalNodeGroups);
-
-
         }
         return build(nodeGroup);
     }
@@ -890,17 +888,17 @@ public class InternalTopologyBuilder {
     }
 
     private ProcessorTopology build(final Set<String> nodeGroup) {
-        final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
-        final Map<String, ProcessorNode> processorMap = new HashMap<>();
+        final Map<String, ProcessorNode> processorMap = new LinkedHashMap<>();
         final Map<String, SourceNode> topicSourceMap = new HashMap<>();
         final Map<String, SinkNode> topicSinkMap = new HashMap<>();
         final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
+        final Set<String> repartitionTopics = new HashSet<>();
 
         // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
+        // also make sure the state store map values following the insertion ordering
         for (final NodeFactory factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
                 final ProcessorNode node = factory.build();
-                processorNodes.add(node);
                 processorMap.put(node.name(), node);
 
                 if (factory instanceof ProcessorNodeFactory) {
@@ -911,41 +909,53 @@ public class InternalTopologyBuilder {
 
                 } else if (factory instanceof SourceNodeFactory) {
                     buildSourceNode(topicSourceMap,
+                                    repartitionTopics,
                                     (SourceNodeFactory) factory,
                                     (SourceNode) node);
 
                 } else if (factory instanceof SinkNodeFactory) {
                     buildSinkNode(processorMap,
                                   topicSinkMap,
+                                  repartitionTopics,
                                   (SinkNodeFactory) factory,
-                                  node);
+                                  (SinkNode) node);
                 } else {
                     throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                 }
             }
         }
 
-        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
+        return new ProcessorTopology(new ArrayList<>(processorMap.values()),
+                                     topicSourceMap,
+                                     topicSinkMap,
+                                     new ArrayList<>(stateStoreMap.values()),
+                                     new ArrayList<>(globalStateStores.values()),
+                                     storeToChangelogTopic,
+                                     repartitionTopics);
     }
 
     @SuppressWarnings("unchecked")
     private void buildSinkNode(final Map<String, ProcessorNode> processorMap,
                                final Map<String, SinkNode> topicSinkMap,
+                               final Set<String> repartitionTopics,
                                final SinkNodeFactory sinkNodeFactory,
-                               final ProcessorNode node) {
+                               final SinkNode node) {
 
         for (final String predecessor : sinkNodeFactory.predecessors) {
             processorMap.get(predecessor).addChild(node);
             if (internalTopicNames.contains(sinkNodeFactory.topic)) {
                 // prefix the internal topic name with the application id
-                topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node);
+                final String decoratedTopic = decorateTopic(sinkNodeFactory.topic);
+                topicSinkMap.put(decoratedTopic, node);
+                repartitionTopics.add(decoratedTopic);
             } else {
-                topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node);
+                topicSinkMap.put(sinkNodeFactory.topic, node);
             }
         }
     }
 
     private void buildSourceNode(final Map<String, SourceNode> topicSourceMap,
+                                 final Set<String> repartitionTopics,
                                  final SourceNodeFactory sourceNodeFactory,
                                  final SourceNode node) {
 
@@ -956,7 +966,9 @@ public class InternalTopologyBuilder {
         for (final String topic : topics) {
             if (internalTopicNames.contains(topic)) {
                 // prefix the internal topic name with the application id
-                topicSourceMap.put(decorateTopic(topic), node);
+                final String decoratedTopic = decorateTopic(topic);
+                topicSourceMap.put(decoratedTopic, node);
+                repartitionTopics.add(decoratedTopic);
             } else {
                 topicSourceMap.put(topic, node);
             }
@@ -1055,7 +1067,7 @@ public class InternalTopologyBuilder {
                 for (final StateStoreFactory stateFactory : stateFactories.values()) {
                     if (stateFactory.loggingEnabled() && stateFactory.users().contains(node)) {
                         final String name = ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
-                        final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(stateFactory, name);
+                        final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory, name);
                         stateChangelogTopics.put(name, internalTopicConfig);
                     }
                 }
@@ -1105,20 +1117,20 @@ public class InternalTopologyBuilder {
         }
     }
 
-    private InternalTopicConfig createInternalTopicConfig(final StateStoreFactory factory,
-                                                          final String name) {
+    private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
+                                                           final String name) {
         if (!factory.isWindowStore()) {
             return new InternalTopicConfig(name,
                                            Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                                            factory.logConfig());
+        } else {
+            final InternalTopicConfig config = new InternalTopicConfig(name,
+                    Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
+                                InternalTopicConfig.CleanupPolicy.delete),
+                    factory.logConfig());
+            config.setRetentionMs(factory.retentionPeriod());
+            return config;
         }
-
-        final InternalTopicConfig config = new InternalTopicConfig(name,
-                                                                   Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
-                                                                           InternalTopicConfig.CleanupPolicy.delete),
-                                                                   factory.logConfig());
-        config.setRetentionMs(factory.retentionPeriod());
-        return config;
     }
 
     public synchronized Pattern earliestResetTopicsPattern() {
@@ -1248,8 +1260,9 @@ public class InternalTopologyBuilder {
         return topicPattern;
     }
 
-    public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
-                                                 final String logPrefix) {
+    // package-private for testing only
+    synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
+                                          final String logPrefix) {
         log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)",
                 logPrefix, subscriptionUpdates);
         this.subscriptionUpdates = subscriptionUpdates;
@@ -1667,9 +1680,9 @@ public class InternalTopologyBuilder {
         public Map<String, InternalTopicConfig> repartitionSourceTopics;
 
         TopicsInfo(final Set<String> sinkTopics,
-                          final Set<String> sourceTopics,
-                          final Map<String, InternalTopicConfig> repartitionSourceTopics,
-                          final Map<String, InternalTopicConfig> stateChangelogTopics) {
+                   final Set<String> sourceTopics,
+                   final Map<String, InternalTopicConfig> repartitionSourceTopics,
+                   final Map<String, InternalTopicConfig> stateChangelogTopics) {
             this.sinkTopics = sinkTopics;
             this.sourceTopics = sourceTopics;
             this.stateChangelogTopics = stateChangelogTopics;

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 8a29786..4291e34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -27,48 +27,109 @@ import java.util.Set;
 public class ProcessorTopology {
 
     private final List<ProcessorNode> processorNodes;
+    private final Map<String, SourceNode> sourcesByTopic;
+    private final Map<String, SinkNode> sinksByTopic;
     private final List<StateStore> stateStores;
     private final List<StateStore> globalStateStores;
-    private final Map<String, SourceNode> sourceByTopics;
-    private final Map<String, SinkNode> sinkByTopics;
     private final Map<String, String> storeToChangelogTopic;
+    private final Set<String> repartitionTopics;
+
+    public static ProcessorTopology with(final List<ProcessorNode> processorNodes,
+                                         final Map<String, SourceNode> sourcesByTopic,
+                                         final List<StateStore> stateStoresByName,
+                                         final Map<String, String> storeToChangelogTopic) {
+        return new ProcessorTopology(processorNodes,
+                sourcesByTopic,
+                Collections.<String, SinkNode>emptyMap(),
+                stateStoresByName,
+                Collections.<StateStore>emptyList(),
+                storeToChangelogTopic,
+                Collections.<String>emptySet());
+    }
+
+    static ProcessorTopology withSources(final List<ProcessorNode> processorNodes,
+                                         final Map<String, SourceNode> sourcesByTopic) {
+        return new ProcessorTopology(processorNodes,
+                sourcesByTopic,
+                Collections.<String, SinkNode>emptyMap(),
+                Collections.<StateStore>emptyList(),
+                Collections.<StateStore>emptyList(),
+                Collections.<String, String>emptyMap(),
+                Collections.<String>emptySet());
+    }
+
+    static ProcessorTopology withLocalStores(final List<StateStore> stateStores,
+                                             final Map<String, String> storeToChangelogTopic) {
+        return new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                Collections.<String, SourceNode>emptyMap(),
+                Collections.<String, SinkNode>emptyMap(),
+                stateStores,
+                Collections.<StateStore>emptyList(),
+                storeToChangelogTopic,
+                Collections.<String>emptySet());
+    }
+
+    static ProcessorTopology withGlobalStores(final List<StateStore> stateStores,
+                                              final Map<String, String> storeToChangelogTopic) {
+        return new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                Collections.<String, SourceNode>emptyMap(),
+                Collections.<String, SinkNode>emptyMap(),
+                Collections.<StateStore>emptyList(),
+                stateStores,
+                storeToChangelogTopic,
+                Collections.<String>emptySet());
+    }
+
+    static ProcessorTopology withRepartitionTopics(final List<ProcessorNode> processorNodes,
+                                                   final Map<String, SourceNode> sourcesByTopic,
+                                                   final Set<String> repartitionTopics) {
+        return new ProcessorTopology(processorNodes,
+                sourcesByTopic,
+                Collections.<String, SinkNode>emptyMap(),
+                Collections.<StateStore>emptyList(),
+                Collections.<StateStore>emptyList(),
+                Collections.<String, String>emptyMap(),
+                repartitionTopics);
+    }
 
     public ProcessorTopology(final List<ProcessorNode> processorNodes,
-                             final Map<String, SourceNode> sourceByTopics,
-                             final Map<String, SinkNode> sinkByTopics,
+                             final Map<String, SourceNode> sourcesByTopic,
+                             final Map<String, SinkNode> sinksByTopic,
                              final List<StateStore> stateStores,
-                             final Map<String, String> storeToChangelogTopic,
-                             final List<StateStore> globalStateStores) {
+                             final List<StateStore> globalStateStores,
+                             final Map<String, String> stateStoreToChangelogTopic,
+                             final Set<String> repartitionTopics) {
         this.processorNodes = Collections.unmodifiableList(processorNodes);
-        this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
-        this.sinkByTopics   = Collections.unmodifiableMap(sinkByTopics);
-        this.stateStores    = Collections.unmodifiableList(stateStores);
-        this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic);
+        this.sourcesByTopic = Collections.unmodifiableMap(sourcesByTopic);
+        this.sinksByTopic = Collections.unmodifiableMap(sinksByTopic);
+        this.stateStores = Collections.unmodifiableList(stateStores);
         this.globalStateStores = Collections.unmodifiableList(globalStateStores);
+        this.storeToChangelogTopic = Collections.unmodifiableMap(stateStoreToChangelogTopic);
+        this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
     }
 
     public Set<String> sourceTopics() {
-        return sourceByTopics.keySet();
+        return sourcesByTopic.keySet();
     }
 
     public SourceNode source(String topic) {
-        return sourceByTopics.get(topic);
+        return sourcesByTopic.get(topic);
     }
 
     public Set<SourceNode> sources() {
-        return new HashSet<>(sourceByTopics.values());
+        return new HashSet<>(sourcesByTopic.values());
     }
 
     public Set<String> sinkTopics() {
-        return sinkByTopics.keySet();
+        return sinksByTopic.keySet();
     }
 
     public SinkNode sink(String topic) {
-        return sinkByTopics.get(topic);
+        return sinksByTopic.get(topic);
     }
 
     public Set<SinkNode> sinks() {
-        return new HashSet<>(sinkByTopics.values());
+        return new HashSet<>(sinksByTopic.values());
     }
 
     public List<ProcessorNode> processors() {
@@ -79,12 +140,16 @@ public class ProcessorTopology {
         return stateStores;
     }
 
+    public List<StateStore> globalStateStores() {
+        return globalStateStores;
+    }
+
     public Map<String, String> storeToChangelogTopic() {
         return storeToChangelogTopic;
     }
 
-    public List<StateStore> globalStateStores() {
-        return globalStateStores;
+    boolean isRepartitionTopic(String topic) {
+        return repartitionTopics.contains(topic);
     }
 
     private String childrenToString(String indent, List<ProcessorNode<?, ?>> children) {
@@ -126,7 +191,7 @@ public class ProcessorTopology {
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (SourceNode<?, ?> source : sourceByTopics.values()) {
+        for (SourceNode<?, ?> source : sourcesByTopic.values()) {
             sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t", source.children()));
         }
         return sb.toString();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 36d5517..5460c0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -55,7 +55,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
 
     private final PartitionGroup partitionGroup;
-    private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
+    private final PartitionGroup.RecordInfo recordInfo;
     private final PunctuationQueue streamTimePunctuationQueue;
     private final PunctuationQueue systemTimePunctuationQueue;
 
@@ -90,7 +90,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     /**
      * Create {@link StreamTask} with its assigned partitions
      * @param id                    the ID of this task
-     * @param applicationId         the ID of the stream processing application
      * @param partitions            the collection of assigned {@link TopicPartition}
      * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
@@ -113,21 +112,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                       final Time time,
                       final Producer<byte[], byte[]> producer) {
         super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config);
+
+        this.time = time;
+        this.producer = producer;
+        this.metrics = new TaskMetrics(metrics);
+
+        recordCollector = createRecordCollector(logContext);
         streamTimePunctuationQueue = new PunctuationQueue();
         systemTimePunctuationQueue = new PunctuationQueue();
         maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-        this.metrics = new TaskMetrics(metrics);
+
+        // initialize the consumed and committed offset cache
+        consumedOffsets = new HashMap<>();
 
         // create queues for each assigned partition and associate them
         // to corresponding source nodes in the processor topology
         final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
 
-        // initialize the consumed offset cache
-        consumedOffsets = new HashMap<>();
-
-        this.producer = producer;
-        recordCollector = createRecordCollector(logContext);
-
         // initialize the topology with its own context
         processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
 
@@ -140,8 +141,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             partitionQueues.put(partition, queue);
         }
 
+        recordInfo = new PartitionGroup.RecordInfo();
         partitionGroup = new PartitionGroup(partitionQueues);
-        this.time = time;
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
         if (eosEnabled) {
@@ -364,6 +365,17 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
+    Map<TopicPartition, Long> purgableOffsets() {
+        final Map<TopicPartition, Long> purgableConsumedOffsets = new HashMap<>();
+        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            if (topology.isRepartitionTopic(tp.topic()))
+                purgableConsumedOffsets.put(tp, entry.getValue() + 1);
+        }
+
+        return purgableConsumedOffsets;
+    }
+
     private void initTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
@@ -554,8 +566,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     /**
      * Schedules a punctuation for the processor
      *
-     * @param interval  the interval in milliseconds
-     * @param type
+     * @param interval the interval in milliseconds
+     * @param type the punctuation type
      * @throws IllegalStateException if the current node is not null
      */
     public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
@@ -588,7 +600,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * Note, this is only called in the presence of new records
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    public boolean maybePunctuateStreamTime() {
+    boolean maybePunctuateStreamTime() {
         final long timestamp = partitionGroup.timestamp();
 
         // if the timestamp is not known yet, meaning there is not enough data accumulated
@@ -606,7 +618,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * Note, this is called irrespective of the presence of new records
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    public boolean maybePunctuateSystemTime() {
+    boolean maybePunctuateSystemTime() {
         final long timestamp = time.milliseconds();
 
         return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
@@ -622,7 +634,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     /**
      * Whether or not a request has been made to commit the current state
      */
-    public boolean commitNeeded() {
+    boolean commitNeeded() {
         return commitRequested;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 14b912e..d930a67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -650,6 +650,7 @@ public class StreamThread extends Thread {
                                                   activeTaskCreator,
                                                   standbyTaskCreator,
                                                   streamsKafkaClient,
+                                                  adminClient,
                                                   new AssignedStreamsTasks(logContext),
                                                   new AssignedStandbyTasks(logContext));
 
@@ -1001,6 +1002,9 @@ public class StreamThread extends Thread {
             int committed = taskManager.commitAll();
             if (committed > 0) {
                 streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, timerStartedMs);
+
+                // try to purge the committed records for repartition topics if possible
+                taskManager.maybePurgeCommitedRecords();
             }
             if (log.isDebugEnabled()) {
                 log.debug("Committed all active tasks {} and standby tasks {} in {}ms",

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 1eecb73..e6f05cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
@@ -57,11 +60,13 @@ class TaskManager {
     // TODO: this is going to be replaced by AdminClient
     final StreamsKafkaClient streamsKafkaClient;
 
+    private final AdminClient adminClient;
+    private DeleteRecordsResult deleteRecordsResult;
+
     // following information is updated during rebalance phase by the partition assignor
     private Cluster cluster;
     private Map<TaskId, Set<TopicPartition>> assignedActiveTasks;
     private Map<TaskId, Set<TopicPartition>> assignedStandbyTasks;
-    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
 
     private Consumer<byte[], byte[]> consumer;
 
@@ -73,6 +78,7 @@ class TaskManager {
                 final StreamThread.AbstractTaskCreator<StreamTask> taskCreator,
                 final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator,
                 final StreamsKafkaClient streamsKafkaClient,
+                final AdminClient adminClient,
                 final AssignedStreamsTasks active,
                 final AssignedStandbyTasks standby) {
         this.changelogReader = changelogReader;
@@ -90,6 +96,7 @@ class TaskManager {
         this.log = logContext.logger(getClass());
 
         this.streamsKafkaClient = streamsKafkaClient;
+        this.adminClient = adminClient;
     }
 
     /**
@@ -108,7 +115,7 @@ class TaskManager {
         addStreamTasks(assignment);
         addStandbyTasks();
         final Set<TopicPartition> partitions = active.uninitializedPartitions();
-        log.trace("pausing partitions: {}", partitions);
+        log.trace("Pausing partitions: {}", partitions);
         consumer.pause(partitions);
     }
 
@@ -326,7 +333,7 @@ class TaskManager {
         resumed.addAll(active.updateRestored(restored));
 
         if (!resumed.isEmpty()) {
-            log.trace("resuming partitions {}", resumed);
+            log.trace("Resuming partitions {}", resumed);
             consumer.resume(resumed);
         }
         if (active.allTasksRunning()) {
@@ -368,7 +375,6 @@ class TaskManager {
     }
 
     void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> partitionsByHostState) {
-        this.partitionsByHostState = partitionsByHostState;
         this.streamsMetadataState.onChange(partitionsByHostState, cluster);
     }
 
@@ -433,6 +439,26 @@ class TaskManager {
         return active.maybeCommit();
     }
 
+    void maybePurgeCommitedRecords() {
+        // we do not check any possible exceptions since none of them are fatal
+        // that should cause the application to fail, and we will try delete with
+        // newer offsets anyways.
+        if (deleteRecordsResult == null || deleteRecordsResult.all().isDone()) {
+
+            if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
+                log.debug("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks());
+            }
+
+            final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+            for (final Map.Entry<TopicPartition, Long> entry : active.recordsToDelete().entrySet()) {
+                recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
+            }
+            deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
+
+            log.trace("Sent delete-records request: {}", recordsToDelete);
+        }
+    }
+
     public String toString(final String indent) {
         final StringBuilder builder = new StringBuilder();
         builder.append(indent).append("\tActive tasks:\n");

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 39b1443..3ba7803 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockStateStore;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -264,7 +264,7 @@ public class TopologyTest {
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         final StreamsConfig streamsConfig = new StreamsConfig(config);
         mockStoreBuilder();
-        EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStoreSupplier.MockStateStore("store", false));
+        EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStore("store", false));
         EasyMock.replay(storeBuilder);
         topology
             .addSource(sourceNodeName, "topic")

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 57e2121..46c23c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockStateStore;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -42,7 +42,7 @@ public class AbstractProcessorContextTest {
 
     private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics());
     private final AbstractProcessorContext context = new TestProcessorContext(metrics);
-    private final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("store", false);
+    private final MockStateStore stateStore = new MockStateStore("store", false);
     private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo");
 
     @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index ee366fa..e75b54f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -105,6 +105,7 @@ public class AbstractTaskTest {
         EasyMock.verify(stateDirectory);
     }
 
+    @SuppressWarnings("unchecked")
     private AbstractTask createTask(final Consumer consumer, final List<StateStore> stateStores) {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
@@ -112,14 +113,9 @@ public class AbstractTaskTest {
         final StreamsConfig config = new StreamsConfig(properties);
         return new AbstractTask(id,
                                 Collections.singletonList(new TopicPartition("t", 0)),
-                                new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
-                                                      Collections.<String, SourceNode>emptyMap(),
-                                                      Collections.<String, SinkNode>emptyMap(),
-                                                      stateStores,
-                                                      Collections.<String, String>emptyMap(),
-                                                      Collections.<StateStore>emptyList()),
-                                consumer,
-                                new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
+                                ProcessorTopology.withLocalStores(stateStores, Collections.<String, String>emptyMap()),
+                                (Consumer<byte[], byte[]>) consumer,
+                                new StoreChangelogReader((Consumer<byte[], byte[]>) consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
                                 false,
                                 stateDirectory,
                                 config) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 0604a00..20cf125 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -45,7 +45,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -86,17 +85,12 @@ public class GlobalStateManagerImplTest {
     @Before
     public void before() throws IOException {
         final Map<String, String> storeToTopic = new HashMap<>();
+        store1 = new NoOpReadOnlyStore<>("t1-store");
+        store2 = new NoOpReadOnlyStore("t2-store");
         storeToTopic.put("t1-store", "t1");
         storeToTopic.put("t2-store", "t2");
 
-        store1 = new NoOpReadOnlyStore<>("t1-store");
-        store2 = new NoOpReadOnlyStore("t2-store");
-        topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
-                                         Collections.<String, SourceNode>emptyMap(),
-                                         Collections.<String, SinkNode>emptyMap(),
-                                         Collections.<StateStore>emptyList(),
-                                         storeToTopic,
-                                         Arrays.<StateStore>asList(store1, store2));
+        topology = ProcessorTopology.withGlobalStores(Utils.<StateStore>mkList(store1, store2), storeToTopic);
 
         context = new NoOpProcessorContext();
         config = new StreamsConfig(new Properties() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 63783a2..38a017d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -37,10 +37,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -53,44 +51,41 @@ import static org.junit.Assert.fail;
 public class GlobalStateTaskTest {
 
     private final LogContext logContext = new LogContext();
-    private Map<TopicPartition, Long> offsets;
-    private GlobalStateUpdateTask globalStateTask;
-    private GlobalStateManagerStub stateMgr;
-    private List<ProcessorNode> processorNodes;
-    private NoOpProcessorContext context;
-    private TopicPartition t1;
-    private TopicPartition t2;
-    private MockSourceNode sourceOne;
-    private MockSourceNode sourceTwo;
+
+    private final String topic1 = "t1";
+    private final String topic2 = "t2";
+    private final TopicPartition t1 = new TopicPartition(topic1, 1);
+    private final TopicPartition t2 = new TopicPartition(topic2, 1);
+    private final MockSourceNode sourceOne = new MockSourceNode<>(new String[]{topic1},
+                                                            new StringDeserializer(),
+                                                            new StringDeserializer());;
+    private final MockSourceNode sourceTwo = new MockSourceNode<>(new String[]{topic2},
+                                                            new IntegerDeserializer(),
+                                                            new IntegerDeserializer());
+    private final MockProcessorNode processorOne = new MockProcessorNode<>(-1);
+    private final MockProcessorNode processorTwo = new MockProcessorNode<>(-1);
+
+    private final Map<TopicPartition, Long> offsets = new HashMap<>();
+    private final NoOpProcessorContext context = new NoOpProcessorContext();
+
     private ProcessorTopology topology;
+    private GlobalStateManagerStub stateMgr;
+    private GlobalStateUpdateTask globalStateTask;
 
     @Before
     public void before() {
-        sourceOne = new MockSourceNode<>(new String[]{"t1"},
-                                         new StringDeserializer(),
-                                         new StringDeserializer());
-        sourceTwo = new MockSourceNode<>(new String[]{"t2"},
-                                         new IntegerDeserializer(),
-                                         new IntegerDeserializer());
-        processorNodes = Arrays.asList(sourceOne, sourceTwo, new MockProcessorNode<>(-1), new MockProcessorNode<>(-1));
         final Set<String> storeNames = Utils.mkSet("t1-store", "t2-store");
         final Map<String, SourceNode> sourceByTopics = new HashMap<>();
-        sourceByTopics.put("t1", sourceOne);
-        sourceByTopics.put("t2", sourceTwo);
+        sourceByTopics.put(topic1, sourceOne);
+        sourceByTopics.put(topic2, sourceTwo);
         final Map<String, String> storeToTopic = new HashMap<>();
-        storeToTopic.put("t1-store", "t1");
-        storeToTopic.put("t2-store", "t2");
-        topology = new ProcessorTopology(processorNodes,
-                                         sourceByTopics,
-                                         Collections.<String, SinkNode>emptyMap(),
-                                         Collections.<StateStore>emptyList(),
-                                         storeToTopic,
-                                         Collections.<StateStore>emptyList());
-        context = new NoOpProcessorContext();
-
-        t1 = new TopicPartition("t1", 1);
-        t2 = new TopicPartition("t2", 1);
-        offsets = new HashMap<>();
+        storeToTopic.put("t1-store", topic1);
+        storeToTopic.put("t2-store", topic2);
+        topology = ProcessorTopology.with(Utils.mkList(sourceOne, sourceTwo, processorOne, processorTwo),
+                                          sourceByTopics,
+                                          Collections.<StateStore>emptyList(),
+                                          storeToTopic);
+
         offsets.put(t1, 50L);
         offsets.put(t2, 100L);
         stateMgr = new GlobalStateManagerStub(storeNames, offsets);
@@ -113,19 +108,16 @@ public class GlobalStateTaskTest {
     @Test
     public void shouldInitializeProcessorTopology() {
         globalStateTask.initialize();
-        for (ProcessorNode processorNode : processorNodes) {
-            if (processorNode instanceof  MockProcessorNode) {
-                assertTrue(((MockProcessorNode) processorNode).initialized);
-            } else {
-                assertTrue(((MockSourceNode) processorNode).initialized);
-            }
-        }
+        assertTrue(sourceOne.initialized);
+        assertTrue(sourceTwo.initialized);
+        assertTrue(processorOne.initialized);
+        assertTrue(processorTwo.initialized);
     }
 
     @Test
     public void shouldProcessRecordsForTopic() {
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>("t1", 1, 1, "foo".getBytes(), "bar".getBytes()));
+        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));
         assertEquals(1, sourceOne.numReceived);
         assertEquals(0, sourceTwo.numReceived);
     }
@@ -134,7 +126,7 @@ public class GlobalStateTaskTest {
     public void shouldProcessRecordsForOtherTopic() {
         final byte[] integerBytes = new IntegerSerializer().serialize("foo", 1);
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>("t2", 1, 1, integerBytes, integerBytes));
+        globalStateTask.update(new ConsumerRecord<>(topic2, 1, 1, integerBytes, integerBytes));
         assertEquals(1, sourceTwo.numReceived);
         assertEquals(0, sourceOne.numReceived);
     }
@@ -143,7 +135,7 @@ public class GlobalStateTaskTest {
                                   final byte[] key,
                                   final byte[] recordValue,
                                   boolean failExpected) {
-        final ConsumerRecord record = new ConsumerRecord<>("t2", 1, 1,
+        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(topic2, 1, 1,
                 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
                 key, recordValue);
         globalStateTask.initialize();
@@ -162,16 +154,16 @@ public class GlobalStateTaskTest {
 
     @Test
     public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() throws Exception {
-        final byte[] key = new LongSerializer().serialize("t2", 1L);
-        final byte[] recordValue = new IntegerSerializer().serialize("t2", 10);
+        final byte[] key = new LongSerializer().serialize(topic2, 1L);
+        final byte[] recordValue = new IntegerSerializer().serialize(topic2, 10);
         maybeDeserialize(globalStateTask, key, recordValue, true);
     }
 
 
     @Test
     public void shouldThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
-        final byte[] key = new IntegerSerializer().serialize("t2", 1);
-        final byte[] recordValue = new LongSerializer().serialize("t2", 10L);
+        final byte[] key = new IntegerSerializer().serialize(topic2, 1);
+        final byte[] recordValue = new LongSerializer().serialize(topic2, 10L);
         maybeDeserialize(globalStateTask, key, recordValue, true);
     }
 
@@ -183,8 +175,8 @@ public class GlobalStateTaskTest {
             stateMgr,
             new LogAndContinueExceptionHandler(),
             logContext);
-        final byte[] key = new LongSerializer().serialize("t2", 1L);
-        final byte[] recordValue = new IntegerSerializer().serialize("t2", 10);
+        final byte[] key = new LongSerializer().serialize(topic2, 1L);
+        final byte[] recordValue = new IntegerSerializer().serialize(topic2, 10);
 
         maybeDeserialize(globalStateTask2, key, recordValue, false);
     }
@@ -197,8 +189,8 @@ public class GlobalStateTaskTest {
             stateMgr,
             new LogAndContinueExceptionHandler(),
             logContext);
-        final byte[] key = new IntegerSerializer().serialize("t2", 1);
-        final byte[] recordValue = new LongSerializer().serialize("t2", 10L);
+        final byte[] key = new IntegerSerializer().serialize(topic2, 1);
+        final byte[] recordValue = new LongSerializer().serialize(topic2, 10L);
 
         maybeDeserialize(globalStateTask2, key, recordValue, false);
     }
@@ -210,7 +202,7 @@ public class GlobalStateTaskTest {
         expectedOffsets.put(t1, 52L);
         expectedOffsets.put(t2, 100L);
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>("t1", 1, 51, "foo".getBytes(), "foo".getBytes()));
+        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 51, "foo".getBytes(), "foo".getBytes()));
         globalStateTask.close();
         assertEquals(expectedOffsets, stateMgr.checkpointed());
         assertTrue(stateMgr.closed);
@@ -222,7 +214,7 @@ public class GlobalStateTaskTest {
         expectedOffsets.put(t1, 102L);
         expectedOffsets.put(t2, 100L);
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>("t1", 1, 101, "foo".getBytes(), "foo".getBytes()));
+        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 101, "foo".getBytes(), "foo".getBytes()));
         globalStateTask.flushState();
         assertThat(stateMgr.checkpointed(), equalTo(expectedOffsets));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 1cb0cd4..6ed2245 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockBatchingStateRestoreListener;
-import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockStateStore;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -63,15 +63,15 @@ public class ProcessorStateManagerTest {
     private final String nonPersistentStoreName = "nonPersistentStore";
     private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
     private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
-    private final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-    private final MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+    private final MockStateStore persistentStore = new MockStateStore(persistentStoreName, true);
+    private final MockStateStore nonPersistentStore = new MockStateStore(nonPersistentStoreName, false);
     private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1);
     private final String storeName = "mockStateStore";
     private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
     private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0);
     private final TaskId taskId = new TaskId(0, 1);
     private final MockChangelogReader changelogReader = new MockChangelogReader();
-    private final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(storeName, true);
+    private final MockStateStore mockStateStore = new MockStateStore(storeName, true);
     private final byte[] key = new byte[]{0x0, 0x0, 0x0, 0x1};
     private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8"));
     private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(changelogTopic, 0, 0, key, value);
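
For reference, the changelog topic names built above follow the fixed convention of application ID, store name, and a "-changelog" suffix; storeChangelogTopic is essentially:

    // Equivalent to ProcessorStateManager.storeChangelogTopic(applicationId, storeName)
    public static String storeChangelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }
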
@@ -109,7 +109,7 @@ public class ProcessorStateManagerTest {
 
         final KeyValue<byte[], byte[]> expectedKeyValue = KeyValue.pair(key, value);
 
-        final MockStateStoreSupplier.MockStateStore persistentStore = getPersistentStore();
+        final MockStateStore persistentStore = getPersistentStore();
         final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
 
         try {
@@ -127,7 +127,7 @@ public class ProcessorStateManagerTest {
         final TaskId taskId = new TaskId(0, 2);
         final Integer intKey = 1;
 
-        final MockStateStoreSupplier.MockStateStore persistentStore = getPersistentStore();
+        final MockStateStore persistentStore = getPersistentStore();
         final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
 
         try {
@@ -144,7 +144,7 @@ public class ProcessorStateManagerTest {
     public void testRegisterPersistentStore() throws IOException {
         final TaskId taskId = new TaskId(0, 2);
 
-        final MockStateStoreSupplier.MockStateStore persistentStore = getPersistentStore();
+        final MockStateStore persistentStore = getPersistentStore();
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -170,8 +170,8 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testRegisterNonPersistentStore() throws IOException {
-        final MockStateStoreSupplier.MockStateStore nonPersistentStore
-            = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
+        final MockStateStore nonPersistentStore
+            = new MockStateStore(nonPersistentStoreName, false); // non-persistent store
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             new TaskId(0, 2),
             noPartitions,
@@ -219,9 +219,9 @@ public class ProcessorStateManagerTest {
         final TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
         final TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
 
-        final MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
-        final MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
-        final MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
+        final MockStateStore store1 = new MockStateStore(storeName1, true);
+        final MockStateStore store2 = new MockStateStore(storeName2, true);
+        final MockStateStore store3 = new MockStateStore(storeName3, true);
 
         // if there is a source partition, inherit the partition id
         final Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
@@ -258,7 +258,7 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testGetStore() throws IOException {
-        final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
+        final MockStateStore mockStateStore = new MockStateStore(nonPersistentStoreName, false);
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             new TaskId(0, 1),
             noPartitions,
@@ -347,7 +347,7 @@ public class ProcessorStateManagerTest {
         final Map<TopicPartition, Long> offsets = Collections.singletonMap(persistentStorePartition, 99L);
         checkpoint.write(offsets);
 
-        final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
+        final MockStateStore persistentStore = new MockStateStore(persistentStoreName, true);
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -465,7 +465,7 @@ public class ProcessorStateManagerTest {
             logContext);
 
         try {
-            stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null);
+            stateManager.register(new MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null);
             fail("should have thrown illegal argument exception when store name same as checkpoint file");
         } catch (final IllegalArgumentException e) {
             //pass
@@ -508,7 +508,7 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
 
-        final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) {
+        final MockStateStore stateStore = new MockStateStore(storeName, true) {
             @Override
             public void flush() {
                 throw new RuntimeException("KABOOM!");
@@ -537,7 +537,7 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
 
-        final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) {
+        final MockStateStore stateStore = new MockStateStore(storeName, true) {
             @Override
             public void close() {
                 throw new RuntimeException("KABOOM!");
@@ -567,13 +567,13 @@ public class ProcessorStateManagerTest {
 
         final AtomicBoolean flushedStore = new AtomicBoolean(false);
 
-        final MockStateStoreSupplier.MockStateStore stateStore1 = new MockStateStoreSupplier.MockStateStore(storeName, true) {
+        final MockStateStore stateStore1 = new MockStateStore(storeName, true) {
             @Override
             public void flush() {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        final MockStateStoreSupplier.MockStateStore stateStore2 = new MockStateStoreSupplier.MockStateStore(storeName + "2", true) {
+        final MockStateStore stateStore2 = new MockStateStore(storeName + "2", true) {
             @Override
             public void flush() {
                 flushedStore.set(true);
@@ -602,13 +602,13 @@ public class ProcessorStateManagerTest {
 
         final AtomicBoolean closedStore = new AtomicBoolean(false);
 
-        final MockStateStoreSupplier.MockStateStore stateStore1 = new MockStateStoreSupplier.MockStateStore(storeName, true) {
+        final MockStateStore stateStore1 = new MockStateStore(storeName, true) {
             @Override
             public void close() {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        final MockStateStoreSupplier.MockStateStore stateStore2 = new MockStateStoreSupplier.MockStateStore(storeName + "2", true) {
+        final MockStateStore stateStore2 = new MockStateStore(storeName + "2", true) {
             @Override
             public void close() {
                 closedStore.set(true);
@@ -664,8 +664,8 @@ public class ProcessorStateManagerTest {
             logContext);
     }
 
-    private MockStateStoreSupplier.MockStateStore getPersistentStore() {
-        return new MockStateStoreSupplier.MockStateStore("persistentStore", true);
+    private MockStateStore getPersistentStore() {
+        return new MockStateStore("persistentStore", true);
     }
 
 }
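
Per the commit notes, MockStateStore was extracted out of the deprecated MockStateStoreSupplier into its own top-level class. A minimal sketch of the shape these tests rely on (a name/persistent constructor, a recorded keys list, and overridable flush()/close()); the actual class in org.apache.kafka.test may differ in detail:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateStore;

    public class MockStateStore implements StateStore {
        public boolean initialized = false;
        public boolean flushed = false;
        public boolean closed = false;
        public final List<Integer> keys = new ArrayList<>();  // keys applied during restore

        private final String name;
        private final boolean persistent;

        public MockStateStore(final String name, final boolean persistent) {
            this.name = name;
            this.persistent = persistent;
        }

        @Override public String name() { return name; }
        @Override public boolean persistent() { return persistent; }
        @Override public boolean isOpen() { return !closed; }

        @Override
        public void init(final ProcessorContext context, final StateStore root) {
            initialized = true;
            closed = false;
        }

        @Override public void flush() { flushed = true; }  // tests above override this to throw
        @Override public void close() { closed = true; }   // tests above override this to throw
    }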

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8538567..ce655bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockRestoreConsumer;
 import org.apache.kafka.test.MockStateRestoreListener;
+import org.apache.kafka.test.MockStateStore;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
@@ -80,43 +81,31 @@ public class StandbyTaskTest {
     private final String storeName2 = "store2";
     private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
     private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
+    private final String globalStoreName = "ktable1";
 
     private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1);
     private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1);
     private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
 
     private final Set<TopicPartition> topicPartitions = Collections.emptySet();
-    private final ProcessorTopology topology = new ProcessorTopology(
-            Collections.<ProcessorNode>emptyList(),
-            Collections.<String, SourceNode>emptyMap(),
-            Collections.<String, SinkNode>emptyMap(),
-            Utils.mkList(
-                    new MockStateStoreSupplier(storeName1, false).get(),
-                    new MockStateStoreSupplier(storeName2, true).get()
-            ),
+    private final ProcessorTopology topology = ProcessorTopology.withLocalStores(
+            Utils.mkList(new MockStateStoreSupplier(storeName1, false).get(), new MockStateStoreSupplier(storeName2, true).get()),
             new HashMap<String, String>() {
                 {
                     put(storeName1, storeChangelogTopicName1);
                     put(storeName2, storeChangelogTopicName2);
                 }
-            },
-            Collections.<StateStore>emptyList());
-
-    private final TopicPartition ktable = new TopicPartition("ktable1", 0);
-    private final Set<TopicPartition> ktablePartitions = Utils.mkSet(ktable);
-    private final ProcessorTopology ktableTopology = new ProcessorTopology(
-            Collections.<ProcessorNode>emptyList(),
-            Collections.<String, SourceNode>emptyMap(),
-            Collections.<String, SinkNode>emptyMap(),
-            Utils.mkList(
-                    new MockStateStoreSupplier(ktable.topic(), true, false).get()
-            ),
+            });
+    private final TopicPartition globalTopicPartition = new TopicPartition(globalStoreName, 0);
+    private final Set<TopicPartition> ktablePartitions = Utils.mkSet(globalTopicPartition);
+    private final ProcessorTopology ktableTopology = ProcessorTopology.withLocalStores(
+            Collections.<StateStore>singletonList(new MockStateStoreSupplier(globalTopicPartition.topic(), true, false).get()),
             new HashMap<String, String>() {
                 {
-                    put("ktable1", ktable.topic());
+                    put(globalStoreName, globalTopicPartition.topic());
                 }
-            },
-            Collections.<StateStore>emptyList());
+            });
+
     private File baseDir;
     private StateDirectory stateDirectory;
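
Both topologies above now go through the withLocalStores convenience factory added to ProcessorTopology in this commit instead of the full constructor. Inferred from the removed lines, it plausibly fills every collection except the local stores and their changelog mapping with empties (the exact parameter list may differ, since this commit also threads repartition topic information through ProcessorTopology):

    public static ProcessorTopology withLocalStores(final List<StateStore> stateStores,
                                                    final Map<String, String> storeToChangelogTopic) {
        // Only the local stores and their changelog mapping matter for these tests.
        return new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
                                     Collections.<String, SourceNode>emptyMap(),
                                     Collections.<String, SinkNode>emptyMap(),
                                     stateStores,
                                     storeToChangelogTopic,
                                     Collections.<StateStore>emptyList());
    }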
 
@@ -203,10 +192,8 @@ public class StandbyTaskTest {
         task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
 
         StandbyContextImpl context = (StandbyContextImpl) task.context();
-        MockStateStoreSupplier.MockStateStore store1 =
-                (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName1);
-        MockStateStoreSupplier.MockStateStore store2 =
-                (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(storeName2);
+        MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
+        MockStateStore store2 = (MockStateStore) context.getStateMgr().getStore(storeName2);
 
         assertEquals(Collections.emptyList(), store1.keys);
         assertEquals(Utils.mkList(1, 2, 3), store2.keys);
@@ -223,15 +210,15 @@ public class StandbyTaskTest {
 
     @Test
     public void testUpdateKTable() throws IOException {
-        consumer.assign(Utils.mkList(ktable));
+        consumer.assign(Utils.mkList(globalTopicPartition));
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(0L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(0L));
         consumer.commitSync(committedOffsets);
 
-        restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
-                new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo("ktable1", 1, Node.noNode(), new Node[0], new Node[0]),
-                new PartitionInfo("ktable1", 2, Node.noNode(), new Node[0], new Node[0])
+        restoreStateConsumer.updatePartitions(globalStoreName, Utils.mkList(
+                new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo(globalStoreName, 1, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo(globalStoreName, 2, Node.noNode(), new Node[0], new Node[0])
         ));
 
         StreamsConfig config = createConfig(baseDir);
@@ -240,11 +227,11 @@ public class StandbyTaskTest {
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
-                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
-                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
-                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100),
-                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 4, 100),
-                new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 5, 100))) {
+                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
+                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
+                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100),
+                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 4, 100),
+                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 5, 100))) {
             restoreStateConsumer.bufferRecord(record);
         }
 
@@ -259,47 +246,47 @@ public class StandbyTaskTest {
         }
 
         // The committed offset is at 0L, so no records should be processed.
-        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(ktable, restoreStateConsumer.poll(100).records(ktable));
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
         assertEquals(5, remaining.size());
 
-        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(10L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
         consumer.commitSync(committedOffsets);
         task.commit(); // update offset limits
 
         // The committed offset has not been reached yet.
-        remaining = task.update(ktable, remaining);
+        remaining = task.update(globalTopicPartition, remaining);
         assertEquals(5, remaining.size());
 
-        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(11L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(11L));
         consumer.commitSync(committedOffsets);
         task.commit(); // update offset limits
 
         // One record should be processed.
-        remaining = task.update(ktable, remaining);
+        remaining = task.update(globalTopicPartition, remaining);
         assertEquals(4, remaining.size());
 
-        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(45L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(45L));
         consumer.commitSync(committedOffsets);
         task.commit(); // update offset limits
 
         // The committed offset is now 45. All records except the last one should be processed.
-        remaining = task.update(ktable, remaining);
+        remaining = task.update(globalTopicPartition, remaining);
         assertEquals(1, remaining.size());
 
-        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(50L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(50L));
         consumer.commitSync(committedOffsets);
         task.commit(); // update offset limits
 
         // The committed offset is now 50; the last record still remains.
-        remaining = task.update(ktable, remaining);
+        remaining = task.update(globalTopicPartition, remaining);
         assertEquals(1, remaining.size());
 
-        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(60L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(60L));
         consumer.commitSync(committedOffsets);
         task.commit(); // update offset limits
 
         // The committed offset is now 60. No records should be left.
-        remaining = task.update(ktable, remaining);
+        remaining = task.update(globalTopicPartition, remaining);
         assertNull(remaining);
 
         task.closeStateManager(true);
@@ -309,7 +296,7 @@ public class StandbyTaskTest {
         Map<TopicPartition, Long> offsets = checkpoint.read();
 
         assertEquals(1, offsets.size());
-        assertEquals(new Long(51L), offsets.get(ktable));
+        assertEquals(new Long(51L), offsets.get(globalTopicPartition));
 
     }
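
testUpdateKTable above exercises the standby task's offset-limit gating: a record is applied to the store only once the consumer group's committed offset has moved past it, and anything at or beyond the limit is handed back as "remaining" (null once nothing is left). Note the checkpoint lands at 51L, the next offset to read after applying the record at offset 50. A minimal sketch of that gating, not the actual StandbyTask code; restoreRecord() is a hypothetical stand-in for applying a record to the local store:

    private List<ConsumerRecord<byte[], byte[]>> applyUpToLimit(final List<ConsumerRecord<byte[], byte[]>> records,
                                                                final long offsetLimit) {
        final List<ConsumerRecord<byte[], byte[]>> remaining = new ArrayList<>();
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            if (record.offset() < offsetLimit) {
                restoreRecord(record);     // hypothetical: apply key/value to the local store
            } else {
                remaining.add(record);     // not yet committed upstream; keep for a later pass
            }
        }
        return remaining.isEmpty() ? null : remaining;  // matches the assertNull(...) above
    }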
 
@@ -337,13 +324,13 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
-        consumer.assign(Utils.mkList(ktable));
+        consumer.assign(Utils.mkList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(100L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
-        restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
-                new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0])));
+        restoreStateConsumer.updatePartitions(globalStoreName, Utils.mkList(
+                new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0])));
 
         final TaskId taskId = new TaskId(0, 0);
         final MockTime time = new MockTime();
@@ -363,8 +350,8 @@ public class StandbyTaskTest {
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
-        task.update(ktable, Collections.singletonList(new ConsumerRecord<>(ktable.topic(),
-                                                                           ktable.partition(),
+        task.update(globalTopicPartition, Collections.singletonList(new ConsumerRecord<>(globalTopicPartition.topic(),
+                                                                           globalTopicPartition.partition(),
                                                                            50L,
                                                                            serializedValue,
                                                                            serializedValue)));
@@ -374,19 +361,19 @@ public class StandbyTaskTest {
 
         final Map<TopicPartition, Long> checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId),
                                                                                    ProcessorStateManager.CHECKPOINT_FILE_NAME)).read();
-        assertThat(checkpoint, equalTo(Collections.singletonMap(ktable, 51L)));
+        assertThat(checkpoint, equalTo(Collections.singletonMap(globalTopicPartition, 51L)));
 
     }
 
     @Test
     public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
-        consumer.assign(Utils.mkList(ktable));
+        consumer.assign(Utils.mkList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(100L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
-        restoreStateConsumer.updatePartitions("ktable1", Utils.mkList(
-                new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0])));
+        restoreStateConsumer.updatePartitions(globalStoreName, Utils.mkList(
+                new PartitionInfo(globalStoreName, 0, Node.noNode(), new Node[0], new Node[0])));
 
         final StreamsConfig config = createConfig(baseDir);
         final AtomicBoolean closedStateManager = new AtomicBoolean(false);

