kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5767)
Date Mon, 19 Nov 2018 23:21:22 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 0dcf9db  KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5767)
0dcf9db is described below

commit 0dcf9db2b6bbf9c26d611b939b359dddaff5dd51
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Mon Nov 19 15:21:02 2018 -0800

    KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5767)
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../processor/internals/AssignedStandbyTasks.java  |  4 +
 .../processor/internals/AssignedStreamsTasks.java  | 94 ++++++++++++++++++++++
 .../streams/processor/internals/AssignedTasks.java | 91 ++++-----------------
 .../streams/processor/internals/StateRestorer.java | 19 +++--
 .../processor/internals/StoreChangelogReader.java  | 64 ++++++++++-----
 .../streams/integration/EosIntegrationTest.java    |  6 +-
 .../internals/StoreChangelogReaderTest.java        | 60 ++++++++++----
 7 files changed, 220 insertions(+), 118 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index a99e451..40ce79c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -24,4 +24,8 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
         super(logContext, "standby task");
     }
 
+    void addToRestoring(final StandbyTask task) {
+        throw new UnsupportedOperationException("Standby tasks cannot be restored actively.");
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 7b05f64..98c2bbd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -23,12 +23,21 @@ import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks
{
     private final Logger log;
+    private final Map<TaskId, StreamTask> restoring = new HashMap<>();
+    private final Set<TopicPartition> restoredPartitions = new HashSet<>();
+    private final Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<>();
     private final TaskAction<StreamTask> maybeCommitAction;
     private int committed = 0;
 
@@ -59,6 +68,52 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
         return restoringByPartition.get(partition);
     }
 
+    void updateRestored(final Collection<TopicPartition> restored) {
+        if (restored.isEmpty()) {
+            return;
+        }
+        log.trace("Stream task changelog partitions that have completed restoring so far:
{}", restored);
+        restoredPartitions.addAll(restored);
+        for (final Iterator<Map.Entry<TaskId, StreamTask>> it = restoring.entrySet().iterator();
it.hasNext(); ) {
+            final Map.Entry<TaskId, StreamTask> entry = it.next();
+            final StreamTask task = entry.getValue();
+            if (restoredPartitions.containsAll(task.changelogPartitions())) {
+                transitionToRunning(task);
+                it.remove();
+                log.trace("Stream task {} completed restoration as all its changelog partitions
{} have been applied to restore state",
+                    task.id(),
+                    task.changelogPartitions());
+            } else {
+                if (log.isTraceEnabled()) {
+                    final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
+                    outstandingPartitions.removeAll(restoredPartitions);
+                    log.trace("Stream task {} cannot resume processing yet since some of
its changelog partitions have not completed restoring: {}",
+                        task.id(),
+                        outstandingPartitions);
+                }
+            }
+        }
+        if (allTasksRunning()) {
+            restoredPartitions.clear();
+        }
+    }
+
+    void addToRestoring(final StreamTask task) {
+        restoring.put(task.id(), task);
+        for (final TopicPartition topicPartition : task.partitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+        for (final TopicPartition topicPartition : task.changelogPartitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+    }
+
+    boolean allTasksRunning() {
+        return created.isEmpty()
+            && suspended.isEmpty()
+            && restoring.isEmpty();
+    }
+
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
@@ -139,4 +194,43 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
         return punctuated;
     }
 
+    RuntimeException suspend() {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(super.suspend());
+        log.trace("Close restoring stream task {}", restoring.keySet());
+        firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
+        restoring.clear();
+        restoringByPartition.clear();
+        return firstException.get();
+    }
+
+    public String toString(final String indent) {
+        final StringBuilder builder = new StringBuilder();
+        builder.append(super.toString(indent));
+        describe(builder, restoring.values(), indent, "Restoring:");
+        return builder.toString();
+    }
+
+    List<StreamTask> allTasks() {
+        final List<StreamTask> tasks = super.allTasks();
+        tasks.addAll(restoring.values());
+        return tasks;
+    }
+
+    Collection<StreamTask> restoringTasks() {
+        return Collections.unmodifiableCollection(restoring.values());
+    }
+
+    Set<TaskId> allAssignedTaskIds() {
+        final Set<TaskId> taskIds = super.allAssignedTaskIds();
+        taskIds.addAll(restoring.keySet());
+        return taskIds;
+    }
+
+    void clear() {
+        super.clear();
+        restoring.clear();
+        restoringByPartition.clear();
+        restoredPartitions.clear();
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index da402bc..52b1335 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -40,15 +40,12 @@ abstract class AssignedTasks<T extends Task> {
     private final Logger log;
     private final String taskTypeName;
     private final TaskAction<T> commitAction;
-    private Map<TaskId, T> created = new HashMap<>();
-    private Map<TaskId, T> suspended = new HashMap<>();
-    private Map<TaskId, T> restoring = new HashMap<>();
-    private Set<TopicPartition> restoredPartitions = new HashSet<>();
-    private Set<TaskId> previousActiveTasks = new HashSet<>();
+    Map<TaskId, T> created = new HashMap<>();
+    Map<TaskId, T> suspended = new HashMap<>();
+    private final Set<TaskId> previousActiveTasks = new HashSet<>();
     // IQ may access this map.
     Map<TaskId, T> running = new ConcurrentHashMap<>();
-    private Map<TopicPartition, T> runningByPartition = new HashMap<>();
-    Map<TopicPartition, T> restoringByPartition = new HashMap<>();
+    private final Map<TopicPartition, T> runningByPartition = new HashMap<>();
 
     AssignedTasks(final LogContext logContext,
                   final String taskTypeName) {
@@ -99,42 +96,11 @@ abstract class AssignedTasks<T extends Task> {
         }
     }
 
-    void updateRestored(final Collection<TopicPartition> restored) {
-        if (restored.isEmpty()) {
-            return;
-        }
-        log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName,
restored);
-        restoredPartitions.addAll(restored);
-        for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator();
it.hasNext(); ) {
-            final Map.Entry<TaskId, T> entry = it.next();
-            final T task = entry.getValue();
-            if (restoredPartitions.containsAll(task.changelogPartitions())) {
-                transitionToRunning(task);
-                it.remove();
-                log.trace("{} {} completed restoration as all its changelog partitions {}
have been applied to restore state",
-                        taskTypeName,
-                        task.id(),
-                        task.changelogPartitions());
-            } else {
-                if (log.isTraceEnabled()) {
-                    final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
-                    outstandingPartitions.removeAll(restoredPartitions);
-                    log.trace("{} {} cannot resume processing yet since some of its changelog
partitions have not completed restoring: {}",
-                              taskTypeName,
-                              task.id(),
-                              outstandingPartitions);
-                }
-            }
-        }
-        if (allTasksRunning()) {
-            restoredPartitions.clear();
-        }
-    }
+    abstract void addToRestoring(final T task);
 
     boolean allTasksRunning() {
         return created.isEmpty()
-                && suspended.isEmpty()
-                && restoring.isEmpty();
+                && suspended.isEmpty();
     }
 
     Collection<T> running() {
@@ -145,21 +111,17 @@ abstract class AssignedTasks<T extends Task> {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         log.trace("Suspending running {} {}", taskTypeName, runningTaskIds());
         firstException.compareAndSet(null, suspendTasks(running.values()));
-        log.trace("Close restoring {} {}", taskTypeName, restoring.keySet());
-        firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
         log.trace("Close created {} {}", taskTypeName, created.keySet());
         firstException.compareAndSet(null, closeNonRunningTasks(created.values()));
         previousActiveTasks.clear();
         previousActiveTasks.addAll(running.keySet());
         running.clear();
-        restoring.clear();
         created.clear();
         runningByPartition.clear();
-        restoringByPartition.clear();
         return firstException.get();
     }
 
-    private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
+    RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
         RuntimeException exception = null;
         for (final T task : tasks) {
             try {
@@ -176,7 +138,7 @@ abstract class AssignedTasks<T extends Task> {
 
     private RuntimeException suspendTasks(final Collection<T> tasks) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-        for (Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
+        for (final Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
             final T task = it.next();
             try {
                 task.suspend();
@@ -247,27 +209,17 @@ abstract class AssignedTasks<T extends Task> {
         return false;
     }
 
-    private void addToRestoring(final T task) {
-        restoring.put(task.id(), task);
-        for (TopicPartition topicPartition : task.partitions()) {
-            restoringByPartition.put(topicPartition, task);
-        }
-        for (TopicPartition topicPartition : task.changelogPartitions()) {
-            restoringByPartition.put(topicPartition, task);
-        }
-    }
-
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private void transitionToRunning(final T task) {
+    void transitionToRunning(final T task) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         task.initializeTopology();
-        for (TopicPartition topicPartition : task.partitions()) {
+        for (final TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
         }
-        for (TopicPartition topicPartition : task.changelogPartitions()) {
+        for (final TopicPartition topicPartition : task.changelogPartitions()) {
             runningByPartition.put(topicPartition, task);
         }
     }
@@ -288,15 +240,14 @@ abstract class AssignedTasks<T extends Task> {
         final StringBuilder builder = new StringBuilder();
         describe(builder, running.values(), indent, "Running:");
         describe(builder, suspended.values(), indent, "Suspended:");
-        describe(builder, restoring.values(), indent, "Restoring:");
         describe(builder, created.values(), indent, "New:");
         return builder.toString();
     }
 
-    private void describe(final StringBuilder builder,
-                          final Collection<T> tasks,
-                          final String indent,
-                          final String name) {
+    void describe(final StringBuilder builder,
+                  final Collection<T> tasks,
+                  final String indent,
+                  final String name) {
         builder.append(indent).append(name);
         for (final T t : tasks) {
             builder.append(indent).append(t.toString(indent + "\t\t"));
@@ -304,35 +255,27 @@ abstract class AssignedTasks<T extends Task> {
         builder.append("\n");
     }
 
-    private List<T> allTasks() {
+    List<T> allTasks() {
         final List<T> tasks = new ArrayList<>();
         tasks.addAll(running.values());
         tasks.addAll(suspended.values());
-        tasks.addAll(restoring.values());
         tasks.addAll(created.values());
         return tasks;
     }
 
-    Collection<T> restoringTasks() {
-        return Collections.unmodifiableCollection(restoring.values());
-    }
-
     Set<TaskId> allAssignedTaskIds() {
         final Set<TaskId> taskIds = new HashSet<>();
         taskIds.addAll(running.keySet());
         taskIds.addAll(suspended.keySet());
-        taskIds.addAll(restoring.keySet());
         taskIds.addAll(created.keySet());
         return taskIds;
     }
 
     void clear() {
         runningByPartition.clear();
-        restoringByPartition.clear();
         running.clear();
         created.clear();
         suspended.clear();
-        restoredPartitions.clear();
     }
 
     Set<TaskId> previousTaskIds() {
@@ -351,7 +294,7 @@ abstract class AssignedTasks<T extends Task> {
     void applyToRunningTasks(final TaskAction<T> action) {
         RuntimeException firstException = null;
 
-        for (Iterator<T> it = running().iterator(); it.hasNext(); ) {
+        for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
             final T task = it.next();
             try {
                 action.apply(task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 33dce9e..e0bac93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -26,13 +26,13 @@ public class StateRestorer {
 
     static final int NO_CHECKPOINT = -1;
 
-    private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
     private final String storeName;
     private final TopicPartition partition;
     private final CompositeRestoreListener compositeRestoreListener;
 
+    private long checkpointOffset;
     private long restoredOffset;
     private long startingOffset;
     private long endingOffset;
@@ -45,7 +45,7 @@ public class StateRestorer {
                   final String storeName) {
         this.partition = partition;
         this.compositeRestoreListener = compositeRestoreListener;
-        this.checkpoint = checkpoint;
+        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
         this.storeName = storeName;
@@ -56,7 +56,15 @@ public class StateRestorer {
     }
 
     long checkpoint() {
-        return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+        return checkpointOffset;
+    }
+
+    void setCheckpointOffset(final long checkpointOffset) {
+        this.checkpointOffset = checkpointOffset;
+    }
+
+    public String storeName() {
+        return storeName;
     }
 
     void restoreStarted() {
@@ -67,7 +75,8 @@ public class StateRestorer {
         compositeRestoreListener.onRestoreEnd(partition, storeName, restoredNumRecords());
     }
 
-    void restoreBatchCompleted(long currentRestoredOffset, int numRestored) {
+    void restoreBatchCompleted(final long currentRestoredOffset,
+                               final int numRestored) {
         compositeRestoreListener.onBatchRestored(partition, storeName, currentRestoredOffset,
numRestored);
     }
 
@@ -79,7 +88,7 @@ public class StateRestorer {
         return persistent;
     }
 
-    void setUserRestoreListener(StateRestoreListener userRestoreListener) {
+    void setUserRestoreListener(final StateRestoreListener userRestoreListener) {
         this.compositeRestoreListener.setUserRestoreListener(userRestoreListener);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5fcba76..e0fc82d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -61,9 +61,14 @@ public class StoreChangelogReader implements ChangelogReader {
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setUserRestoreListener(userStateRestoreListener);
-        stateRestorers.put(restorer.partition(), restorer);
-        needsInitializing.put(restorer.partition(), restorer);
+        final StateRestorer existingRestorer = stateRestorers.get(restorer.partition());
+        if (existingRestorer == null) {
+            restorer.setUserRestoreListener(userStateRestoreListener);
+            stateRestorers.put(restorer.partition(), restorer);
+            needsInitializing.put(restorer.partition(), restorer);
+        } else {
+            needsInitializing.put(restorer.partition(), existingRestorer);
+        }
     }
 
     /**
@@ -71,7 +76,7 @@ public class StoreChangelogReader implements ChangelogReader {
      */
     public Collection<TopicPartition> restore(final RestoringTasks active) {
         if (!needsInitializing.isEmpty()) {
-            initialize();
+            initialize(active);
         }
 
         if (needsRestoring.isEmpty()) {
@@ -103,7 +108,7 @@ public class StoreChangelogReader implements ChangelogReader {
         return completed();
     }
 
-    private void initialize() {
+    private void initialize(final RestoringTasks active) {
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new StreamsException("Restore consumer should not be subscribed to any
topics (" + restoreConsumer.subscription() + ")");
         }
@@ -112,8 +117,8 @@ public class StoreChangelogReader implements ChangelogReader {
         // the needsInitializing map is not empty, meaning we do not know the metadata for
some of them yet
         refreshChangelogInfo();
 
-        Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
-        for (Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet())
{
+        final Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
+        for (final Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet())
{
             final TopicPartition topicPartition = entry.getKey();
             if (hasPartition(topicPartition)) {
                 initializable.put(entry.getKey(), entry.getValue());
@@ -157,11 +162,12 @@ public class StoreChangelogReader implements ChangelogReader {
 
         // set up restorer for those initializable
         if (!initializable.isEmpty()) {
-            startRestoration(initializable);
+            startRestoration(initializable, active);
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized)
{
+    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+                                  final RestoringTasks active) {
         log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
@@ -170,26 +176,42 @@ public class StoreChangelogReader implements ChangelogReader {
 
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
         for (final StateRestorer restorer : initialized.values()) {
+            final TopicPartition restoringPartition = restorer.partition();
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
-                logRestoreOffsets(restorer.partition(),
+                restoreConsumer.seek(restoringPartition, restorer.checkpoint());
+                logRestoreOffsets(restoringPartition,
                                   restorer.checkpoint(),
-                                  endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
+                                  endOffsets.get(restoringPartition));
+                restorer.setStartingOffset(restoreConsumer.position(restoringPartition));
                 restorer.restoreStarted();
             } else {
-                restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                restoreConsumer.seekToBeginning(Collections.singletonList(restoringPartition));
                 needsPositionUpdate.add(restorer);
             }
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = restoreConsumer.position(restorer.partition());
-            logRestoreOffsets(restorer.partition(),
-                              position,
-                              endOffsets.get(restorer.partition()));
-            restorer.setStartingOffset(position);
-            restorer.restoreStarted();
+            final TopicPartition restoringPartition = restorer.partition();
+            final StreamTask task = active.restoringTaskFor(restoringPartition);
+            // If checkpoint does not exist it means the task was not shutdown gracefully before;
+            // and in this case if EOS is turned on we should wipe out the state and re-initialize the task
+            if (task.eosEnabled) {
+                log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
+                    "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restoringPartition);
+                // we move the partitions here, because they will be added back within
+                // `task.reinitializeStateStoresForPartitions()` that calls `register()`
internally again
+                needsInitializing.remove(restoringPartition);
+                restorer.setCheckpointOffset(restoreConsumer.position(restoringPartition));
+                task.reinitializeStateStoresForPartitions(Collections.singleton(restoringPartition));
+            } else {
+                log.info("Restoring task {}'s state store {} from beginning of the changelog
{} ", task.id, restorer.storeName(), restoringPartition);
+                final long position = restoreConsumer.position(restoringPartition);
+                logRestoreOffsets(restoringPartition,
+                    position,
+                    endOffsets.get(restoringPartition));
+                restorer.setStartingOffset(position);
+                restorer.restoreStarted();
+            }
         }
 
         needsRestoring.putAll(initialized);
@@ -280,7 +302,7 @@ public class StoreChangelogReader implements ChangelogReader {
                              final Long endOffset) {
         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
         long nextPosition = -1;
-        int numberRecords = records.size();
+        final int numberRecords = records.size();
         int numberRestored = 0;
         long lastRestoredOffset = -1;
         for (final ConsumerRecord<byte[], byte[]> record : records) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 6c7b2b4..f6811c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -317,7 +317,7 @@ public class EosIntegrationTest {
         // the app is supposed to copy all 40 records into the output topic
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted
writes
         //
-        // the failure gets inject after 20 committed and 30 uncommitted records got received
+        // the failure gets inject after 20 committed and 10 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records (even if 50 record got written)
 
@@ -392,7 +392,7 @@ public class EosIntegrationTest {
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted
writes
         // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
         //
-        // the failure gets inject after 20 committed and 30 uncommitted records got received
+        // the failure gets inject after 20 committed and 10 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records and the state stores should
contain the correct sums
         // per key (even if some records got processed twice)
@@ -402,7 +402,7 @@ public class EosIntegrationTest {
             streams.start();
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L,
10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L,
15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L,
15L, 0L, 1L, 2L, 3L);
 
             final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
             dataBeforeFailure.addAll(committedDataBeforeFailure);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c65d4ef..a117dc3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -100,7 +100,10 @@ public class StoreChangelogReaderTest {
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
         final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
         mockRestorer.setUserRestoreListener(stateRestoreListener);
-        expect(mockRestorer.partition()).andReturn(new TopicPartition("sometopic", 0)).andReturn(new
TopicPartition("sometopic", 0));
+        expect(mockRestorer.partition())
+            .andReturn(new TopicPartition("sometopic", 0))
+            .andReturn(new TopicPartition("sometopic", 0))
+            .andReturn(new TopicPartition("sometopic", 0));
         EasyMock.replay(mockRestorer);
         changelogReader.register(mockRestorer);
 
@@ -119,6 +122,10 @@ public class StoreChangelogReaderTest {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(messages));
     }
@@ -133,10 +140,9 @@ public class StoreChangelogReaderTest {
                 return Collections.singleton(topicPartition);
             }
         });
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true, "storeName"));
 
-        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task).andReturn(task);
         EasyMock.replay(active);
 
         // first restore call "fails" but we should not die with an exception
@@ -165,6 +171,9 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true,
                                                    "storeName"));
 
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
     }
@@ -175,6 +184,10 @@ public class StoreChangelogReaderTest {
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener,
null, 3, true,
                                                          "storeName");
         changelogReader.register(restorer);
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
@@ -197,8 +210,9 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
+        expect(active.restoringTaskFor(one)).andReturn(task);
+        expect(active.restoringTaskFor(two)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
         changelogReader.restore(active);
 
@@ -224,8 +238,9 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
+        expect(active.restoringTaskFor(one)).andReturn(task);
+        expect(active.restoringTaskFor(two)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
         changelogReader.restore(active);
 
@@ -246,8 +261,11 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldOnlyReportTheLastRestoredOffset() {
         setupConsumer(10, topicPartition);
-        changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(5));
@@ -306,6 +324,10 @@ public class StoreChangelogReaderTest {
     public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
@@ -315,6 +337,10 @@ public class StoreChangelogReaderTest {
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
@@ -328,8 +354,11 @@ public class StoreChangelogReaderTest {
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes));
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes));
         consumer.assign(Collections.singletonList(topicPartition));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
         changelogReader.restore(active);
 
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
@@ -354,9 +383,10 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
         final TopicPartition postInitialization = new TopicPartition("other", 0);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(postInitialization)).andReturn(null);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        expect(active.restoringTaskFor(postInitialization)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
 
         assertTrue(changelogReader.restore(active).isEmpty());


Mime
View raw message