kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: refactor error message of task migration (#4803)
Date Mon, 02 Apr 2018 00:37:13 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e5d4af  MINOR: refactor error message of task migration (#4803)
2e5d4af is described below

commit 2e5d4af83f6b9f87fc84c0ff10462a5414ff8cb4
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Sun Apr 1 17:37:00 2018 -0700

    MINOR: refactor error message of task migration (#4803)
    
    In the stream thread's handling of TaskMigratedException, log the task's full information
    at WARN. In other places, log only at INFO, with additional context information.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../kafka/streams/errors/TaskMigratedException.java  | 20 ++++++++++++++------
 .../streams/processor/internals/StreamThread.java    |  8 ++++----
 .../internals/AssignedStreamsTasksTest.java          | 14 +++++++-------
 3 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
index 5a284e4..4195433 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
@@ -30,27 +30,35 @@ public class TaskMigratedException extends StreamsException {
 
     private final Task task;
 
-    public TaskMigratedException(final Task task) {
-        this(task, null);
+    // this is for unit test only
+    public TaskMigratedException() {
+        super("A task has been migrated unexpectedly", null);
+
+        this.task = null;
     }
 
     public TaskMigratedException(final Task task,
                                  final TopicPartition topicPartition,
                                  final long endOffset,
                                  final long pos) {
-        super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d%n%s",
+        super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
                             topicPartition,
                             endOffset,
-                            pos,
-                            task.toString("> ")),
+                            pos),
             null);
 
         this.task = task;
     }
 
+    public TaskMigratedException(final Task task) {
+        super(String.format("Task %s is unexpectedly closed during processing", task.id()), null);
+
+        this.task = task;
+    }
+
     public TaskMigratedException(final Task task,
                                  final Throwable throwable) {
-        super(task.toString(), throwable);
+        super(String.format("Client request for task %s has been fenced due to a rebalance", task.id()), throwable);
 
         this.task = task;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ab96cce..a7e3bcd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -899,7 +899,7 @@ public class StreamThread extends Thread {
             final StreamTask task = taskManager.activeTask(partition);
 
             if (task.isClosed()) {
-                log.warn("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                log.info("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
                         "Notifying the thread to trigger a new rebalance immediately.", task.id());
                 throw new TaskMigratedException(task);
             }
@@ -1032,7 +1032,7 @@ public class StreamThread extends Thread {
                             final StandbyTask task = taskManager.standbyTask(partition);
 
                             if (task.isClosed()) {
-                                log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                                log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
                                         "Notifying the thread to trigger a new rebalance immediately.", task.id());
                                 throw new TaskMigratedException(task);
                             }
@@ -1065,7 +1065,7 @@ public class StreamThread extends Thread {
                         }
 
                         if (task.isClosed()) {
-                            log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                            log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
                                     "Notifying the thread to trigger a new rebalance immediately.", task.id());
                             throw new TaskMigratedException(task);
                         }
@@ -1084,7 +1084,7 @@ public class StreamThread extends Thread {
                     final StandbyTask task = taskManager.standbyTask(partition);
 
                     if (task.isClosed()) {
-                        log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                        log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
                                 "Notifying the thread to trigger a new rebalance immediately.", task.id());
                         throw new TaskMigratedException(task);
                     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index fcd2322..8a8d625 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -195,7 +195,7 @@ public class AssignedStreamsTasksTest {
     public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
         mockTaskInitialization();
         t1.suspend();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -226,7 +226,7 @@ public class AssignedStreamsTasksTest {
         mockRunningTaskSuspension();
         t1.resume();
         t1.initializeTopology();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -267,7 +267,7 @@ public class AssignedStreamsTasksTest {
     public void shouldCloseTaskOnCommitIfTaskMigratedException() {
         mockTaskInitialization();
         t1.commit();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -319,7 +319,7 @@ public class AssignedStreamsTasksTest {
         mockTaskInitialization();
         EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -338,7 +338,7 @@ public class AssignedStreamsTasksTest {
     public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
         mockTaskInitialization();
         t1.process();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -370,7 +370,7 @@ public class AssignedStreamsTasksTest {
     public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
         mockTaskInitialization();
         t1.maybePunctuateStreamTime();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
@@ -390,7 +390,7 @@ public class AssignedStreamsTasksTest {
         mockTaskInitialization();
         EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
         t1.maybePunctuateSystemTime();
-        EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
+        EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message