kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: HOTFIX: move restoreConsumer.assign() to shutdownTasksAndState
Date Fri, 07 Oct 2016 18:30:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 e409d8afd -> d6b54da3e


HOTFIX: move restoreConsumer.assign() to shutdownTasksAndState

restoreConsumer.assign(..) in removeStandbyTasks was logging an (ignorable) exception because
the restoreConsumer had already been closed. Moved restoreConsumer.assign(..) to shutdownTasksAndState,
which runs before the consumers are closed.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1986 from dguy/hotfix-assign

(cherry picked from commit 454f6845b34d0e3faabd9427f3cd8bbe6209c30d)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d6b54da3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d6b54da3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d6b54da3

Branch: refs/heads/0.10.1
Commit: d6b54da3e433da5a650e397d24db59c7bc05d6e4
Parents: e409d8a
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Oct 7 11:30:00 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Oct 7 11:30:10 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b54da3/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0667865..c3c6cc1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -297,6 +297,15 @@ public class StreamThread extends Thread {
         producer.flush();
         // Close all task state managers
         closeAllStateManagers(rethrowExceptions);
+        try {
+            // un-assign the change log partitions
+            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        } catch (Exception e) {
+            log.error(String.format("stream-thread [%s] Failed to un-assign change log partitions: ", this.getName()), e);
+            if (rethrowExceptions) {
+                throw e;
+            }
+        }
     }
 
     interface AbstractTaskAction {
@@ -758,17 +767,9 @@ public class StreamThread extends Thread {
     }
 
     private void removeStandbyTasks() {
-        try {
-            standbyTasks.clear();
-            standbyTasksByPartition.clear();
-            standbyRecords.clear();
-
-            // un-assign the change log partitions
-            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
-
-        } catch (Exception e) {
-            log.error(String.format("stream-thread [%s] Failed to remove standby tasks: ", this.getName()), e);
-        }
+        standbyTasks.clear();
+        standbyTasksByPartition.clear();
+        standbyRecords.clear();
     }
 
     private class StreamsMetricsImpl implements StreamsMetrics, ThreadCacheMetrics {
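
The ordering problem described in the commit message can be illustrated with a minimal sketch. It is not the StreamThread code: StubConsumer and ShutdownOrderSketch are hypothetical names, the stub only mimics a consumer that rejects calls once closed, and partitions are plain strings rather than TopicPartition objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a consumer; this is NOT the Kafka client API.
// It only models "calls after close() are rejected".
class StubConsumer {
    private boolean closed = false;
    private List<String> assignment = new ArrayList<>();

    void assign(List<String> partitions) {
        if (closed) {
            throw new IllegalStateException("consumer already closed");
        }
        assignment = new ArrayList<>(partitions);
    }

    void close() {
        closed = true;
    }

    List<String> assignment() {
        return assignment;
    }
}

class ShutdownOrderSketch {
    // Ordering before the fix (as assumed here): un-assign is attempted after close.
    // Returns true if the un-assign call throws.
    static boolean unassignAfterCloseFails() {
        StubConsumer consumer = new StubConsumer();
        consumer.assign(Collections.singletonList("changelog-0"));
        consumer.close();
        try {
            consumer.assign(Collections.<String>emptyList());
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Ordering after the fix: un-assign first, then close.
    // Returns true if the assignment was cleared without an exception.
    static boolean unassignBeforeCloseSucceeds() {
        StubConsumer consumer = new StubConsumer();
        consumer.assign(Collections.singletonList("changelog-0"));
        consumer.assign(Collections.<String>emptyList());
        boolean cleared = consumer.assignment().isEmpty();
        consumer.close();
        return cleared;
    }

    public static void main(String[] args) {
        System.out.println("un-assign after close fails: " + unassignAfterCloseFails());
        System.out.println("un-assign before close succeeds: " + unassignBeforeCloseSucceeds());
    }
}
```

In the sketch, clearing the assignment before close() avoids the exception entirely, which is the same reason the patch moves the un-assign step into shutdownTasksAndState.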

