kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8972 (2.4 blocker): clear all state for zombie task on TaskMigratedException (#7608)
Date Wed, 30 Oct 2019 04:38:58 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new c77417f  KAFKA-8972 (2.4 blocker): clear all state for zombie task on TaskMigratedException
(#7608)
c77417f is described below

commit c77417f01df03c1248ddc72088d3396a69ad1b40
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Wed Oct 30 00:20:01 2019 -0400

    KAFKA-8972 (2.4 blocker): clear all state for zombie task on TaskMigratedException (#7608)
    
    Third bugfix for the failing broker bounce system test with cooperative rebalancing:
    
    tl;dr We need to remove everything associated with a task when it is closed, but in some
cases (eg AssignedTasks#commit) on a TaskMigratedException we would close it as a zombie and
then (only) remove the taskId from the `running` map. This left its partitions, restorers, state
stores, etc. around in an undefined state, causing exceptions when closing and/or opening
the stores again.
    
    Longer explanation:
    In AssignedTasks (the abstract class from which the standby and active task variations
extend), a commit failure (even one due to the broker being down/unavailable) is treated as a TaskMigratedException,
after which the failed task is closed as a zombie and removed from `running` -- the remaining
tasks (ie those still in `running`) are then also closed as zombies in the subsequent onPartitionsLost.
    
    However we do not remove the closed task from runningByPartition nor do we remove the
corresponding changelogs, if restoring, from the StoreChangelogReader since that applies only
to active tasks, and AssignedTasks is generic/abstract. The changelog reader then retains
a mapping from the closed task's changelog partition to its CompositeRestoreListener (and
does not replace this when the new one comes along after the rebalance). The restore listener
has a reference to a specific Rocks [...]
    
    Although technically this bug existed before KIP-429, it was only uncovered now that we
remove tasks and clear their state/partitions/etc one at a time. We don't technically need
to cherry-pick the fix back to earlier branches, since previously we just blindly cleared all
data structures entirely during an eager rebalance.
    
    Reviewers: Boyang Chen <boyang@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
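
    The core of the fix can be illustrated with a simplified, hypothetical sketch (the class
and method names below are illustrative only, not the actual Kafka internals): removing a
task solely from the `running` map leaves `runningByPartition` stale, whereas a
`removeTaskFromRunning`-style helper clears every bookkeeping map together.

```java
import java.util.*;

// Hypothetical, simplified model of the task bookkeeping described above.
class TaskBook {
    final Map<String, Set<Integer>> running = new HashMap<>();       // taskId -> partitions
    final Map<Integer, String> runningByPartition = new HashMap<>(); // partition -> taskId

    void add(final String taskId, final Set<Integer> partitions) {
        running.put(taskId, partitions);
        for (final int p : partitions) {
            runningByPartition.put(p, taskId);
        }
    }

    // Buggy variant: mirrors the old behavior on TaskMigratedException --
    // the task leaves `running`, but its partitions linger in runningByPartition.
    void removeOnlyFromRunning(final String taskId) {
        running.remove(taskId);
    }

    // Fixed variant: mirrors the removeTaskFromRunning helper, clearing
    // every map that references the closed task.
    void removeTaskFromRunning(final String taskId) {
        final Set<Integer> partitions = running.remove(taskId);
        if (partitions != null) {
            runningByPartition.keySet().removeAll(partitions);
        }
    }

    boolean isEmpty() {
        return running.isEmpty() && runningByPartition.isEmpty();
    }
}

public class ZombieCleanupSketch {
    public static void main(final String[] args) {
        final TaskBook buggy = new TaskBook();
        buggy.add("0_1", new HashSet<>(Arrays.asList(1, 2)));
        buggy.removeOnlyFromRunning("0_1");
        System.out.println("buggy leftover state: " + !buggy.isEmpty());  // true

        final TaskBook fixed = new TaskBook();
        fixed.add("0_1", new HashSet<>(Arrays.asList(1, 2)));
        fixed.removeTaskFromRunning("0_1");
        System.out.println("fixed leftover state: " + !fixed.isEmpty()); // false
    }
}
```

    In the real patch this consolidation is what the new TaskManager/changelog-reader
isEmpty() checks verify after all zombies are closed.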
---
 .../processor/internals/AssignedStandbyTasks.java  | 10 +--
 .../processor/internals/AssignedStreamsTasks.java  | 96 ++++++++++------------
 .../streams/processor/internals/AssignedTasks.java | 24 ++++--
 .../processor/internals/ChangelogReader.java       |  4 +
 .../processor/internals/StoreChangelogReader.java  | 12 +++
 .../streams/processor/internals/TaskManager.java   |  4 +-
 .../internals/AssignedStreamsTasksTest.java        | 87 ++++++++------------
 .../processor/internals/MockChangelogReader.java   |  5 ++
 .../processor/internals/StreamThreadTest.java      | 12 +--
 9 files changed, 126 insertions(+), 128 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index 0c9a70d..f217a55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -34,8 +34,8 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
     public void shutdown(final boolean clean) {
         final String shutdownType = clean ? "Clean" : "Unclean";
         log.debug(shutdownType + " shutdown of all standby tasks" + "\n" +
-                      "created tasks to close: {}" + "\n" +
-                      "running tasks to close: {}",
+                      "non-initialized standby tasks to close: {}" + "\n" +
+                      "running standby tasks to close: {}",
             clean, created.keySet(), running.keySet());
         super.shutdown(clean);
     }
@@ -63,7 +63,7 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
         final List<TopicPartition> revokedChangelogs = new ArrayList<>();
         for (final Map.Entry<TaskId, Set<TopicPartition>> entry : revokedTasks.entrySet())
{
             final TaskId taskId = entry.getKey();
-            final Task task;
+            final StandbyTask task;
 
             if (running.containsKey(taskId)) {
                 task = running.get(taskId);
@@ -77,9 +77,9 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
             try {
                 task.close(true, false);
             } catch (final RuntimeException e) {
-                log.error("Closing the {} {} failed due to the following error:", taskTypeName,
task.id(), e);
+                log.error("Closing the standby task {} failed due to the following error:",
task.id(), e);
             } finally {
-                running.remove(taskId);
+                removeTaskFromRunning(task);
                 revokedChangelogs.addAll(task.changelogPartitions());
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 3515824..63dac25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -136,12 +136,12 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
                 suspended.put(id, task);
             } catch (final TaskMigratedException closeAsZombieAndSwallow) {
                 // swallow and move on since we are rebalancing
-                log.info("Failed to suspend the stream task {} since it got migrated to another
thread already. " +
+                log.info("Failed to suspend stream task {} since it got migrated to another
thread already. " +
                     "Closing it as zombie and moving on.", id);
                 firstException.compareAndSet(null, closeZombieTask(task));
                 prevActiveTasks.remove(id);
             } catch (final RuntimeException e) {
-                log.error("Suspending the stream task {} failed due to the following error:",
id, e);
+                log.error("Suspending stream task {} failed due to the following error:",
id, e);
                 firstException.compareAndSet(null, e);
                 try {
                     prevActiveTasks.remove(id);
@@ -152,9 +152,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
                         id, f);
                 }
             } finally {
-                running.remove(id);
-                runningByPartition.keySet().removeAll(task.partitions());
-                runningByPartition.keySet().removeAll(task.changelogPartitions());
+                removeTaskFromRunning(task);
                 taskChangelogs.addAll(task.changelogPartitions());
             }
         }
@@ -193,9 +191,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
     private RuntimeException closeRunning(final boolean isZombie,
                                           final StreamTask task,
                                           final List<TopicPartition> closedTaskChangelogs)
{
-        running.remove(task.id());
-        runningByPartition.keySet().removeAll(task.partitions());
-        runningByPartition.keySet().removeAll(task.changelogPartitions());
+        removeTaskFromRunning(task);
         closedTaskChangelogs.addAll(task.changelogPartitions());
 
         try {
@@ -228,12 +224,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
     private RuntimeException closeRestoring(final boolean isZombie,
                                             final StreamTask task,
                                             final List<TopicPartition> closedTaskChangelogs)
{
-        restoring.remove(task.id());
+        removeTaskFromRestoring(task);
         closedTaskChangelogs.addAll(task.changelogPartitions());
-        for (final TopicPartition tp : task.partitions()) {
-            restoredPartitions.remove(tp);
-            restoringByPartition.remove(tp);
-        }
 
         try {
             final boolean clean = !isZombie;
@@ -282,15 +274,19 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
 
         for (final TaskId id : lostTasks) {
             if (suspended.containsKey(id)) {
+                log.debug("Closing the zombie suspended stream task {}.", id);
                 firstException.compareAndSet(null, closeSuspended(true, suspended.get(id)));
             } else if (created.containsKey(id)) {
+                log.debug("Closing the zombie created stream task {}.", id);
                 firstException.compareAndSet(null, closeNonRunning(true, created.get(id),
lostTaskChangelogs));
             } else if (restoring.containsKey(id)) {
+                log.debug("Closing the zombie restoring stream task {}.", id);
                 firstException.compareAndSet(null, closeRestoring(true, created.get(id),
lostTaskChangelogs));
             } else if (running.containsKey(id)) {
+                log.debug("Closing the zombie running stream task {}.", id);
                 firstException.compareAndSet(null, closeRunning(true, running.get(id), lostTaskChangelogs));
             } else {
-                // task may have already been closed as a zombie and removed from all task
maps
+                log.warn("Skipping closing the zombie stream task {} as it was already removed.",
id);
             }
         }
 
@@ -324,14 +320,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
                 } catch (final TaskMigratedException e) {
                     // we need to catch migration exception internally since this function
                     // is triggered in the rebalance callback
-                    log.info("Failed to resume the stream task {} since it got migrated to
another thread already. " +
-                        "Closing it as zombie before triggering a new rebalance.", task.id());
-                    final RuntimeException fatalException = closeZombieTask(task);
-                    running.remove(taskId);
-
-                    if (fatalException != null) {
-                        throw fatalException;
-                    }
+                    log.info("Failed to resume stream task {} since it got migrated to another
thread already. " +
+                             "Will trigger a new rebalance and close all tasks as zombies
together.", task.id());
                     throw e;
                 }
                 log.trace("Resuming the suspended stream task {}", task.id());
@@ -374,7 +364,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
         }
     }
 
-    void addToRestoring(final StreamTask task) {
+    void addTaskToRestoring(final StreamTask task) {
         restoring.put(task.id(), task);
         for (final TopicPartition topicPartition : task.partitions()) {
             restoringByPartition.put(topicPartition, task);
@@ -384,6 +374,18 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
         }
     }
 
+    private void removeTaskFromRestoring(final StreamTask task) {
+        restoring.remove(task.id());
+        for (final TopicPartition topicPartition : task.partitions()) {
+            restoredPartitions.remove(topicPartition);
+            restoringByPartition.remove(topicPartition);
+        }
+        for (final TopicPartition topicPartition : task.changelogPartitions()) {
+            restoredPartitions.remove(topicPartition);
+            restoringByPartition.remove(topicPartition);
+        }
+    }
+
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -392,8 +394,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
         int committed = 0;
         RuntimeException firstException = null;
 
-        for (final Iterator<StreamTask> it = running().iterator(); it.hasNext(); )
{
-            final StreamTask task = it.next();
+        for (final StreamTask task : running.values()) {
             try {
                 if (task.commitRequested() && task.commitNeeded()) {
                     task.commit();
@@ -402,12 +403,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
                 }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to commit stream task {} since it got migrated to another
thread already. " +
-                        "Closing it as zombie before triggering a new rebalance.", task.id());
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
+                         "Will trigger a new rebalance and close all tasks as zombies together.",
task.id());
                 throw e;
             } catch (final RuntimeException t) {
                 log.error("Failed to commit stream task {} due to the following error:",
task.id(), t);
@@ -443,21 +439,14 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
     int process(final long now) {
         int processed = 0;
 
-        final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
-        while (it.hasNext()) {
-            final StreamTask task = it.next().getValue();
+        for (final StreamTask task : running.values()) {
             try {
                 if (task.isProcessable(now) && task.process()) {
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to process stream task {} since it got migrated to another
thread already. " +
-                        "Closing it as zombie before triggering a new rebalance.", task.id());
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
+                        "Will trigger a new rebalance and close all tasks as zombies together.",
task.id());
                 throw e;
             } catch (final RuntimeException e) {
                 log.error("Failed to process stream task {} due to the following error:",
task.id(), e);
@@ -473,9 +462,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
      */
     int punctuate() {
         int punctuated = 0;
-        final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
-        while (it.hasNext()) {
-            final StreamTask task = it.next().getValue();
+
+        for (final StreamTask task : running.values()) {
             try {
                 if (task.maybePunctuateStreamTime()) {
                     punctuated++;
@@ -485,12 +473,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
                 }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to punctuate stream task {} since it got migrated to another
thread already. " +
-                        "Closing it as zombie before triggering a new rebalance.", task.id());
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
+                        "Will trigger a new rebalance and close all tasks as zombies together.",
task.id());
                 throw e;
             } catch (final KafkaException e) {
                 log.error("Failed to punctuate stream task {} due to the following error:",
task.id(), e);
@@ -512,14 +495,23 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
     public void shutdown(final boolean clean) {
         final String shutdownType = clean ? "Clean" : "Unclean";
         log.debug(shutdownType + " shutdown of all active tasks" + "\n" +
-                      "non-initialized tasks to close: {}" + "\n" +
+                      "non-initialized stream tasks to close: {}" + "\n" +
                       "restoring tasks to close: {}" + "\n" +
-                      "running tasks to close: {}" + "\n" +
-                      "suspended tasks to close: {}",
+                      "running stream tasks to close: {}" + "\n" +
+                      "suspended stream tasks to close: {}",
             clean, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet());
         super.shutdown(clean);
     }
 
+    @Override
+    public boolean isEmpty() {
+        return super.isEmpty()
+            && restoring.isEmpty()
+            && restoringByPartition.isEmpty()
+            && restoredPartitions.isEmpty()
+            && suspended.isEmpty();
+    }
+
     public String toString(final String indent) {
         final StringBuilder builder = new StringBuilder();
         builder.append(super.toString(indent));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 56fefe0..6d64d40 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -71,7 +71,7 @@ abstract class AssignedTasks<T extends Task> {
                 task.initializeMetadata();
                 if (!task.initializeStateStores()) {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
-                    ((AssignedStreamsTasks) this).addToRestoring((StreamTask) task);
+                    ((AssignedStreamsTasks) this).addTaskToRestoring((StreamTask) task);
                 } else {
                     transitionToRunning(task);
                 }
@@ -121,6 +121,12 @@ abstract class AssignedTasks<T extends Task> {
         }
     }
 
+    void removeTaskFromRunning(final T task) {
+        running.remove(task.id());
+        runningByPartition.keySet().removeAll(task.partitions());
+        runningByPartition.keySet().removeAll(task.changelogPartitions());
+    }
+
     T runningTaskFor(final TopicPartition partition) {
         return runningByPartition.get(partition);
     }
@@ -176,6 +182,12 @@ abstract class AssignedTasks<T extends Task> {
         created.clear();
     }
 
+    boolean isEmpty() {
+        return runningByPartition.isEmpty()
+                   && running.isEmpty()
+                   && created.isEmpty();
+    }
+
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -184,8 +196,7 @@ abstract class AssignedTasks<T extends Task> {
         int committed = 0;
         RuntimeException firstException = null;
 
-        for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
-            final T task = it.next();
+        for (final T task : running.values()) {
             try {
                 if (task.commitNeeded()) {
                     task.commit();
@@ -193,12 +204,7 @@ abstract class AssignedTasks<T extends Task> {
                 }
             } catch (final TaskMigratedException e) {
                 log.info("Failed to commit {} {} since it got migrated to another thread
already. " +
-                        "Closing it as zombie before triggering a new rebalance.", taskTypeName,
task.id());
-                final RuntimeException fatalException = closeZombieTask(task);
-                if (fatalException != null) {
-                    throw fatalException;
-                }
-                it.remove();
+                        "Will trigger a new rebalance and close all tasks as zombies together.",
taskTypeName, task.id());
                 throw e;
             } catch (final RuntimeException t) {
                 log.error("Failed to commit {} {} due to the following error:",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 782be15..df01e7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -56,4 +56,8 @@ public interface ChangelogReader {
      */
     void clear();
 
+    /**
+     * @return whether the changelog reader has just been cleared or is uninitialized.
+     */
+    boolean isEmpty();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 81f76a3..4e5ae57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -69,6 +69,8 @@ public class StoreChangelogReader implements ChangelogReader {
             stateRestorers.put(restorer.partition(), restorer);
 
             log.trace("Added restorer for changelog {}", restorer.partition());
+        } else {
+            log.debug("Skip re-adding restorer for changelog {}", restorer.partition());
         }
 
         needsInitializing.add(restorer.partition());
@@ -296,6 +298,16 @@ public class StoreChangelogReader implements ChangelogReader {
         completedRestorers.clear();
     }
 
+    @Override
+    public boolean isEmpty() {
+        return partitionInfo.isEmpty()
+            && stateRestorers.isEmpty()
+            && needsRestoring.isEmpty()
+            && restoreToOffsets.isEmpty()
+            && needsInitializing.isEmpty()
+            && completedRestorers.isEmpty();
+    }
+
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
                              final StateRestorer restorer,
                              final Long endOffset) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 82496df..72cff77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -269,8 +269,8 @@ public class TaskManager {
 
         if (exception != null) {
             throw exception;
-        } else if (!assignedActiveTasks.isEmpty()) {
-            throw new IllegalStateException("TaskManager had leftover tasks after removing
all zombies");
+        } else if (!(active.isEmpty() && assignedActiveTasks.isEmpty() &&
changelogReader.isEmpty())) {
+            throw new IllegalStateException("TaskManager found leftover active task state
after closing all zombies");
         }
 
         return zombieTasks;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index a8f96e4..68ca9bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -43,6 +43,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import org.junit.function.ThrowingRunnable;
 
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -50,6 +51,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -170,7 +172,7 @@ public class AssignedStreamsTasksTest {
         t1.initializeMetadata();
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).times(2);
-        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(2);
+        EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(3);
         t1.closeStateManager(true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -259,24 +261,17 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() {
+    public void shouldNotCloseTaskWithinResumeSuspendedIfTaskMigratedException() {
         mockRunningTaskSuspension();
         t1.resume();
         t1.initializeTopology();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
         assertThat(suspendTask(), nullValue());
 
-        try {
-            assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1));
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.maybeResumeSuspendedTask(taskId1, Collections.singleton(tp1)));
     }
 
     private void mockTaskInitialization() {
@@ -303,23 +298,16 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnCommitIfTaskMigratedException() {
+    public void shouldNotCloseTaskWithinCommitIfTaskMigratedException() {
         mockTaskInitialization();
         EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        try {
-            assignedTasks.commit();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.commit());
     }
 
     @Test
@@ -357,44 +345,30 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
+    public void shouldNotCloseTaskWithinMaybeCommitIfTaskMigratedException() {
         mockTaskInitialization();
         EasyMock.expect(t1.commitRequested()).andReturn(true);
         EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        try {
-            assignedTasks.maybeCommitPerUserRequested();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.maybeCommitPerUserRequested());
     }
 
     @Test
-    public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
+    public void shouldNotCloseTaskWithinProcessIfTaskMigratedException() {
         mockTaskInitialization();
         EasyMock.expect(t1.isProcessable(0L)).andReturn(true);
         t1.process();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        try {
-            assignedTasks.process(0L);
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.process(0L));
     }
 
     @Test
@@ -438,39 +412,33 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
+    public void shouldNotCloseTaskWithinMaybePunctuateStreamTimeIfTaskMigratedException()
{
         mockTaskInitialization();
         t1.maybePunctuateStreamTime();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
-        try {
-            assignedTasks.punctuate();
-            fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
 
-        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.EMPTY_SET));
-        EasyMock.verify(t1);
+        verifyTaskMigratedExceptionDoesNotCloseTask(
+            () -> assignedTasks.punctuate());
     }
 
     @Test
-    public void shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() {
+    public void shouldNotloseTaskWithinMaybePunctuateSystemTimeIfTaskMigratedException()
{
         mockTaskInitialization();
         EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
         t1.maybePunctuateSystemTime();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
-        t1.close(false, true);
-        EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
 
         try {
             assignedTasks.punctuate();
             fail("Should have thrown TaskMigratedException.");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
+        } catch (final TaskMigratedException expected) {
+            assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
+        }
         EasyMock.verify(t1);
     }
 
@@ -571,5 +539,16 @@ public class AssignedStreamsTasksTest {
         EasyMock.expectLastCall();
     }
 
+    private void verifyTaskMigratedExceptionDoesNotCloseTask(final ThrowingRunnable action)
{
+        final Set<TaskId> expectedRunningTaskIds = Collections.singleton(taskId1);
+
+        // This action is expected to throw a TaskMigratedException
+        assertThrows(TaskMigratedException.class, action);
+
+        // This task should be closed as a zombie with all the other tasks during onPartitionsLost
+        assertThat(assignedTasks.runningTaskIds(), equalTo(expectedRunningTaskIds));
+
+        EasyMock.verify(t1);
+    }
 
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
index 9cc51ce..25c5a13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
@@ -60,6 +60,11 @@ public class MockChangelogReader implements ChangelogReader {
         }
     }
 
+    @Override
+    public boolean isEmpty() {
+        return restoredOffsets.isEmpty() && registered.isEmpty();
+    }
+
     public boolean wasRegistered(final TopicPartition partition) {
         return registered.contains(partition);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5b56eb8..2c9e948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -842,7 +842,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing()
throws Exception {
+    public void shouldNotCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing()
throws Exception {
         internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
         internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
 
@@ -898,16 +898,16 @@ public class StreamThreadTest {
         try {
             thread.runOnce();
             fail("Should have thrown TaskMigratedException");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
-        TestUtils.waitForCondition(
-            () -> thread.tasks().isEmpty(),
-            "StreamsThread did not remove fenced zombie task.");
+        } catch (final TaskMigratedException expected) {
+            assertTrue("StreamsThread removed the fenced zombie task already, should wait
for rebalance to close all zombies together.",
+                        thread.tasks().containsKey(task1));
+        }
 
         assertThat(producer.commitCount(), equalTo(1L));
     }
 
     @Test
-    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTaks()
{
+    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTasks()
{
         final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)),
true);
 
         internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);

