kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-4593; Don't throw IllegalStateException and die on task migration
Date Fri, 29 Sep 2017 09:03:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 177dd7f21 -> eaabb6cd0


KAFKA-4593; Don't throw IllegalStateException and die on task migration

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3948 from mjsax/kafka-4593-illegal-state-exception-in-restore


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eaabb6cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eaabb6cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eaabb6cd

Branch: refs/heads/trunk
Commit: eaabb6cd0173c4f6854eb5da39194a7e3fc0162c
Parents: 177dd7f
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Fri Sep 29 10:00:13 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Sep 29 10:00:13 2017 +0100

----------------------------------------------------------------------
 .../streams/errors/TaskMigratedException.java   |  52 +++++++
 .../processor/internals/AssignedTasks.java      |  89 +++++++++---
 .../processor/internals/ChangelogReader.java    |   2 +-
 .../processor/internals/PunctuationQueue.java   |   6 +-
 .../processor/internals/RestoringTasks.java     |  23 +++
 .../internals/StoreChangelogReader.java         |  43 +++---
 .../streams/processor/internals/StreamTask.java | 111 ++++++++++-----
 .../processor/internals/StreamThread.java       |  36 ++++-
 .../processor/internals/TaskManager.java        |  33 ++++-
 .../processor/internals/AssignedTasksTest.java  | 140 ++++++++++++-------
 .../internals/MockChangelogReader.java          |  53 +++++++
 .../internals/ProcessorStateManagerTest.java    |   1 -
 .../internals/StoreChangelogReaderTest.java     |  87 +++++++++---
 .../processor/internals/StreamThreadTest.java   |  16 ++-
 .../processor/internals/TaskManagerTest.java    |   4 +-
 .../apache/kafka/test/MockChangelogReader.java  |  55 --------
 16 files changed, 530 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
