kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 1.1 updated: HOTFIX: Enforce a rebalance upon task migration (#4802)
Date Sat, 31 Mar 2018 05:24:18 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new ca383b2  HOTFIX: Enforce a rebalance upon task migration (#4802)
ca383b2 is described below

commit ca383b2e16bbf272f823f8a2f0d1476f8400cc31
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Fri Mar 30 22:24:13 2018 -0700

    HOTFIX: Enforce a rebalance upon task migration (#4802)
    
    Unsubscribe / resubscribe a rebalance upon task migration (either false positive or not)
to enforce a rebalance, also to refresh on log end offset
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/streams/errors/TaskMigratedException.java | 21 +++++++++++++++------
 .../streams/processor/internals/StreamThread.java   | 11 ++++++++---
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
index f2fa594..4254983 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
@@ -28,6 +28,8 @@ public class TaskMigratedException extends StreamsException {
 
     private final static long serialVersionUID = 1L;
 
+    private final Task task;
+
     public TaskMigratedException(final Task task) {
         this(task, null);
     }
@@ -36,17 +38,24 @@ public class TaskMigratedException extends StreamsException {
                                  final TopicPartition topicPartition,
                                  final long endOffset,
                                  final long pos) {
-        super(String.format("Log end offset of %s should not change while restoring: old
end offset %d, current offset %d%n%s",
-                            topicPartition,
-                            endOffset,
-                            pos,
-                            task.toString("> ")),
-            null);
+        super(String.format("Log end offset of %s should not change while restoring: old
end offset %d, current offset %d",
+                topicPartition,
+                endOffset,
+                pos),
+                null);
+
+        this.task = task;
     }
 
     public TaskMigratedException(final Task task,
                                  final Throwable throwable) {
         super(task.toString(), throwable);
+
+        this.task = task;
+    }
+
+    public Task migratedTask() {
+        return task;
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index fb9b8e0..2ba66a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -749,9 +749,14 @@ public class StreamThread extends Thread {
             try {
                 recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit);
             } catch (final TaskMigratedException ignoreAndRejoinGroup) {
-                log.warn("Detected a task that got migrated to another thread. " +
-                    "This implies that this thread missed a rebalance and dropped out of
the consumer group. " +
-                    "Trying to rejoin the consumer group now.", ignoreAndRejoinGroup);
+                log.warn("Detected task {} that got migrated to another thread: {} " +
+                        "This implies that the thread may have missed a rebalance and dropped
out of the consumer group. " +
+                        "Will try to rejoin the consumer group. Below is the detailed description
of the task:\n{}",
+                        ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.getMessage(),
ignoreAndRejoinGroup.migratedTask().toString(">"));
+
+                // re-subscribe to enforce a rebalance in the next poll call
+                consumer.unsubscribe();
+                consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
             }
         }
     }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message