kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: improve log4j messaging (#4530)
Date Tue, 06 Feb 2018 00:35:15 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 332e698  MINOR: improve log4j messaging (#4530)
332e698 is described below

commit 332e698ac9c74ce29317021b03a54512c92ac8b3
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Mon Feb 5 16:35:12 2018 -0800

    MINOR: improve log4j messaging (#4530)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, James Cheng <jylcheng@yahoo.com>
---
 .../org/apache/kafka/streams/errors/TaskMigratedException.java | 10 ++++++++++
 .../kafka/streams/processor/internals/PartitionGroup.java      |  2 +-
 .../kafka/streams/processor/internals/RecordCollectorImpl.java |  2 +-
 .../apache/kafka/streams/processor/internals/StreamThread.java |  4 ++--
 4 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
index f2fa594..5a284e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
@@ -28,6 +28,8 @@ public class TaskMigratedException extends StreamsException {
 
     private final static long serialVersionUID = 1L;
 
+    private final Task task;
+
     public TaskMigratedException(final Task task) {
         this(task, null);
     }
@@ -42,11 +44,19 @@ public class TaskMigratedException extends StreamsException {
                             pos,
                             task.toString("> ")),
             null);
+
+        this.task = task;
     }
 
     public TaskMigratedException(final Task task,
                                  final Throwable throwable) {
         super(task.toString(), throwable);
+
+        this.task = task;
+    }
+
+    public Task migratedTask() {
+        return task;
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 8ce7dc9..dcaa755 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -153,7 +153,7 @@ public class PartitionGroup {
         final RecordQueue recordQueue = partitionQueues.get(partition);
 
         if (recordQueue == null) {
-            throw new IllegalStateException("Record's partition does not belong to this partition-group.");
+            throw new IllegalStateException(String.format("Record's partition %s does not
belong to this partition-group.", partition));
         }
 
         return recordQueue.size();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index afdadf2..286cd81 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -121,7 +121,7 @@ public class RecordCollectorImpl implements RecordCollector {
             errorLogMessage += PARAMETER_HINT;
             errorMessage += PARAMETER_HINT;
         }
-        log.error(errorLogMessage, key, value, timestamp, topic, exception);
+        log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
         sendException = new StreamsException(
             String.format(errorMessage,
                           logPrefix,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cb133c6..064a293 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -755,9 +755,9 @@ public class StreamThread extends Thread {
             try {
                 recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit);
             } catch (final TaskMigratedException ignoreAndRejoinGroup) {
-                log.warn("Detected a task that got migrated to another thread. " +
+                log.warn("Detected task {} that got migrated to another thread. " +
                     "This implies that this thread missed a rebalance and dropped out of
the consumer group. " +
-                    "Trying to rejoin the consumer group now.", ignoreAndRejoinGroup);
+                    "Trying to rejoin the consumer group now. Below is the detailed description
of the task:\n{}", ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
             }
         }
     }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message