kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: don't throw CommitFailedException during suspendTasksAndState
Date Tue, 14 Feb 2017 18:44:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3b36d5cff -> 4db048d61


MINOR: don't throw CommitFailedException during suspendTasksAndState

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2535 from dguy/minor-commit-failed


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4db048d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4db048d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4db048d6

Branch: refs/heads/trunk
Commit: 4db048d61206bc6efbd143d6293216b7cb4b86c5
Parents: 3b36d5c
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Feb 14 10:40:39 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Feb 14 10:43:34 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/StreamThread.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4db048d6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index fba3db5..12d472b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -477,7 +477,9 @@ public class StreamThread extends Thread {
         firstException.compareAndSet(null, flushAllState());
         // only commit after all state has been flushed and there hasn't been an exception
         if (firstException.get() == null) {
-            firstException.set(commitOffsets());
+            // TODO: currently commit failures will not be thrown to users
+            // while suspending tasks; this need to be re-visit after KIP-98
+            commitOffsets();
         }
         // remove the changelog partitions from restore consumer
         firstException.compareAndSet(null, unAssignChangeLogPartitions());


Mime
View raw message