Repository: kafka
Updated Branches:
refs/heads/trunk 46e4d4013 -> ff7b0f5b4
HOTFIX: make sure to go through all shutdown steps
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes #928 from ymatsuda/shutdown
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff7b0f5b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff7b0f5b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff7b0f5b
Branch: refs/heads/trunk
Commit: ff7b0f5b467bdf553584fb253b00f460dfbe8943
Parents: 46e4d40
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Mon Feb 22 13:16:06 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Feb 22 13:16:06 2016 -0800
----------------------------------------------------------------------
.../internals/ProcessorStateManager.java | 70 ++++++++++----------
.../streams/processor/internals/StreamTask.java | 4 +-
.../processor/internals/StreamThread.java | 60 +++++++++--------
3 files changed, 71 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
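The pattern this hotfix applies, shown as a minimal standalone sketch (all names
here are placeholders, not Kafka code): every shutdown step is attempted even if
an earlier one fails, and the lock release sits in a finally block so it can
never be skipped.

import java.util.ArrayList;
import java.util.List;

public class GuardedShutdown {

    private final List<AutoCloseable> steps = new ArrayList<>();

    public void register(AutoCloseable step) {
        steps.add(step);
    }

    public void shutdown() {
        try {
            for (AutoCloseable step : steps) {
                try {
                    step.close();
                } catch (Exception e) {
                    // log and keep going: one failing step must not
                    // prevent the remaining steps from running
                    System.err.println("step failed: " + e);
                }
            }
        } finally {
            // always runs, mirroring directoryLock.release() in the diff below
            releaseLock();
        }
    }

    private void releaseLock() {
        System.out.println("lock released");
    }

    public static void main(String[] args) {
        GuardedShutdown s = new GuardedShutdown();
        s.register(() -> System.out.println("flush stores"));
        s.register(() -> { throw new IllegalStateException("boom"); });
        s.register(() -> System.out.println("write checkpoint"));
        s.shutdown(); // all three steps are attempted; the lock is still released
    }
}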
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff7b0f5b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index c3bd82a..d449d04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -298,45 +298,47 @@ public class ProcessorStateManager {
}
public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
- if (!stores.isEmpty()) {
- log.debug("Closing stores.");
- for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
- log.debug("Closing storage engine {}", entry.getKey());
- entry.getValue().flush();
- entry.getValue().close();
- }
+ try {
+ if (!stores.isEmpty()) {
+ log.debug("Closing stores.");
+ for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
+ log.debug("Closing storage engine {}", entry.getKey());
+ entry.getValue().flush();
+ entry.getValue().close();
+ }
- Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
- for (String storeName : stores.keySet()) {
- TopicPartition part;
- if (loggingEnabled.contains(storeName))
- part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
- else
- part = new TopicPartition(storeName, getPartition(storeName));
-
- // only checkpoint the offset to the offsets file if it is persistent;
- if (stores.get(storeName).persistent()) {
- Long offset = ackedOffsets.get(part);
-
- if (offset != null) {
- // store the last offset + 1 (the log position after restoration)
- checkpointOffsets.put(part, offset + 1);
- } else {
- // if no record was produced. we need to check the restored offset.
- offset = restoredOffsets.get(part);
- if (offset != null)
- checkpointOffsets.put(part, offset);
+ Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
+ for (String storeName : stores.keySet()) {
+ TopicPartition part;
+ if (loggingEnabled.contains(storeName))
+ part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
+ else
+ part = new TopicPartition(storeName, getPartition(storeName));
+
+ // only checkpoint the offset to the offsets file if it is persistent;
+ if (stores.get(storeName).persistent()) {
+ Long offset = ackedOffsets.get(part);
+
+ if (offset != null) {
+ // store the last offset + 1 (the log position after restoration)
+ checkpointOffsets.put(part, offset + 1);
+ } else {
+ // if no record was produced. we need to check the restored offset.
+ offset = restoredOffsets.get(part);
+ if (offset != null)
+ checkpointOffsets.put(part, offset);
+ }
}
}
- }
- // write the checkpoint file before closing, to indicate clean shutdown
- OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
- checkpoint.write(checkpointOffsets);
+ // write the checkpoint file before closing, to indicate clean shutdown
+ OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+ checkpoint.write(checkpointOffsets);
+ }
+ } finally {
+ // release the state directory lock
+ directoryLock.release();
}
-
- // release the state directory lock
- directoryLock.release();
}
private int getPartition(String topic) {
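The checkpoint logic in close() above prefers the last acked offset + 1 (the log
position a restore should resume from) and falls back to the restored offset when
no new record was produced. A hypothetical helper illustrating that choice, with
plain string keys standing in for the TopicPartition bookkeeping:

import java.util.HashMap;
import java.util.Map;

public class CheckpointExample {

    static Map<String, Long> checkpointOffsets(Map<String, Long> ackedOffsets,
                                               Map<String, Long> restoredOffsets) {
        Map<String, Long> checkpoint = new HashMap<>();
        for (Map.Entry<String, Long> e : ackedOffsets.entrySet()) {
            // last written offset + 1 == the log position after restoration
            checkpoint.put(e.getKey(), e.getValue() + 1);
        }
        for (Map.Entry<String, Long> e : restoredOffsets.entrySet()) {
            // only used when no new record was produced for the partition
            checkpoint.putIfAbsent(e.getKey(), e.getValue());
        }
        return checkpoint;
    }

    public static void main(String[] args) {
        Map<String, Long> acked = Map.of("store-0", 41L);
        Map<String, Long> restored = Map.of("store-0", 10L, "store-1", 7L);
        // store-0 -> 42 (acked wins), store-1 -> 7 (restored fills the gap)
        System.out.println(checkpointOffsets(acked, restored));
    }
}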
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff7b0f5b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ec3d011..4d66324 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -300,10 +300,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
}
}
+ super.close();
+
if (exception != null)
throw exception;
-
- super.close();
}
@Override
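Why super.close() moved above the rethrow: the base-class cleanup must run even
when closing a processor failed, so the task holds on to the first exception and
only rethrows it after cleanup. A self-contained sketch of that ordering
(placeholder classes, not the actual StreamTask code):

import java.util.List;

public class CloseOrder {

    static class Base {
        public void close() { System.out.println("base cleanup"); }
    }

    static class Task extends Base {
        @Override
        public void close() {
            RuntimeException exception = null;
            for (Runnable processor : processors()) {
                try {
                    processor.run(); // stands in for closing one processor
                } catch (RuntimeException e) {
                    if (exception == null) exception = e; // remember the first failure
                }
            }
            super.close();                           // always runs now
            if (exception != null) throw exception;  // reported only after cleanup
        }

        private List<Runnable> processors() {
            return List.of(
                () -> System.out.println("close processor A"),
                () -> { throw new IllegalStateException("processor B failed"); });
        }
    }

    public static void main(String[] args) {
        try {
            new Task().close();
        } catch (RuntimeException e) {
            System.out.println("rethrown after cleanup: " + e.getMessage());
        }
    }
}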
http://git-wip-us.apache.org/repos/asf/kafka/blob/ff7b0f5b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3fc9407..70e24d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -138,14 +138,15 @@ public class StreamThread extends Thread {
public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
try {
commitAll();
- // TODO: right now upon partition revocation, we always remove all the tasks;
- // this behavior can be optimized to only remove affected tasks in the future
- removeStreamTasks();
- removeStandbyTasks();
lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
} catch (Throwable t) {
rebalanceException = t;
throw t;
+ } finally {
+ // TODO: right now upon partition revocation, we always remove all the tasks;
+ // this behavior can be optimized to only remove affected tasks in the future
+ removeStreamTasks();
+ removeStandbyTasks();
}
}
};
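The shape of the revocation change, sketched against the public consumer API
(commitAll() and removeTasks() are placeholders for the thread's own methods,
and the actual patch additionally records the Throwable as rebalanceException
before rethrowing): commit in the try, but tear tasks down in finally so they
are removed even when the commit throws.

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class RevocationListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            commitAll(); // may throw; cleanup below must still happen
        } finally {
            removeTasks(); // always runs, mirroring the hotfix
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // re-create tasks for the new assignment (omitted)
    }

    private void commitAll() { /* commit offsets for all tasks */ }

    private void removeTasks() { /* close and clear all tasks */ }
}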
@@ -273,6 +274,8 @@ public class StreamThread extends Thread {
private void shutdown() {
log.info("Shutting down stream thread [" + this.getName() + "]");
+ // Exceptions should not prevent this call from going through all shutdown steps
+
try {
commitAll();
} catch (Throwable e) {
@@ -299,13 +302,8 @@ public class StreamThread extends Thread {
log.error("Failed to close restore consumer in thread [" + this.getName() + "]:
", e);
}
- // Exceptions should not prevent this call from going through all shutdown steps
- try {
- removeStreamTasks();
- removeStandbyTasks();
- } catch (Throwable e) {
- // already logged in removeStreamTasks() and removeStandbyTasks()
- }
+ removeStreamTasks();
+ removeStandbyTasks();
log.info("Stream thread shutdown complete [" + this.getName() + "]");
}
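After this patch the shutdown sequence has a uniform shape: each step is wrapped
individually, so a Throwable from one step is logged and the remaining steps
still execute. A compact sketch with placeholder method names:

public class ShutdownSketch {

    public void shutdown() {
        tryStep(this::commitAll, "commit");
        tryStep(this::closeProducer, "producer close");
        tryStep(this::closeConsumers, "consumer close");
        removeTasks(); // catches and logs its own failures internally
        System.out.println("shutdown complete");
    }

    private void tryStep(Runnable step, String name) {
        try {
            step.run();
        } catch (Throwable t) {
            // log and continue with the next shutdown step
            System.err.println("failed " + name + ": " + t);
        }
    }

    private void commitAll() { /* flush state and commit offsets */ }
    private void closeProducer() { /* close the record producer */ }
    private void closeConsumers() { /* close consumer and restore consumer */ }
    private void removeTasks() { /* close tasks, swallowing per-task errors */ }

    public static void main(String[] args) {
        new ShutdownSketch().shutdown();
    }
}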
@@ -627,15 +625,19 @@ public class StreamThread extends Thread {
}
private void removeStreamTasks() {
- for (StreamTask task : activeTasks.values()) {
- closeOne(task);
- }
+ try {
+ for (StreamTask task : activeTasks.values()) {
+ closeOne(task);
+ }
+ prevTasks.clear();
+ prevTasks.addAll(activeTasks.keySet());
- prevTasks.clear();
- prevTasks.addAll(activeTasks.keySet());
+ activeTasks.clear();
+ activeTasksByPartition.clear();
- activeTasks.clear();
- activeTasksByPartition.clear();
+ } catch (Exception e) {
+ log.error("Failed to remove stream tasks in thread [" + this.getName() + "]:
", e);
+ }
}
private void closeOne(AbstractTask task) {
@@ -644,7 +646,6 @@ public class StreamThread extends Thread {
task.close();
} catch (StreamsException e) {
log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id()
+ " in thread [" + this.getName() + "]: ", e);
- throw e;
}
sensors.taskDestructionSensor.record();
}
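Removing the rethrow from closeOne() matters because it is called in a loop: a
rethrow on the first failing task would abort the loop and leave the remaining
tasks open. A minimal log-and-continue illustration (not the actual task types):

import java.util.List;

public class CloseAllTasks {

    static void closeAll(List<AutoCloseable> tasks) {
        for (AutoCloseable task : tasks) {
            try {
                task.close();
            } catch (Exception e) {
                // log and keep closing the rest
                System.err.println("failed to close task: " + e);
            }
        }
    }

    public static void main(String[] args) {
        closeAll(List.of(
            () -> System.out.println("task 0 closed"),
            () -> { throw new IllegalStateException("task 1 broke"); },
            () -> System.out.println("task 2 closed")));
    }
}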
@@ -701,15 +702,20 @@ public class StreamThread extends Thread {
private void removeStandbyTasks() {
- for (StandbyTask task : standbyTasks.values()) {
- closeOne(task);
- }
- // un-assign the change log partitions
- restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+ try {
+ for (StandbyTask task : standbyTasks.values()) {
+ closeOne(task);
+ }
+ standbyTasks.clear();
+ standbyTasksByPartition.clear();
+ standbyRecords.clear();
- standbyTasks.clear();
- standbyTasksByPartition.clear();
- standbyRecords.clear();
+ // un-assign the change log partitions
+ restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+
+ } catch (Exception e) {
+ log.error("Failed to remove standby tasks in thread [" + this.getName() + "]:
", e);
+ }
}
private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {
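One detail worth noting in removeStandbyTasks() above: passing an empty list to
assign() is the supported way to drop all changelog partitions from the restore
consumer. A minimal illustration (assumes a configured KafkaConsumer and the
kafka-clients library on the classpath):

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class UnassignExample {

    static void unassignAll(KafkaConsumer<byte[], byte[]> restoreConsumer) {
        // an empty assignment is valid and clears every owned partition
        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
    }
}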