new file mode 100644
index 0000000..f2fa594
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.internals.Task;
+
+/**
+ * Indicates that a task got migrated to another thread.
+ * Thus, the task raising this exception can be cleaned up and closed as "zombie".
+ */
+public class TaskMigratedException extends StreamsException {
+
+    private final static long serialVersionUID = 1L;
+
+    public TaskMigratedException(final Task task) {
+        this(task, null);
+    }
+
+    public TaskMigratedException(final Task task,
+                                 final TopicPartition topicPartition,
+                                 final long endOffset,
+                                 final long pos) {
+        super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d%n%s",
+                            topicPartition,
+                            endOffset,
+                            pos,
+                            task.toString("> ")),
+            null);
+    }
+
+    public TaskMigratedException(final Task task,
+                                 final Throwable throwable) {
+        super(task.toString(), throwable);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index fcb717d..4448a78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -16,13 +16,12 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
@@ -38,7 +37,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
-class AssignedTasks {
+class AssignedTasks implements RestoringTasks {
     private final Logger log;
     private final String taskTypeName;
     private final TaskAction maybeCommitAction;
@@ -51,6 +50,7 @@ class AssignedTasks {
     // IQ may access this map.
     private Map<TaskId, Task> running = new ConcurrentHashMap<>();
     private Map<TopicPartition, Task> runningByPartition = new HashMap<>();
+    private Map<TopicPartition, Task> restoringByPartition = new HashMap<>();
     private int committed = 0;
 
 
@@ -121,7 +121,7 @@ class AssignedTasks {
             try {
                 if (!entry.getValue().initialize()) {
                     log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey());
-                    restoring.put(entry.getKey(), entry.getValue());
+                    addToRestoring(entry.getValue());
                 } else {
                     transitionToRunning(entry.getValue());
                 }
@@ -188,6 +188,7 @@ class AssignedTasks {
         restoring.clear();
         created.clear();
         runningByPartition.clear();
+        restoringByPartition.clear();
         return firstException.get();
     }
 
@@ -213,11 +214,8 @@ class AssignedTasks {
             try {
                 task.suspend();
                 suspended.put(task.id(), task);
-            } catch (final CommitFailedException e) {
-                suspended.put(task.id(), task);
-                // commit failed during suspension. Just log it.
-                log.warn("Failed to commit {} {} state when suspending due to CommitFailedException", taskTypeName, task.id());
-            } catch (final ProducerFencedException e) {
+            } catch (final TaskMigratedException closeAsZombieAndSwallow) {
+                // as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
                 closeZombieTask(task);
                 it.remove();
             } catch (final RuntimeException e) {
@@ -236,11 +234,11 @@ class AssignedTasks {
     }
 
     private void closeZombieTask(final Task task) {
-        log.warn("Producer of task {} fenced; closing zombie task", task.id());
+        log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
         try {
             task.close(false, true);
         } catch (final Exception e) {
-            log.warn("{} Failed to close zombie due to {}, ignore and proceed", taskTypeName, e);
+            log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", taskTypeName, task.id(), e.getMessage());
         }
     }
 
@@ -248,13 +246,22 @@ class AssignedTasks {
         return !running.isEmpty();
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
         if (suspended.containsKey(taskId)) {
             final Task task = suspended.get(taskId);
             log.trace("found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
-                task.resume();
+                try {
+                    task.resume();
+                } catch (final TaskMigratedException e) {
+                    closeZombieTask(task);
+                    suspended.remove(taskId);
+                    throw e;
+                }
                 transitionToRunning(task);
                 log.trace("resuming suspended {} {}", taskTypeName, task.id());
                 return true;
@@ -265,6 +272,16 @@ class AssignedTasks {
         return false;
     }
 
+    private void addToRestoring(final Task task) {
+        restoring.put(task.id(), task);
+        for (TopicPartition topicPartition : task.partitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+        for (TopicPartition topicPartition : task.changelogPartitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+    }
+
     private void transitionToRunning(final Task task) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
@@ -276,6 +293,11 @@ class AssignedTasks {
         }
     }
 
+    @Override
+    public Task restoringTaskFor(final TopicPartition partition) {
+        return restoringByPartition.get(partition);
+    }
+
     Task runningTaskFor(final TopicPartition partition) {
         return runningByPartition.get(partition);
     }
@@ -332,6 +354,7 @@ class AssignedTasks {
 
     void clear() {
         runningByPartition.clear();
+        restoringByPartition.clear();
         running.clear();
         created.clear();
         suspended.clear();
@@ -342,25 +365,42 @@ class AssignedTasks {
         return previousActiveTasks;
     }
 
+    /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     */
     int commit() {
         applyToRunningTasks(commitAction);
         return running.size();
     }
 
+    /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     */
     int maybeCommit() {
         committed = 0;
         applyToRunningTasks(maybeCommitAction);
         return committed;
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     int process() {
         int processed = 0;
-        for (final Task task : running.values()) {
+        final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
+        while (it.hasNext()) {
+            final Task task = it.next().getValue();
             try {
                 if (task.process()) {
                     processed++;
                 }
-            } catch (RuntimeException e) {
+            } catch (final TaskMigratedException e) {
+                closeZombieTask(task);
+                it.remove();
+                throw e;
+            } catch (final RuntimeException e) {
                 log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e);
                 throw e;
             }
@@ -368,9 +408,14 @@ class AssignedTasks {
         return processed;
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     int punctuate() {
         int punctuated = 0;
-        for (Task task : running.values()) {
+        final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
+        while (it.hasNext()) {
+            final Task task = it.next().getValue();
             try {
                 if (task.maybePunctuateStreamTime()) {
                     punctuated++;
@@ -378,7 +423,11 @@ class AssignedTasks {
                 if (task.maybePunctuateSystemTime()) {
                     punctuated++;
                 }
-            } catch (KafkaException e) {
+            } catch (final TaskMigratedException e) {
+                closeZombieTask(task);
+                it.remove();
+                throw e;
+            } catch (final KafkaException e) {
                 log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e);
                 throw e;
             }
@@ -393,12 +442,12 @@ class AssignedTasks {
             final Task task = it.next();
             try {
                 action.apply(task);
-            } catch (final CommitFailedException e) {
-                // commit failed. This is already logged inside the task as WARN and we can just log it again here.
-                log.warn("Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", taskTypeName, task.id(), action.name());
-            } catch (final ProducerFencedException e) {
+            } catch (final TaskMigratedException e) {
                 closeZombieTask(task);
                 it.remove();
+                if (firstException == null) {
+                    firstException = e;
+                }
             } catch (final RuntimeException t) {
                 log.error("Failed to {} {} {} due to the following error:",
                           action.name(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 5ebc34c..ed07aa7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -37,7 +37,7 @@ public interface ChangelogReader {
      * Restore all registered state stores by reading from their changelogs.
      * @return all topic partitions that have been restored
      */
-    Collection<TopicPartition> restore();
+    Collection<TopicPartition> restore(final RestoringTasks active);
 
     /**
      * @return the restored offsets for all persistent stores.

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index ec047e6..80eda6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 
@@ -38,7 +39,10 @@ public class PunctuationQueue {
         }
     }
 
-    public boolean mayPunctuate(final long timestamp, final PunctuationType type, final ProcessorNodePunctuator processorNodePunctuator) {
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
+    boolean mayPunctuate(final long timestamp, final PunctuationType type, final ProcessorNodePunctuator processorNodePunctuator) {
         synchronized (pq) {
             boolean punctuated = false;
             PunctuationSchedule top = pq.peek();

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
new file mode 100644
index 0000000..6ed28fd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface RestoringTasks {
+    Task restoringTaskFor(final TopicPartition partition);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 4ba860d..cc298e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
 
@@ -48,8 +49,7 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
 
-    public StoreChangelogReader(final String threadId,
-                                final Consumer<byte[], byte[]> consumer,
+    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
                                 final StateRestoreListener userStateRestoreListener,
                                 final LogContext logContext) {
         this.consumer = consumer;
@@ -57,12 +57,6 @@ public class StoreChangelogReader implements ChangelogReader {
         this.userStateRestoreListener = userStateRestoreListener;
     }
 
-    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
-                                final StateRestoreListener stateRestoreListener,
-                                final LogContext logContext) {
-        this("", consumer, stateRestoreListener, logContext);
-    }
-
     @Override
     public void register(final StateRestorer restorer) {
         restorer.setUserRestoreListener(userStateRestoreListener);
@@ -70,7 +64,10 @@ public class StoreChangelogReader implements ChangelogReader {
         needsInitializing.put(restorer.partition(), restorer);
     }
 
-    public Collection<TopicPartition> restore() {
+    /**
+     * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
+     */
+    public Collection<TopicPartition> restore(final RestoringTasks active) {
         if (!needsInitializing.isEmpty()) {
             initialize();
         }
@@ -83,7 +80,7 @@ public class StoreChangelogReader implements ChangelogReader {
         final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet());
         final ConsumerRecords<byte[], byte[]> allRecords = consumer.poll(10);
         for (final TopicPartition partition : partitions) {
-            restorePartition(allRecords, partition);
+            restorePartition(allRecords, partition, active.restoringTaskFor(partition));
         }
 
         if (needsRestoring.isEmpty()) {
@@ -230,19 +227,19 @@ public class StoreChangelogReader implements ChangelogReader {
         needsInitializing.clear();
     }
 
+    /**
+     * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
+     */
     private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
-                                  final TopicPartition topicPartition) {
+                                  final TopicPartition topicPartition,
+                                  final Task task) {
         final StateRestorer restorer = stateRestorers.get(topicPartition);
         final Long endOffset = endOffsets.get(topicPartition);
         final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
         restorer.setRestoredOffset(pos);
         if (restorer.hasCompleted(pos, endOffset)) {
             if (pos > endOffset + 1) {
-                throw new IllegalStateException(
-                        String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
-                                      topicPartition,
-                                      endOffset,
-                                      pos));
+                throw new TaskMigratedException(task, topicPartition, endOffset, pos);
             }
 
             log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
@@ -260,12 +257,11 @@ public class StoreChangelogReader implements ChangelogReader {
                              final StateRestorer restorer,
                              final Long endOffset) {
         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
-        long nextPosition = -1;
+        long offset = -1;
 
         for (final ConsumerRecord<byte[], byte[]> record : records) {
-            final long offset = record.offset();
+            offset = record.offset();
             if (restorer.hasCompleted(offset, endOffset)) {
-                nextPosition = record.offset();
                 break;
             }
             if (record.key() != null) {
@@ -273,17 +269,16 @@ public class StoreChangelogReader implements ChangelogReader {
             }
         }
 
-        if (nextPosition == -1) {
-            nextPosition = consumer.position(restorer.partition());
+        if (offset == -1) {
+            offset = consumer.position(restorer.partition());
         }
 
         if (!restoreRecords.isEmpty()) {
             restorer.restore(restoreRecords);
-            restorer.restoreBatchCompleted(nextPosition, records.size());
-
+            restorer.restoreBatchCompleted(offset + 1, records.size());
         }
 
-        return nextPosition;
+        return consumer.position(restorer.partition());
     }
 
     private boolean hasPartition(final TopicPartition topicPartition) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8c26fa9..3d6c9b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -99,6 +100,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @param metrics               the {@link StreamsMetrics} created by the thread
      * @param stateDirectory        the {@link StateDirectory} created by the thread
      * @param producer              the instance of {@link Producer} used to produce records
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public StreamTask(final TaskId id,
                       final String applicationId,
@@ -145,8 +147,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
         if (eosEnabled) {
-            this.producer.initTransactions();
-            this.producer.beginTransaction();
+            try {
+                this.producer.initTransactions();
+                this.producer.beginTransaction();
+            } catch (final ProducerFencedException fatal) {
+                throw new TaskMigratedException(this, fatal);
+            }
             transactionInFlight = true;
         }
     }
@@ -167,12 +173,17 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * - re-initialize the task
      * - if (eos) begin new transaction
      * </pre>
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     @Override
     public void resume() {
         log.debug("Resuming");
         if (eosEnabled) {
-            producer.beginTransaction();
+            try {
+                producer.beginTransaction();
+            } catch (final ProducerFencedException fatal) {
+                throw new TaskMigratedException(this, fatal);
+            }
             transactionInFlight = true;
         }
         initTopology();
@@ -182,6 +193,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * Process one record.
      *
      * @return true if this method processes a record, false if it does not process a record.
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     @SuppressWarnings("unchecked")
     public boolean process() {
@@ -214,6 +226,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             if (recordInfo.queue().size() == maxBufferedSize) {
                 consumer.resume(singleton(partition));
             }
+        } catch (final ProducerFencedException fatal) {
+            throw new TaskMigratedException(this, fatal);
         } catch (final KafkaException e) {
             throw new StreamsException(format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d",
                 id(),
@@ -231,6 +245,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
     /**
      * @throws IllegalStateException if the current node is not null
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     @Override
     public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
@@ -246,6 +261,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
         try {
             node.punctuate(timestamp, punctuator);
+        } catch (final ProducerFencedException fatal) {
+            throw new TaskMigratedException(this, fatal);
         } catch (final KafkaException e) {
             throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix,  node.name()), e);
         } finally {
@@ -264,12 +281,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * - if(!eos) write checkpoint
      * - commit offsets and start new transaction
      * </pre>
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
      */
     @Override
     public void commit() {
         commit(true);
     }
 
+    /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     */
     // visible for testing
     void commit(final boolean startNewTransaction) {
         log.debug("Committing");
@@ -299,44 +322,51 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     protected void flushState() {
         log.trace("Flushing state and producer");
         super.flushState();
-        recordCollector.flush();
+        try {
+            recordCollector.flush();
+        } catch (final ProducerFencedException fatal) {
+            throw new TaskMigratedException(this, fatal);
+        }
     }
 
+    /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     */
     private void commitOffsets(final boolean startNewTransaction) {
-        if (commitOffsetNeeded) {
-            log.trace("Committing offsets");
-            final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
-            for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
-                final TopicPartition partition = entry.getKey();
-                final long offset = entry.getValue() + 1;
-                consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
-                stateMgr.putOffsetLimit(partition, offset);
-            }
-
-            if (eosEnabled) {
-                producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
-                producer.commitTransaction();
-                transactionInFlight = false;
-                if (startNewTransaction) {
-                    transactionInFlight = true;
-                    producer.beginTransaction();
+        try {
+            if (commitOffsetNeeded) {
+                log.trace("Committing offsets");
+                final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
+                for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+                    final TopicPartition partition = entry.getKey();
+                    final long offset = entry.getValue() + 1;
+                    consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
+                    stateMgr.putOffsetLimit(partition, offset);
                 }
-            } else {
-                try {
+
+                if (eosEnabled) {
+                    producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
+                    producer.commitTransaction();
+                    transactionInFlight = false;
+                    if (startNewTransaction) {
+                        transactionInFlight = true;
+                        producer.beginTransaction();
+                    }
+                } else {
                     consumer.commitSync(consumedOffsetsAndMetadata);
-                } catch (final CommitFailedException e) {
-                    log.warn("Failed offset commits {} due to CommitFailedException", consumedOffsetsAndMetadata);
-                    throw e;
                 }
+                commitOffsetNeeded = false;
+            } else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
+                producer.commitTransaction();
+                transactionInFlight = false;
             }
-            commitOffsetNeeded = false;
-        } else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
-            producer.commitTransaction();
-            transactionInFlight = false;
+        } catch (final CommitFailedException | ProducerFencedException fatal) {
+            throw new TaskMigratedException(this, fatal);
         }
     }
 
-    void initTopology() {
+    private void initTopology() {
         // initialize the task by initializing all its processor nodes in the topology
         log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode node : topology.processors()) {
@@ -357,6 +387,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      *   - if (!eos) write checkpoint
      *   - commit offsets
      * </pre>
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
      */
     @Override
     public void suspend() {
@@ -372,6 +404,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      *   - if (!eos) write checkpoint
      *   - commit offsets
      * </pre>
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
      */
     // visible for testing
     void suspend(final boolean clean) {
@@ -433,7 +467,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                             producer.abortTransaction();
                         }
                         transactionInFlight = false;
-                    } catch (final ProducerFencedException e) {
+                    } catch (final ProducerFencedException ignore) {
+                        /* TODO
+                         * this should actually never happen atm as we guard the call to #abortTransaction
+                         * -> the reason for the guard is a "bug" in the Producer -- it throws IllegalStateException
+                         * instead of ProducerFencedException atm. We can remove the isZombie flag after KAFKA-5604 is
+                         * fixed and fall back to this catch-and-swallow code
+                         */
+
                         // can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens
                     }
                 }
@@ -470,7 +511,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * </pre>
      * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
      *              otherwise, just close open resources
-     * @param isZombie {@code true} is this task is a zombie or not
+     * @param isZombie {@code true} if this task is a zombie or not (this will suppress {@link TaskMigratedException})
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
      */
     @Override
     public void close(boolean clean,
@@ -550,6 +593,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * Possibly trigger registered stream-time punctuation functions if
      * current partition group timestamp has reached the defined stamp
      * Note, this is only called in the presence of new records
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public boolean maybePunctuateStreamTime() {
         final long timestamp = partitionGroup.timestamp();
@@ -567,6 +611,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * Possibly trigger registered system-time punctuation functions if
      * current system timestamp has reached the defined stamp
      * Note, this is called irrespective of the presence of new records
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public boolean maybePunctuateSystemTime() {
         final long timestamp = time.milliseconds();

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8d13558..ea7d362 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -43,6 +42,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
@@ -291,7 +291,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                     taskManager.suspendTasksAndState();
                 } catch (final Throwable t) {
                     log.error("Error caught during partition revocation, " +
-                            "will abort the current process and re-throw at the end of rebalance: {}", t.getMessage());
+                              "will abort the current process and re-throw at the end of rebalance: {}", t.getMessage());
                     streamThread.setRebalanceException(t);
                 } finally {
                     streamThread.refreshMetadataState();
@@ -339,6 +339,9 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             this.log = log;
         }
 
+        /**
+         * @throws TaskMigratedException if the task producer got fenced (EOS only)
+         */
         Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
             final List<Task> createdTasks = new ArrayList<>();
             for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
@@ -391,6 +394,9 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             this.threadClientId = threadClientId;
         }
 
+        /**
+         * @throws TaskMigratedException if the task producer got fenced (EOS only)
+         */
         @Override
         StreamTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
             taskCreatedSensor.record();
@@ -664,8 +670,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         log.info("Creating restore consumer client");
         final Map<String, Object> consumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs);
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId,
-                                                                              restoreConsumer,
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer,
                                                                               userStateRestoreListener,
                                                                               logContext);
 
@@ -765,13 +770,22 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
 
         while (isRunning()) {
-            recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit);
+            try {
+                recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit);
+            } catch (final TaskMigratedException ignoreAndRejoinGroup) {
+                log.warn("Detected a task that got migrated to another thread. " +
+                    "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
+                    "Trying to rejoin the consumer group now.", ignoreAndRejoinGroup);
+            }
         }
     }
 
     /**
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
+     * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
+     *                               or if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
      */
     // Visible for testing
     long runOnce(final long recordsProcessedBeforeCommit) {
@@ -811,6 +825,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
     /**
      * Get the next batch of records by polling.
      * @return Next batch of records or null if no records available.
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     private ConsumerRecords<byte[], byte[]> pollRequests() {
         ConsumerRecords<byte[], byte[]> records = null;
@@ -822,7 +837,9 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
 
         if (rebalanceException != null) {
-            if (!(rebalanceException instanceof ProducerFencedException)) {
+            if (rebalanceException instanceof TaskMigratedException) {
+                throw (TaskMigratedException) rebalanceException;
+            } else {
                 throw new StreamsException(logPrefix + "Failed to rebalance.", rebalanceException);
             }
         }
@@ -895,6 +912,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
      * @param recordsProcessedBeforeCommit number of records to be processed before commit is called.
      *                                     if UNLIMITED_RECORDS, then commit is never called
      * @return Number of records processed since last commit.
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
      */
     private long processAndMaybeCommit(final long recordsProcessedBeforeCommit) {
 
@@ -926,6 +945,9 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         return totalProcessedSinceLastMaybeCommit;
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     private void punctuate() {
         final int punctuated = taskManager.punctuate();
         if (punctuated > 0) {
@@ -966,6 +988,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
     /**
      * Commit all tasks owned by this thread if specified interval time has elapsed
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
      */
     void maybeCommit(final long now) {
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 278957e..652f4e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
@@ -67,6 +68,9 @@ class TaskManager {
         this.log = logContext.logger(getClass());
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     void createTasks(final Collection<TopicPartition> assignment) {
         if (threadMetadataProvider == null) {
             throw new IllegalStateException(logPrefix + "taskIdProvider has not been initialized while adding stream tasks. This should not happen.");
@@ -92,6 +96,9 @@ class TaskManager {
         this.threadMetadataProvider = threadMetadataProvider;
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
         Map<TaskId, Set<TopicPartition>> assignedTasks = threadMetadataProvider.activeTasks();
         if (assignedTasks.isEmpty()) {
@@ -132,6 +139,9 @@ class TaskManager {
         }
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     private void addStandbyTasks() {
         final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = threadMetadataProvider.standbyTasks();
         if (assignedStandbyTasks.isEmpty()) {
@@ -177,6 +187,7 @@ class TaskManager {
     /**
      * Similar to shutdownTasksAndState, however does not close the task managers, in the hope that
      * soon the tasks will be assigned again
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     void suspendTasksAndState()  {
         log.debug("Suspending all active tasks {} and standby tasks {}", active.runningTaskIds(), standby.runningTaskIds());
@@ -188,8 +199,9 @@ class TaskManager {
         // remove the changelog partitions from restore consumer
         restoreConsumer.assign(Collections.<TopicPartition>emptyList());
 
-        if (firstException.get() != null) {
-            throw new StreamsException(logPrefix + "failed to suspend stream tasks", firstException.get());
+        final Exception exception = firstException.get();
+        if (exception != null) {
+            throw new StreamsException(logPrefix + "failed to suspend stream tasks", exception);
         }
     }
 
@@ -242,12 +254,13 @@ class TaskManager {
     /**
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
+     * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
      */
     boolean updateNewAndRestoringTasks() {
         active.initializeNewTasks();
         standby.initializeNewTasks();
 
-        final Collection<TopicPartition> restored = changelogReader.restore();
+        final Collection<TopicPartition> restored = changelogReader.restore(active);
         final Set<TopicPartition> resumed = active.updateRestored(restored);
 
         if (!resumed.isEmpty()) {
@@ -288,19 +301,33 @@ class TaskManager {
         }
     }
 
+    /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     */
     int commitAll() {
         int committed = active.commit();
         return committed + standby.commit();
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     int process() {
         return active.process();
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     int punctuate() {
         return active.punctuate();
     }
 
+    /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     */
     int maybeCommitActiveTasks() {
         return active.maybeCommit();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 7d6bb3a..9d6aea1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -17,11 +17,10 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -91,6 +90,8 @@ public class AssignedTasksTest {
     @Test
     public void shouldInitializeNewTasks() {
         EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.replay(t1);
 
         addAndInitTask();
@@ -101,6 +102,8 @@ public class AssignedTasksTest {
     @Test
     public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
         EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.expect(t2.initialize()).andReturn(true);
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
@@ -152,7 +155,7 @@ public class AssignedTasksTest {
         mockRunningTaskSuspension();
         EasyMock.replay(t1);
 
-        suspendTask();
+        assertThat(suspendTask(), nullValue());
 
         assertThat(assignedTasks.previousTaskIds(), equalTo(Collections.singleton(taskId1)));
         EasyMock.verify(t1);
@@ -161,11 +164,13 @@ public class AssignedTasksTest {
     @Test
     public void shouldCloseRestoringTasks() {
         EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         t1.close(false, false);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
-        suspendTask();
+        assertThat(suspendTask(), nullValue());
         EasyMock.verify(t1);
     }
 
@@ -176,7 +181,7 @@ public class AssignedTasksTest {
         EasyMock.replay(t1);
 
         assignedTasks.addNewTask(t1);
-        assignedTasks.suspend();
+        assertThat(assignedTasks.suspend(), nullValue());
 
         EasyMock.verify(t1);
     }
@@ -186,8 +191,8 @@ public class AssignedTasksTest {
         mockRunningTaskSuspension();
         EasyMock.replay(t1);
 
-        suspendTask();
-        assignedTasks.suspend();
+        assertThat(suspendTask(), nullValue());
+        assertThat(assignedTasks.suspend(), nullValue());
         EasyMock.verify(t1);
     }
 
@@ -207,15 +212,14 @@ public class AssignedTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnSuspendWhenProducerFencedException() {
+    public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
         mockTaskInitialization();
         t1.suspend();
-        EasyMock.expectLastCall().andThrow(new ProducerFencedException("KABOOM!"));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
-
         assertThat(suspendTask(), nullValue());
         assertTrue(assignedTasks.previousTaskIds().isEmpty());
         EasyMock.verify(t1);
@@ -228,13 +232,32 @@ public class AssignedTasksTest {
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
-        suspendTask();
+        assertThat(suspendTask(), nullValue());
 
         assertTrue(assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1)));
         assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
         EasyMock.verify(t1);
     }
 
+    @Test
+    public void shouldCloseTaskOnResumeIfTaskMigratedException() {
+        mockRunningTaskSuspension();
+        t1.resume();
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+
+        assertThat(suspendTask(), nullValue());
+
+        try {
+            assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1));
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
+
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
+        EasyMock.verify(t1);
+    }
 
     private void mockTaskInitialization() {
         EasyMock.expect(t1.initialize()).andReturn(true);
@@ -256,29 +279,21 @@ public class AssignedTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnCommitIfProduceFencedException() {
+    public void shouldCloseTaskOnCommitIfTaskMigratedException() {
         mockTaskInitialization();
         t1.commit();
-        EasyMock.expectLastCall().andThrow(new ProducerFencedException(""));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        assignedTasks.commit();
-        EasyMock.verify(t1);
-    }
-
-    @Test
-    public void shouldNotThrowCommitFailedExceptionOnCommit() {
-        mockTaskInitialization();
-        t1.commit();
-        EasyMock.expectLastCall().andThrow(new CommitFailedException());
-        EasyMock.replay(t1);
-        addAndInitTask();
+        try {
+            assignedTasks.commit();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
 
-        assignedTasks.commit();
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
         EasyMock.verify(t1);
     }
 
@@ -315,66 +330,91 @@ public class AssignedTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnMaybeCommitIfProduceFencedException() {
+    public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
         mockTaskInitialization();
         EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
-        EasyMock.expectLastCall().andThrow(new ProducerFencedException(""));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        assignedTasks.maybeCommit();
+        try {
+            assignedTasks.maybeCommit();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
+
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
         EasyMock.verify(t1);
     }
 
     @Test
-    public void shouldNotThrowCommitFailedExceptionOnMaybeCommit() {
+    public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
         mockTaskInitialization();
-        EasyMock.expect(t1.commitNeeded()).andReturn(true);
-        t1.commit();
-        EasyMock.expectLastCall().andThrow(new CommitFailedException());
+        t1.process();
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true);
+        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        assignedTasks.maybeCommit();
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+        try {
+            assignedTasks.process();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
+
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
         EasyMock.verify(t1);
     }
 
     @Test
-    public void shouldThrowExceptionOnMaybeCommitWhenNotCommitFailedOrProducerFenced() {
+    public void shouldPunctuateRunningTasks() {
         mockTaskInitialization();
-        EasyMock.expect(t1.commitNeeded()).andReturn(true);
-        t1.commit();
-        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
+        EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
+        EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(true);
         EasyMock.replay(t1);
 
         addAndInitTask();
 
-        try {
-            assignedTasks.maybeCommit();
-            fail("Should have thrown exception");
-        } catch (Exception e) {
-            // ok
-        }
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+        assertThat(assignedTasks.punctuate(), equalTo(2));
         EasyMock.verify(t1);
     }
 
+    @Test
+    public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
+        mockTaskInitialization();
+        t1.maybePunctuateStreamTime();
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        try {
+            assignedTasks.punctuate();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
 
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
+        EasyMock.verify(t1);
+    }
 
     @Test
-    public void shouldPunctuateRunningTasks() {
+    public void shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() {
         mockTaskInitialization();
         EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
-        EasyMock.expect(t1.maybePunctuateSystemTime()).andReturn(true);
+        t1.maybePunctuateSystemTime();
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        t1.close(false, true);
+        EasyMock.expectLastCall();
         EasyMock.replay(t1);
-
         addAndInitTask();
 
-        assertThat(assignedTasks.punctuate(), equalTo(2));
+        try {
+            assignedTasks.punctuate();
+            fail("Should have thrown TaskMigratedException.");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
         EasyMock.verify(t1);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
new file mode 100644
index 0000000..6c3be61
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class MockChangelogReader implements ChangelogReader {
+    private final Set<TopicPartition> registered = new HashSet<>();
+
+    @Override
+    public void register(final StateRestorer restorer) {
+        registered.add(restorer.partition());
+    }
+
+    @Override
+    public Collection<TopicPartition> restore(final RestoringTasks active) {
+        return registered;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> restoredOffsets() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void reset() {
+        registered.clear();
+    }
+
+    public boolean wasRegistered(final TopicPartition partition) {
+        return registered.contains(partition);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ede6dd4..dc009f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockBatchingStateRestoreListener;
-import org.apache.kafka.test.MockChangelogReader;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 5da0a64..3c54851 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -25,12 +25,17 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
 import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -41,13 +46,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@RunWith(EasyMockRunner.class)
 public class StoreChangelogReaderTest {
 
+    @Mock(type = MockType.NICE)
+    private RestoringTasks active;
+    @Mock(type = MockType.NICE)
+    private Task task;
+
     private final MockStateRestoreListener callback = new MockStateRestoreListener();
     private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -75,7 +88,7 @@ public class StoreChangelogReaderTest {
         final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                 "storeName"));
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertTrue(functionCalled.get());
     }
 
@@ -83,7 +96,7 @@ public class StoreChangelogReaderTest {
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
         consumer.subscribe(Collections.singleton("sometopic"));
         try {
-            changelogReader.restore();
+            changelogReader.restore(active);
             fail("Should have thrown IllegalStateException");
         } catch (final IllegalStateException e) {
             // ok
@@ -96,7 +109,7 @@ public class StoreChangelogReaderTest {
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                                                    "storeName"));
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(messages));
     }
 
@@ -107,7 +120,7 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, 5L, Long.MAX_VALUE, true,
                                                    "storeName"));
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(5));
     }
 
@@ -118,7 +131,7 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                                                    "storeName"));
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
     }
 
@@ -128,7 +141,7 @@ public class StoreChangelogReaderTest {
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
                                                          "storeName");
         changelogReader.register(restorer);
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
     }
@@ -150,7 +163,10 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        changelogReader.restore();
+        expect(active.restoringTaskFor(one)).andReturn(null);
+        expect(active.restoringTaskFor(two)).andReturn(null);
+        replay(active);
+        changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackOne.restored.size(), equalTo(5));
@@ -174,7 +190,10 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        changelogReader.restore();
+        expect(active.restoringTaskFor(one)).andReturn(null);
+        expect(active.restoringTaskFor(two)).andReturn(null);
+        replay(active);
+        changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackOne.restored.size(), equalTo(5));
@@ -199,7 +218,7 @@ public class StoreChangelogReaderTest {
 
 
     private void assertCorrectOffsetsReportedByListener(final MockStateRestoreListener restoreListener,
-                                                        long startOffset,
+                                                        final long startOffset,
                                                         final long batchOffset, final long endOffset) {
 
         assertThat(restoreListener.restoreStartOffset, equalTo(startOffset));
@@ -215,7 +234,7 @@ public class StoreChangelogReaderTest {
         setupConsumer(0, topicPartition);
         changelogReader.register(restorer);
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(0L));
     }
@@ -230,7 +249,7 @@ public class StoreChangelogReaderTest {
 
         changelogReader.register(restorer);
 
-        changelogReader.restore();
+        changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(endOffset));
     }
@@ -240,7 +259,7 @@ public class StoreChangelogReaderTest {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                                                    "storeName"));
-        changelogReader.restore();
+        changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
     }
@@ -250,7 +269,7 @@ public class StoreChangelogReaderTest {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
                                                    "storeName"));
-        changelogReader.restore();
+        changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
     }
@@ -265,7 +284,7 @@ public class StoreChangelogReaderTest {
         consumer.assign(Collections.singletonList(topicPartition));
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
                                                    "storeName"));
-        changelogReader.restore();
+        changelogReader.restore(active);
 
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
     }
@@ -275,7 +294,7 @@ public class StoreChangelogReaderTest {
         final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
         setupConsumer(0, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "store"));
-        final Collection<TopicPartition> restored = changelogReader.restore();
+        final Collection<TopicPartition> restored = changelogReader.restore(active);
         assertThat(restored, equalTo(expected));
     }
 
@@ -288,11 +307,16 @@ public class StoreChangelogReaderTest {
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
-        assertTrue(changelogReader.restore().isEmpty());
+        final TopicPartition postInitialization = new TopicPartition("other", 0);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
+        expect(active.restoringTaskFor(postInitialization)).andReturn(null);
+        replay(active);
+
+        assertTrue(changelogReader.restore(active).isEmpty());
 
         addRecords(9, topicPartition, 1);
 
-        final TopicPartition postInitialization = new TopicPartition("other", 0);
         setupConsumer(3, postInitialization);
         consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
@@ -302,24 +326,45 @@ public class StoreChangelogReaderTest {
         final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
         consumer.assign(expected);
 
-        assertThat(changelogReader.restore(), equalTo(expected));
+        assertThat(changelogReader.restore(active), equalTo(expected));
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackTwo.restored.size(), equalTo(3));
     }
 
-    private void setupConsumer(final long messages, final TopicPartition topicPartition) {
+    @Test
+    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+            "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        try {
+            changelogReader.restore(active);
+            fail("Should have thrown TaskMigratedException");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
+    }
+
+    private void setupConsumer(final long messages,
+                               final TopicPartition topicPartition) {
         assignPartition(messages, topicPartition);
         addRecords(messages, topicPartition, 0);
         consumer.assign(Collections.<TopicPartition>emptyList());
     }
 
-    private void addRecords(final long messages, final TopicPartition topicPartition, final int startingOffset) {
+    private void addRecords(final long messages,
+                            final TopicPartition topicPartition,
+                            final int startingOffset) {
         for (int i = 0; i < messages; i++) {
             consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), startingOffset + i, new byte[0], new byte[0]));
         }
     }
 
-    private void assignPartition(final long messages, final TopicPartition topicPartition) {
+    private void assignPartition(final long messages,
+                                 final TopicPartition topicPartition) {
         consumer.updatePartitions(topicPartition.topic(),
                                   Collections.singletonList(
                                           new PartitionInfo(topicPartition.topic(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index fd9b19d..7d04040 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -30,15 +30,16 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.processor.TaskMetadata;
-import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -72,6 +73,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class StreamThreadTest {
 
@@ -857,7 +859,10 @@ public class StreamThreadTest {
         producer.fenceProducer();
         mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L);
         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0]));
-        thread.runOnce(-1);
+        try {
+            thread.runOnce(-1);
+            fail("Should have thrown TaskMigratedException");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
         TestUtils.waitForCondition(
             new TestCondition() {
                 @Override
@@ -892,7 +897,10 @@ public class StreamThreadTest {
         thread.rebalanceListener.onPartitionsRevoked(null);
         clientSupplier.producers.get(0).fenceProducer();
         thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
-        thread.runOnce(-1);
+        try {
+            thread.runOnce(-1);
+            fail("Should have thrown TaskMigratedException");
+        } catch (final TaskMigratedException expected) { /* ignore */ }
 
         assertTrue(thread.tasks().isEmpty());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7a87a27..7ee8fae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -316,7 +316,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldRestoreStateFromChangeLogReader() {
-        EasyMock.expect(changeLogReader.restore()).andReturn(taskId0Partitions);
+        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         EasyMock.expect(active.updateRestored(taskId0Partitions)).
                 andReturn(Collections.<TopicPartition>emptySet());
 
@@ -327,7 +327,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldResumeRestoredPartitions() {
-        EasyMock.expect(changeLogReader.restore()).andReturn(taskId0Partitions);
+        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         EasyMock.expect(active.updateRestored(taskId0Partitions)).
                 andReturn(taskId0Partitions);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaabb6cd/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
deleted file mode 100644
index 54fd858..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.internals.ChangelogReader;
-import org.apache.kafka.streams.processor.internals.StateRestorer;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class MockChangelogReader implements ChangelogReader {
-    private final Set<TopicPartition> registered = new HashSet<>();
-
-    @Override
-    public void register(final StateRestorer restorer) {
-        registered.add(restorer.partition());
-    }
-
-    @Override
-    public Collection<TopicPartition> restore() {
-        return registered;
-    }
-
-    @Override
-    public Map<TopicPartition, Long> restoredOffsets() {
-        return Collections.emptyMap();
-    }
-
-    @Override
-    public void reset() {
-        registered.clear();
-    }
-
-    public boolean wasRegistered(final TopicPartition partition) {
-        return registered.contains(partition);
-    }
-}


Mime
View raw message