kafka-commits mailing list archives

From: damian...@apache.org
Subject: [3/3] kafka git commit: KAFKA-5152; move state restoration out of rebalance and into poll loop
Date: Wed, 16 Aug 2017 10:14:14 GMT
KAFKA-5152; move state restoration out of rebalance and into poll loop

In `onPartitionsAssigned` (sketched below):
1. Release all locks for non-assigned suspended tasks.
2. Resume any suspended tasks.
3. Create new tasks, but don't attempt to take the state lock.
4. Pause partitions for any new tasks.
5. Set the state to `PARTITIONS_ASSIGNED`.
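A minimal sketch of the rebalance-callback side of this change. This is illustrative only, not code from the diff: the class name `RestoreAwareRebalanceListener` and the stub helper methods stand in for the real `StreamThread`/task-management logic.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified listener mirroring steps 1-5 above.
public class RestoreAwareRebalanceListener implements ConsumerRebalanceListener {
    enum State { RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED }

    private final Consumer<byte[], byte[]> consumer;
    private volatile State state = State.RUNNING;

    RestoreAwareRebalanceListener(final Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> revoked) {
        state = State.PARTITIONS_REVOKED;
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> assigned) {
        releaseLocksForNonAssignedSuspendedTasks(assigned);  // step 1
        resumeSuspendedTasksMatching(assigned);              // step 2
        final Set<TopicPartition> newTaskPartitions =
            createNewTasksWithoutStateLocks(assigned);       // step 3: no lock attempt here
        consumer.pause(newTaskPartitions);                   // step 4: hold processing until restored
        state = State.PARTITIONS_ASSIGNED;                   // step 5
    }

    // --- stubs standing in for the real task-management calls ---
    private void releaseLocksForNonAssignedSuspendedTasks(final Collection<TopicPartition> assigned) { }
    private void resumeSuspendedTasksMatching(final Collection<TopicPartition> assigned) { }
    private Set<TopicPartition> createNewTasksWithoutStateLocks(final Collection<TopicPartition> assigned) {
        return new HashSet<>(assigned);
    }
}
```

The key design point is that the callback no longer blocks on restoration: it only pauses the new tasks' partitions and defers the expensive work to the poll loop.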

In `StreamThread#runLoop` (see the skeleton after this list):
1. Poll.
2. If the state is `PARTITIONS_ASSIGNED`:
 2.1 Attempt to initialize any new tasks, i.e., take the state locks and initialize the state stores.
 2.2 Restore some data for the changelogs, i.e., poll once on the restore consumer and return the partitions that have been fully restored.
 2.3 Update tasks with the restored partitions and move any that have completed restoration to running.
 2.4 Resume consumption for any tasks where all partitions have been restored.
 2.5 If all active tasks are running, transition to `RUNNING` and assign standby partitions to the restore consumer.
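A skeleton of that loop, again hypothetical: the `TaskManager` and `Restorer` interfaces below only approximate the `AssignedTasks` and `StoreChangelogReader` APIs introduced by this commit, and standby-partition assignment is omitted.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Set;

// Hypothetical skeleton of the poll-loop restoration phase (steps 1-2.5 above).
public class RestoreOnPollLoop {
    enum State { PARTITIONS_ASSIGNED, RUNNING }

    interface TaskManager {
        void initializeNewTasks();                                                // 2.1
        Set<TopicPartition> updateRestored(Collection<TopicPartition> restored);  // 2.3
        boolean allActiveTasksRunning();
    }

    interface Restorer {
        Collection<TopicPartition> restore();  // 2.2: a single poll on the restore consumer
    }

    private final Consumer<byte[], byte[]> consumer;
    private final TaskManager taskManager;
    private final Restorer changelogReader;
    private State state = State.PARTITIONS_ASSIGNED;

    RestoreOnPollLoop(final Consumer<byte[], byte[]> consumer,
                      final TaskManager taskManager,
                      final Restorer changelogReader) {
        this.consumer = consumer;
        this.taskManager = taskManager;
        this.changelogReader = changelogReader;
    }

    void runOnce() {
        consumer.poll(100);                                            // 1. poll as usual
        if (state == State.PARTITIONS_ASSIGNED) {
            taskManager.initializeNewTasks();                          // 2.1 take locks, init stores
            final Collection<TopicPartition> restored = changelogReader.restore();  // 2.2
            consumer.resume(taskManager.updateRestored(restored));     // 2.3 + 2.4
            if (taskManager.allActiveTasksRunning()) {
                state = State.RUNNING;                                 // 2.5
            }
        }
    }
}
```

Because restoration is amortized over poll iterations, a task whose state-directory lock is still held (e.g. by a previous owner) can simply be retried on the next iteration instead of failing the rebalance; the diff below makes this concrete with the `LockException` retry in `AssignedTasks#initializeNewTasks`.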

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bill@confluent.io>

Closes #3653 from dguy/0.11.0-restore-on-poll


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b268322e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b268322e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b268322e

Branch: refs/heads/0.11.0
Commit: b268322ed789736dd1199849e3d3ff100ddc270e
Parents: 22c9a8c
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Aug 16 11:14:00 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Wed Aug 16 11:14:00 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |   6 +-
 .../processor/internals/AbstractTask.java       |  63 +-
 .../processor/internals/AssignedTasks.java      | 428 +++++++++++
 .../processor/internals/ChangelogReader.java    |   6 +-
 .../internals/ProcessorStateManager.java        |  66 +-
 .../processor/internals/StandbyTask.java        |  37 +-
 .../internals/StoreChangelogReader.java         | 152 ++--
 .../streams/processor/internals/StreamTask.java |  32 +-
 .../processor/internals/StreamThread.java       | 724 ++++++-------------
 .../apache/kafka/streams/KafkaStreamsTest.java  |   2 +-
 .../integration/ResetIntegrationTest.java       |   8 +-
 .../processor/internals/AbstractTaskTest.java   |  82 ++-
 .../processor/internals/AssignedTasksTest.java  | 426 +++++++++++
 .../internals/ProcessorStateManagerTest.java    |  32 +-
 .../processor/internals/StandbyTaskTest.java    |   7 +-
 .../internals/StoreChangelogReaderTest.java     |  21 +
 .../processor/internals/StreamTaskTest.java     |  16 +-
 .../processor/internals/StreamThreadTest.java   | 414 ++++++-----
 .../StreamThreadStateStoreProviderTest.java     |   2 +
 .../apache/kafka/test/MockChangelogReader.java  |  10 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   1 +
 21 files changed, 1628 insertions(+), 907 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6056fa6..846348f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -205,9 +205,7 @@ public class KafkaStreams {
         public boolean isRunning() {
             return equals(RUNNING) || equals(REBALANCING);
         }
-        public boolean isCreatedOrRunning() {
-            return isRunning() || equals(CREATED);
-        }
+
         public boolean isValidTransition(final State newState) {
             return validTransitions.contains(newState.ordinal());
         }
@@ -380,7 +378,7 @@ public class KafkaStreams {
                 threadState.put(thread.getId(), newState);
 
                 if (newState == StreamThread.State.PARTITIONS_REVOKED ||
-                        newState == StreamThread.State.ASSIGNING_PARTITIONS) {
+                        newState == StreamThread.State.PARTITIONS_ASSIGNED) {
                     setState(State.REBALANCING);
                 } else if (newState == StreamThread.State.RUNNING && state() != State.RUNNING) {
                     maybeSetRunning();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8427e11..b8a585a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -23,7 +23,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -39,6 +41,7 @@ import java.util.Set;
 
 public abstract class AbstractTask {
     private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);
+    private static final int STATE_DIR_LOCK_RETRIES = 5;
 
     final TaskId id;
     final String applicationId;
@@ -48,6 +51,8 @@ public abstract class AbstractTask {
     final Consumer consumer;
     final String logPrefix;
     final boolean eosEnabled;
+    private final StateDirectory stateDirectory;
+    boolean taskInitialized;
 
     InternalProcessorContext processorContext;
 
@@ -69,6 +74,7 @@ public abstract class AbstractTask {
         this.topology = topology;
         this.consumer = consumer;
         this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
+        this.stateDirectory = stateDirectory;
 
         logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", id());
 
@@ -93,7 +99,7 @@ public abstract class AbstractTask {
     public abstract void suspend();
     public abstract void close(final boolean clean);
 
-    public final TaskId id() {
+    public TaskId id() {
         return id;
     }
 
@@ -101,7 +107,7 @@ public abstract class AbstractTask {
         return applicationId;
     }
 
-    public final Set<TopicPartition> partitions() {
+    public Set<TopicPartition> partitions() {
         return partitions;
     }
 
@@ -188,6 +194,21 @@ public abstract class AbstractTask {
     }
 
     void initializeStateStores() {
+        if (topology.stateStores().isEmpty()) {
+            return;
+        }
+
+        try {
+            if (!stateDirectory.lock(id, STATE_DIR_LOCK_RETRIES)) {
+                throw new LockException(String.format("%s Failed to lock the state directory for task %s",
+                                                      logPrefix, id));
+            }
+        } catch (IOException e) {
+            throw new StreamsException(String.format("%s Fatal error while trying to lock the state directory for task %s %s",
+                                                     logPrefix,
+                                                     id,
+                                                     e.getMessage()));
+        }
         log.trace("{} Initializing state stores", logPrefix);
 
         // set initial offset limits
@@ -204,8 +225,44 @@ public abstract class AbstractTask {
      * @param writeCheckpoint boolean indicating if a checkpoint file should be written
      */
     void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
+        ProcessorStateException exception = null;
         log.trace("{} Closing state manager", logPrefix);
-        stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+        try {
+            stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+        } catch (final ProcessorStateException e) {
+            exception = e;
+        } finally {
+            try {
+                stateDirectory.unlock(id);
+            } catch (IOException e) {
+                if (exception == null) {
+                    exception = new ProcessorStateException(String.format("%s Failed to release state dir lock", logPrefix), e);
+                }
+                log.error("{} Failed to release state dir lock: ", logPrefix, e);
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    /**
+     * initialize the topology/state stores
+     * @return true if the topology is ready to run, i.e, all stores have been restored.
+     */
+    public abstract boolean initialize();
+
+    abstract boolean process();
+
+    boolean hasStateStores() {
+        return !topology.stateStores().isEmpty();
     }
 
+    Collection<TopicPartition> changelogPartitions() {
+        return stateMgr.changelogPartitions();
+    }
+
+    abstract boolean maybePunctuate();
+
+    abstract boolean commitNeeded();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
new file mode 100644
index 0000000..6db212f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+class AssignedTasks<T extends AbstractTask> {
+    private static final Logger log = LoggerFactory.getLogger(AssignedTasks.class);
+    private final String logPrefix;
+    private final String taskTypeName;
+    private final Time time;
+    private Map<TaskId, T> created = new HashMap<>();
+    private Map<TaskId, T> suspended = new HashMap<>();
+    private Map<TaskId, T> restoring = new HashMap<>();
+    private Set<TopicPartition> restoredPartitions = new HashSet<>();
+    private Set<TaskId> previousActiveTasks = new HashSet<>();
+    // IQ may access this map.
+    private Map<TaskId, T> running = new ConcurrentHashMap<>();
+    private Map<TopicPartition, T> runningByPartition = new HashMap<>();
+
+    AssignedTasks(final String logPrefix,
+                  final String taskTypeName,
+                  final Time time) {
+        this.logPrefix = logPrefix;
+        this.taskTypeName = taskTypeName;
+        this.time = time;
+    }
+
+    void addNewTask(final T task) {
+        log.trace("{} add new {} {}", logPrefix, taskTypeName, task.id());
+        created.put(task.id(), task);
+    }
+
+    Set<TopicPartition> uninitializedPartitions() {
+        if (created.isEmpty()) {
+            return Collections.emptySet();
+        }
+        final Set<TopicPartition> partitions = new HashSet<>();
+        for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
+            if (entry.getValue().hasStateStores()) {
+                partitions.addAll(entry.getValue().partitions());
+            }
+        }
+        return partitions;
+    }
+
+    void initializeNewTasks() {
+        if (!created.isEmpty()) {
+            log.trace("{} Initializing {}s {}", logPrefix, taskTypeName, created.keySet());
+        }
+        for (final Iterator<Map.Entry<TaskId, T>> it = created.entrySet().iterator(); it.hasNext(); ) {
+            final Map.Entry<TaskId, T> entry = it.next();
+            try {
+                if (!entry.getValue().initialize()) {
+                    restoring.put(entry.getKey(), entry.getValue());
+                } else {
+                    transitionToRunning(entry.getValue());
+                }
+                it.remove();
+            } catch (final LockException e) {
+                // made this trace as it will spam the logs in the poll loop.
+                log.trace("{} Could not create {} {} due to {}; will retry", logPrefix, taskTypeName, entry.getKey(), e.getMessage());
+            }
+        }
+    }
+
+    Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
+        if (restored.isEmpty()) {
+            return Collections.emptySet();
+        }
+        final Set<TopicPartition> resume = new HashSet<>();
+        restoredPartitions.addAll(restored);
+        for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
+            final Map.Entry<TaskId, T> entry = it.next();
+            T task = entry.getValue();
+            if (restoredPartitions.containsAll(task.changelogPartitions())) {
+                transitionToRunning(task);
+                resume.addAll(task.partitions());
+                it.remove();
+            }
+        }
+        if (allTasksRunning()) {
+            restoredPartitions.clear();
+        }
+        return resume;
+    }
+
+    boolean allTasksRunning() {
+        return created.isEmpty()
+                && suspended.isEmpty()
+                && restoring.isEmpty();
+    }
+
+    Collection<T> runningTasks() {
+        return running.values();
+    }
+
+    RuntimeException suspend() {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+        log.trace("{} Suspending running {} {}", logPrefix, taskTypeName, runningTaskIds());
+        firstException.compareAndSet(null, suspendTasks(running.values()));
+        log.trace("{} Close restoring {} {}", logPrefix, taskTypeName, restoring.keySet());
+        firstException.compareAndSet(null, closeTasksUnclean(restoring.values()));
+        firstException.compareAndSet(null, closeTasksUnclean(created.values()));
+        previousActiveTasks.clear();
+        previousActiveTasks.addAll(running.keySet());
+        running.clear();
+        restoring.clear();
+        created.clear();
+        runningByPartition.clear();
+        return firstException.get();
+    }
+
+    private RuntimeException closeTasksUnclean(final Collection<T> tasks) {
+        RuntimeException exception = null;
+        for (final T task : tasks) {
+            try {
+                task.close(false);
+            } catch (final RuntimeException e) {
+                log.error("{} Failed to close {}, {}", logPrefix, taskTypeName, task.id, e);
+                if (exception == null) {
+                    exception = e;
+                }
+            }
+        }
+        return exception;
+    }
+
+    private RuntimeException suspendTasks(final Collection<T> tasks) {
+        RuntimeException exception = null;
+        for (Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
+            final T task = it.next();
+            try {
+                task.suspend();
+                suspended.put(task.id(), task);
+            } catch (final CommitFailedException e) {
+                suspended.put(task.id(), task);
+                // commit failed during suspension. Just log it.
+                log.warn("{} Failed to commit {} {} state when suspending due to CommitFailedException", logPrefix, taskTypeName, task.id);
+            } catch (final ProducerFencedException e) {
+                closeZombieTask(task);
+                it.remove();
+            } catch (final RuntimeException e) {
+                log.error("{} Suspending {} {} failed due to the following error:", logPrefix, taskTypeName, task.id, e);
+                try {
+                    task.close(false);
+                } catch (final Exception f) {
+                    log.error("{} After suspending failed, closing the same {} {} failed again due to the following error:", logPrefix, taskTypeName, task.id, f);
+                }
+                if (exception == null) {
+                    exception = e;
+                }
+            }
+        }
+        return exception;
+    }
+
+    private void closeZombieTask(final T task) {
+        log.warn("{} Producer of {} {} fenced; closing zombie task", logPrefix, taskTypeName, task.id);
+        try {
+            task.close(false);
+        } catch (final Exception e) {
+            log.warn("{} Failed to close zombie {} due to {}, ignore and proceed", taskTypeName, logPrefix, e);
+        }
+    }
+
+    boolean hasRunningTasks() {
+        return !running.isEmpty();
+    }
+
+    boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
+        if (suspended.containsKey(taskId)) {
+            final T task = suspended.get(taskId);
+            if (task.partitions().equals(partitions)) {
+                suspended.remove(taskId);
+                log.trace("{} Resuming suspended {} {}", logPrefix, taskTypeName, taskId);
+                task.resume();
+                transitionToRunning(task);
+                return true;
+            } else {
+                log.trace("{} couldn't resume task {} assigned partitions {}, task partitions", logPrefix, taskId, partitions, task.partitions);
+            }
+        }
+        return false;
+    }
+
+    private void transitionToRunning(final T task) {
+        running.put(task.id(), task);
+        for (TopicPartition topicPartition : task.partitions()) {
+            runningByPartition.put(topicPartition, task);
+        }
+        for (TopicPartition topicPartition : task.changelogPartitions()) {
+            runningByPartition.put(topicPartition, task);
+        }
+    }
+
+    T runningTaskFor(final TopicPartition partition) {
+        return runningByPartition.get(partition);
+    }
+
+    Set<TaskId> runningTaskIds() {
+        return running.keySet();
+    }
+
+    Map<TaskId, T> runningTaskMap() {
+        return Collections.unmodifiableMap(running);
+    }
+
+    public String toString(final String indent) {
+        final StringBuilder builder = new StringBuilder();
+        describe(builder, running.values(), indent, "Running:");
+        describe(builder, suspended.values(), indent, "Suspended:");
+        describe(builder, restoring.values(), indent, "Restoring:");
+        describe(builder, created.values(), indent, "New:");
+        return builder.toString();
+    }
+
+    private void describe(final StringBuilder builder,
+                          final Collection<T> tasks,
+                          final String indent,
+                          final String name) {
+        builder.append(indent).append(name);
+        for (final T t : tasks) {
+            builder.append(indent).append(t.toString(indent + "\t\t"));
+        }
+        builder.append("\n");
+    }
+
+    List<AbstractTask> allInitializedTasks() {
+        final List<AbstractTask> tasks = new ArrayList<>();
+        tasks.addAll(running.values());
+        tasks.addAll(suspended.values());
+        tasks.addAll(restoring.values());
+        return tasks;
+    }
+
+    Collection<T> suspendedTasks() {
+        return suspended.values();
+    }
+
+    Collection<T> restoringTasks() {
+        return Collections.unmodifiableCollection(restoring.values());
+    }
+
+    Collection<TaskId> allAssignedTaskIds() {
+        final List<TaskId> taskIds = new ArrayList<>();
+        taskIds.addAll(running.keySet());
+        taskIds.addAll(suspended.keySet());
+        taskIds.addAll(restoring.keySet());
+        taskIds.addAll(created.keySet());
+        return taskIds;
+    }
+
+    void clear() {
+        runningByPartition.clear();
+        running.clear();
+        created.clear();
+        suspended.clear();
+        restoredPartitions.clear();
+    }
+
+    Set<TaskId> previousTaskIds() {
+        return previousActiveTasks;
+    }
+
+    void commit() {
+        final RuntimeException exception = applyToRunningTasks(new TaskAction<T>() {
+            @Override
+            public String name() {
+                return "commit";
+            }
+
+            @Override
+            public void apply(final T task) {
+                task.commit();
+            }
+        }, false);
+
+        if (exception != null) {
+            throw exception;
+        }
+
+    }
+
+    int process() {
+        final AtomicInteger processed = new AtomicInteger(0);
+        applyToRunningTasks(new TaskAction<T>() {
+            @Override
+            public String name() {
+                return "process";
+            }
+
+            @Override
+            public void apply(final T task) {
+                if (task.process()) {
+                    processed.incrementAndGet();
+                }
+            }
+        }, true);
+        return processed.get();
+    }
+
+    void punctuateAndCommit(final Sensor commitTimeSensor, final Sensor punctuateTimeSensor) {
+        final Latency latency = new Latency(time.milliseconds());
+        final RuntimeException exception = applyToRunningTasks(new TaskAction<T>() {
+            String name;
+
+            @Override
+            public String name() {
+                return name;
+            }
+
+            @Override
+            public void apply(final T task) {
+                name = "punctuate";
+                if (task.maybePunctuate()) {
+                    punctuateTimeSensor.record(latency.compute(), latency.startTime);
+                }
+                if (task.commitNeeded()) {
+                    name = "commit";
+                    long beforeCommitMs = time.milliseconds();
+                    task.commit();
+                    commitTimeSensor.record(latency.compute(), latency.startTime);
+                    if (log.isDebugEnabled()) {
+                        log.debug("{} Committed active task {} per user request in {} ms",
+                                  logPrefix, task.id(), latency.startTime - beforeCommitMs);
+                    }
+                }
+            }
+        }, false);
+
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    Collection<TaskId> suspendedTaskIds() {
+        return suspended.keySet();
+    }
+
+    interface TaskAction<T extends AbstractTask> {
+        String name();
+
+        void apply(final T task);
+    }
+
+    class Latency {
+        private long startTime;
+
+        Latency(final long startTime) {
+            this.startTime = startTime;
+        }
+
+        private long compute() {
+            final long previousTimeMs = startTime;
+            startTime = time.milliseconds();
+            return Math.max(startTime - previousTimeMs, 0);
+        }
+    }
+
+
+    private RuntimeException applyToRunningTasks(final TaskAction<T> action, final boolean throwException) {
+        RuntimeException firstException = null;
+
+        for (Iterator<T> it = runningTasks().iterator(); it.hasNext(); ) {
+            final T task = it.next();
+            try {
+                action.apply(task);
+            } catch (final CommitFailedException e) {
+                // commit failed. This is already logged inside the task as WARN and we can just log it again here.
+                log.warn("{} Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", logPrefix, taskTypeName, task.id(), action.name());
+            } catch (final ProducerFencedException e) {
+                closeZombieTask(task);
+                it.remove();
+            } catch (final RuntimeException t) {
+                log.error("{} Failed to {} {} {} due to the following error:",
+                                       logPrefix,
+                                       action.name(),
+                                       taskTypeName,
+                                       task.id(),
+                                       t);
+                if (throwException) {
+                    throw t;
+                }
+                if (firstException == null) {
+                    firstException = t;
+                }
+            }
+        }
+
+        return firstException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 2e006a0..d8ed35e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -42,11 +43,14 @@ public interface ChangelogReader {
 
     /**
      * Restore all registered state stores by reading from their changelogs.
+     * @return all topic partitions that have been restored
      */
-    void restore();
+    Collection<TopicPartition> restore();
 
     /**
      * @return the restored offsets for all persistent stores.
      */
     Map<TopicPartition, Long> restoredOffsets();
+
+    void reset();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 226d7eb..110b03d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -49,7 +49,6 @@ public class ProcessorStateManager implements StateManager {
     private final TaskId taskId;
     private final String logPrefix;
     private final boolean isStandby;
-    private final StateDirectory stateDirectory;
     private final ChangelogReader changelogReader;
     private final Map<String, StateStore> stores;
     private final Map<String, StateStore> globalStores;
@@ -58,7 +57,7 @@ public class ProcessorStateManager implements StateManager {
     private final Map<TopicPartition, Long> checkpointedOffsets;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
     private final Map<String, String> storeToChangelogTopic;
-    private final boolean eosEnabled;
+    private final List<TopicPartition> changelogPartitions = new ArrayList<>();
 
     // TODO: this map does not work with customized grouper where multiple partitions
     // of the same topic can be assigned to the same topic.
@@ -76,9 +75,8 @@ public class ProcessorStateManager implements StateManager {
                                  final StateDirectory stateDirectory,
                                  final Map<String, String> storeToChangelogTopic,
                                  final ChangelogReader changelogReader,
-                                 final boolean eosEnabled) throws LockException, IOException {
+                                 final boolean eosEnabled) throws IOException {
         this.taskId = taskId;
-        this.stateDirectory = stateDirectory;
         this.changelogReader = changelogReader;
         logPrefix = String.format("task [%s]", taskId);
 
@@ -93,12 +91,7 @@ public class ProcessorStateManager implements StateManager {
         this.isStandby = isStandby;
         restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
-        this.eosEnabled = eosEnabled;
 
-        if (!stateDirectory.lock(taskId, 5)) {
-            throw new LockException(String.format("%s Failed to lock the state directory for task %s",
-                logPrefix, taskId));
-        }
         // get a handle on the parent/base directory of the task directory
         // note that the parent directory could have been accidentally deleted here,
         // so catch that exception if that is the case
@@ -170,16 +163,19 @@ public class ProcessorStateManager implements StateManager {
 
                 restoreCallbacks.put(topic, stateRestoreCallback);
             }
+
         } else {
             log.trace("{} Restoring state store {} from changelog topic {}", logPrefix, store.name(), topic);
             final StateRestorer restorer = new StateRestorer(storePartition,
                                                              stateRestoreCallback,
                                                              checkpointedOffsets.get(storePartition),
                                                              offsetLimit(storePartition),
-                                                             store.persistent());
+                                                             store.persistent()
+            );
+
             changelogReader.register(restorer);
         }
-
+        changelogPartitions.add(storePartition);
         stores.put(store.name(), store);
     }
 
@@ -273,38 +269,26 @@ public class ProcessorStateManager implements StateManager {
     @Override
     public void close(final Map<TopicPartition, Long> ackedOffsets) throws ProcessorStateException {
         RuntimeException firstException = null;
-        try {
-            // attempting to close the stores, just in case they
-            // are not closed by a ProcessorNode yet
-            if (!stores.isEmpty()) {
-                log.debug("{} Closing its state manager and all the registered state stores", logPrefix);
-                for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
-                    log.debug("{} Closing storage engine {}", logPrefix, entry.getKey());
-                    try {
-                        entry.getValue().close();
-                    } catch (final Exception e) {
-                        if (firstException == null) {
-                            firstException = new ProcessorStateException(String.format("%s Failed to close state store %s", logPrefix, entry.getKey()), e);
-                        }
-                        log.error("{} Failed to close state store {}: ", logPrefix, entry.getKey(), e);
+        // attempting to close the stores, just in case they
+        // are not closed by a ProcessorNode yet
+        if (!stores.isEmpty()) {
+            log.debug("{} Closing its state manager and all the registered state stores", logPrefix);
+            for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
+                log.debug("{} Closing storage engine {}", logPrefix, entry.getKey());
+                try {
+                    entry.getValue().close();
+                } catch (final Exception e) {
+                    if (firstException == null) {
+                        firstException = new ProcessorStateException(String.format("%s Failed to close state store %s", logPrefix, entry.getKey()), e);
                     }
+                    log.error("{} Failed to close state store {}: ", logPrefix, entry.getKey(), e);
                 }
-
-                if (ackedOffsets != null) {
-                    checkpoint(ackedOffsets);
-                }
-
             }
-        } finally {
-            // release the state directory directoryLock
-            try {
-                stateDirectory.unlock(taskId);
-            } catch (final IOException e) {
-                if (firstException == null) {
-                    firstException = new ProcessorStateException(String.format("%s Failed to release state dir lock", logPrefix), e);
-                }
-                log.error("{} Failed to release state dir lock: ", logPrefix, e);
+
+            if (ackedOffsets != null) {
+                checkpoint(ackedOffsets);
             }
+
         }
 
         if (firstException != null) {
@@ -359,4 +343,8 @@ public class ProcessorStateManager implements StateManager {
     public StateStore getGlobalStore(final String name) {
         return globalStores.get(name);
     }
+
+    Collection<TopicPartition> changelogPartitions() {
+        return changelogPartitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index df3bea4..bc36351 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -36,7 +37,7 @@ import java.util.Map;
 public class StandbyTask extends AbstractTask {
 
     private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
-    private final Map<TopicPartition, Long> checkpointedOffsets;
+    private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
@@ -63,11 +64,6 @@ public class StandbyTask extends AbstractTask {
 
         // initialize the topology with its own context
         processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
-
-        log.debug("{} Initializing", logPrefix);
-        initializeStateStores();
-        processorContext.initialized();
-        checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
     }
 
     /**
@@ -77,7 +73,7 @@ public class StandbyTask extends AbstractTask {
      */
     @Override
     public void resume() {
-        log.debug("{} " + "Resuming", logPrefix);
+        log.debug("{} Resuming", logPrefix);
         updateOffsetLimits();
     }
 
@@ -123,6 +119,9 @@ public class StandbyTask extends AbstractTask {
      */
     @Override
     public void close(final boolean clean) {
+        if (!taskInitialized) {
+            return;
+        }
         log.debug("{} Closing", logPrefix);
         boolean committedSuccessfully = false;
         try {
@@ -148,4 +147,28 @@ public class StandbyTask extends AbstractTask {
         return checkpointedOffsets;
     }
 
+    public boolean initialize() {
+        log.debug("{} Initializing", logPrefix);
+        initializeStateStores();
+        checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
+        processorContext.initialized();
+        taskInitialized = true;
+        return true;
+    }
+
+    @Override
+    boolean process() {
+        throw new UnsupportedOperationException("process not supported by standby task");
+    }
+
+    @Override
+    boolean maybePunctuate() {
+        throw new UnsupportedOperationException("maybePunctuate not supported by standby task");
+    }
+
+    @Override
+    boolean commitNeeded() {
+        throw new UnsupportedOperationException("commitNeeded not supported by standby task");
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 3d5a793..ea64619 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -28,10 +28,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,6 +45,9 @@ public class StoreChangelogReader implements ChangelogReader {
     private final long partitionValidationTimeoutMs;
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
+    private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
+    private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+    private boolean initialized = false;
 
     public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
         this.time = time;
@@ -92,69 +95,80 @@ public class StoreChangelogReader implements ChangelogReader {
 
     @Override
     public void register(final StateRestorer restorer) {
-        if (restorer.offsetLimit() > 0) {
-            stateRestorers.put(restorer.partition(), restorer);
+        stateRestorers.put(restorer.partition(), restorer);
+    }
+
+
+    public Collection<TopicPartition> restore() {
+        if (!initialized) {
+            initialize();
+        }
+
+        if (needsRestoring.isEmpty()) {
+            consumer.assign(Collections.<TopicPartition>emptyList());
+            return completed();
+        }
+
+        final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet());
+        final ConsumerRecords<byte[], byte[]> allRecords = consumer.poll(10);
+        for (final TopicPartition partition : partitions) {
+            restorePartition(allRecords, partition);
         }
+
+        if (needsRestoring.isEmpty()) {
+            consumer.assign(Collections.<TopicPartition>emptyList());
+        }
+        return completed();
     }
 
-    public void restore() {
-        final long start = time.milliseconds();
-        final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
-        try {
-            if (!consumer.subscription().isEmpty()) {
-                throw new IllegalStateException(String.format("Restore consumer should have not subscribed to any partitions (%s) beforehand", consumer.subscription()));
-            }
-            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(stateRestorers.keySet());
-
-            // remove any partitions where we already have all of the data
-            for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
-                TopicPartition topicPartition = entry.getKey();
-                Long offset = entry.getValue();
-                final StateRestorer restorer = stateRestorers.get(topicPartition);
-                if (restorer.checkpoint() >= offset) {
-                    restorer.setRestoredOffset(restorer.checkpoint());
-                } else {
-                    needsRestoring.put(topicPartition, restorer);
-                }
-            }
+    private void initialize() {
+        needsRestoring.clear();
+        if (!consumer.subscription().isEmpty()) {
+            throw new IllegalStateException(String.format("Restore consumer should have not subscribed to any partitions (%s) beforehand", consumer.subscription()));
+        }
+        endOffsets.putAll(consumer.endOffsets(stateRestorers.keySet()));
 
-            log.debug("{} Starting restoring state stores from changelog topics {}", logPrefix, needsRestoring.keySet());
-
-            consumer.assign(needsRestoring.keySet());
-            final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
-            for (final StateRestorer restorer : needsRestoring.values()) {
-                if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                    consumer.seek(restorer.partition(), restorer.checkpoint());
-                    logRestoreOffsets(restorer.partition(),
-                                      restorer.checkpoint(),
-                                      endOffsets.get(restorer.partition()));
-                    restorer.setStartingOffset(consumer.position(restorer.partition()));
-                } else {
-                    consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
-                    needsPositionUpdate.add(restorer);
-                }
+        // remove any partitions where we already have all of the data
+        for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            Long offset = entry.getValue();
+            final StateRestorer restorer = stateRestorers.get(topicPartition);
+            if (restorer.checkpoint() >= offset) {
+                log.debug("{} not restoring partition {} as checkpoint {} is >= offset {}", logPrefix, topicPartition, restorer.checkpoint(), offset);
+                restorer.setRestoredOffset(restorer.checkpoint());
+            } else if (restorer.offsetLimit() == 0) {
+                log.debug("{} not restoring partition {} as offset limit is 0", logPrefix, topicPartition);
+                restorer.setRestoredOffset(0);
+            } else {
+                needsRestoring.put(topicPartition, restorer);
             }
+        }
+
+        log.debug("{} Starting restoring state stores from changelog topics {}", logPrefix, needsRestoring.keySet());
 
-            for (final StateRestorer restorer : needsPositionUpdate) {
-                final long position = consumer.position(restorer.partition());
-                restorer.setStartingOffset(position);
+        consumer.assign(needsRestoring.keySet());
+        final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
+        for (final StateRestorer restorer : needsRestoring.values()) {
+            if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
+                consumer.seek(restorer.partition(), restorer.checkpoint());
                 logRestoreOffsets(restorer.partition(),
-                                  position,
+                                  restorer.checkpoint(),
                                   endOffsets.get(restorer.partition()));
+                restorer.setStartingOffset(consumer.position(restorer.partition()));
+            } else {
+                consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                needsPositionUpdate.add(restorer);
             }
+        }
 
-            final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet());
-            while (!partitions.isEmpty()) {
-                final ConsumerRecords<byte[], byte[]> allRecords = consumer.poll(10);
-                final Iterator<TopicPartition> partitionIterator = partitions.iterator();
-                while (partitionIterator.hasNext()) {
-                    restorePartition(endOffsets, allRecords, partitionIterator);
-                }
-            }
-        } finally {
-            consumer.assign(Collections.<TopicPartition>emptyList());
-            log.info("{} Completed restore all active states from changelog topics {} in {}ms ", logPrefix, needsRestoring.keySet(), time.milliseconds() - start);
+        for (final StateRestorer restorer : needsPositionUpdate) {
+            final long position = consumer.position(restorer.partition());
+            restorer.setStartingOffset(position);
+            logRestoreOffsets(restorer.partition(),
+                              position,
+                              endOffsets.get(restorer.partition()));
         }
+        initialized = true;
     }
 
     private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) {
@@ -165,6 +179,12 @@ public class StoreChangelogReader implements ChangelogReader {
                   endOffset);
     }
 
+    private Collection<TopicPartition> completed() {
+        final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
+        completed.removeAll(needsRestoring.keySet());
+        return completed;
+    }
+
     @Override
     public Map<TopicPartition, Long> restoredOffsets() {
         final Map<TopicPartition, Long> restoredOffsets = new HashMap<>();
@@ -177,13 +197,21 @@ public class StoreChangelogReader implements ChangelogReader {
         return restoredOffsets;
     }
 
-    private void restorePartition(final Map<TopicPartition, Long> endOffsets,
-                                  final ConsumerRecords<byte[], byte[]> allRecords,
-                                  final Iterator<TopicPartition> partitionIterator) {
-        final TopicPartition topicPartition = partitionIterator.next();
+    @Override
+    public void reset() {
+        partitionInfo.clear();
+        stateRestorers.clear();
+        needsRestoring.clear();
+        endOffsets.clear();
+        initialized = false;
+    }
+
+    private boolean restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
+                                    final TopicPartition topicPartition) {
         final StateRestorer restorer = stateRestorers.get(topicPartition);
         final Long endOffset = endOffsets.get(topicPartition);
         final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
+        restorer.setRestoredOffset(pos);
         if (restorer.hasCompleted(pos, endOffset)) {
             if (pos > endOffset + 1) {
                 throw new IllegalStateException(
@@ -193,8 +221,6 @@ public class StoreChangelogReader implements ChangelogReader {
                                       pos));
             }
 
-            restorer.setRestoredOffset(pos);
-
             log.debug("{} Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
                     logPrefix,
                     topicPartition,
@@ -202,11 +228,15 @@ public class StoreChangelogReader implements ChangelogReader {
                     restorer.startingOffset(),
                     restorer.restoredOffset());
 
-            partitionIterator.remove();
+            needsRestoring.remove(topicPartition);
+            return true;
         }
+        return false;
     }
 
-    private long processNext(final List<ConsumerRecord<byte[], byte[]>> records, final StateRestorer restorer, final Long endOffset) {
+    private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
+                             final StateRestorer restorer,
+                             final Long endOffset) {
         for (final ConsumerRecord<byte[], byte[]> record : records) {
             final long offset = record.offset();
             if (restorer.hasCompleted(offset, endOffset)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2d7b494..c5c67bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -137,16 +137,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // initialize the topology with its own context
         processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
         this.time = time;
-        log.debug("{} Initializing", logPrefix);
-        initializeStateStores();
+
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
         if (eosEnabled) {
             this.producer.initTransactions();
             this.producer.beginTransaction();
             transactionInFlight = true;
         }
-        initTopology();
-        processorContext.initialized();
     }
 
     /**
@@ -334,6 +331,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 processorContext.setCurrentNode(null);
             }
         }
+        taskInitialized = true;
     }
 
     /**
@@ -376,14 +374,16 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // close the processors
         // make sure close() is called for each node even when there is a RuntimeException
         RuntimeException exception = null;
-        for (final ProcessorNode node : topology.processors()) {
-            processorContext.setCurrentNode(node);
-            try {
-                node.close();
-            } catch (final RuntimeException e) {
-                exception = e;
-            } finally {
-                processorContext.setCurrentNode(null);
+        if (taskInitialized) {
+            for (final ProcessorNode node : topology.processors()) {
+                processorContext.setCurrentNode(node);
+                try {
+                    node.close();
+                } catch (final RuntimeException e) {
+                    exception = e;
+                } finally {
+                    processorContext.setCurrentNode(null);
+                }
             }
         }
 
@@ -552,4 +552,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
         return new RecordCollectorImpl(producer, id.toString());
     }
 
+    public boolean initialize() {
+        log.debug("{} Initializing", logPrefix);
+        initializeStateStores();
+        initTopology();
+        processorContext.initialized();
+        return topology.stateStores().isEmpty();
+    }
+
 }

