kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: catch a commit failure due to rebalance in StreamThread
Date Tue, 23 Feb 2016 05:39:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 982ab09a7 -> 68af16ac1


MINOR: catch a commit failure due to rebalance in StreamThread

StreamThread should keep going after a commit fails due to a group rebalance.
Currently the thread just dies.
guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #933 from ymatsuda/catch_commit_failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68af16ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68af16ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68af16ac

Branch: refs/heads/trunk
Commit: 68af16ac15e5675daebb710ed8f15f780dc43abd
Parents: 982ab09
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Mon Feb 22 21:39:26 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Feb 22 21:39:26 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/StreamThread.java  | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68af16ac/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 70e24d0..10e458a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -487,7 +488,11 @@ public class StreamThread extends Thread {
     private void commitOne(AbstractTask task, long now) {
         try {
             task.commit();
+        } catch (CommitFailedException e) {
+            // commit failed. Just log it.
+            log.warn("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
         } catch (KafkaException e) {
+            // commit failed due to an unexpected exception. Log it and rethrow the exception.
             log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
             throw e;
         }


Mime
View raw message