kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: catch an exception in rebalance and stop the stream thread
Date Fri, 12 Feb 2016 09:11:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 330274ed1 -> f141e647a


MINOR: catch an exception in rebalance and stop the stream thread

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #901 from ymatsuda/minor3


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f141e647
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f141e647
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f141e647

Branch: refs/heads/trunk
Commit: f141e647a4a3daf618b48073058320ecaa3671d0
Parents: 330274e
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Fri Feb 12 17:11:12 2016 +0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Feb 12 17:11:12 2016 +0800

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 32 ++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f141e647/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 18dc0ec..3fc9407 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -104,6 +104,7 @@ public class StreamThread extends Thread {
     private long lastClean;
     private long lastCommit;
     private long recordsProcessed;
+    private Throwable rebalanceException = null;
 
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
@@ -123,19 +124,29 @@ public class StreamThread extends Thread {
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
-            addStreamTasks(assignment);
-            addStandbyTasks();
-            lastClean = time.milliseconds(); // start the cleaning cycle
+            try {
+                addStreamTasks(assignment);
+                addStandbyTasks();
+                lastClean = time.milliseconds(); // start the cleaning cycle
+            } catch (Throwable t) {
+                rebalanceException = t;
+                throw t;
+            }
         }
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
-            commitAll();
-            // TODO: right now upon partition revocation, we always remove all the tasks;
-            // this behavior can be optimized to only remove affected tasks in the future
-            removeStreamTasks();
-            removeStandbyTasks();
-            lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
+            try {
+                commitAll();
+                // TODO: right now upon partition revocation, we always remove all the tasks;
+                // this behavior can be optimized to only remove affected tasks in the future
+                removeStreamTasks();
+                removeStandbyTasks();
+                lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
+            } catch (Throwable t) {
+                rebalanceException = t;
+                throw t;
+            }
         }
     };
 
@@ -318,6 +329,9 @@ public class StreamThread extends Thread {
                 ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
                 lastPoll = time.milliseconds();
 
+                if (rebalanceException != null)
+                    throw new StreamsException("Failed to rebalance", rebalanceException);
+
                 if (!records.isEmpty()) {
                     for (TopicPartition partition : records.partitions()) {
                         StreamTask task = activeTasksByPartition.get(partition);


Mime
View raw message