kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: log and fail on missing task in Streams (#5655)
Date Mon, 17 Sep 2018 16:22:46 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 607dea2  MINOR: log and fail on missing task in Streams (#5655)
607dea2 is described below

commit 607dea234f30a0c40d5a48f518a51372e02fa0a0
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Mon Sep 17 11:22:36 2018 -0500

    MINOR: log and fail on missing task in Streams (#5655)
    
    Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/streams/processor/internals/StreamThread.java       | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b43177d..3a19aa7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -979,9 +979,16 @@ public class StreamThread extends Thread {
         for (final TopicPartition partition : records.partitions()) {
             final StreamTask task = taskManager.activeTask(partition);
 
-            if (task.isClosed()) {
+            if (task == null) {
+                log.error(
+                    "Unable to locate active task for received-record partition {}. Current
tasks: {}",
+                    partition,
+                    taskManager.toString(">")
+                );
+                throw new NullPointerException("Task was unexpectedly missing for partition
" + partition);
+            } else if (task.isClosed()) {
                 log.info("Stream task {} is already closed, probably because it got unexpectedly
migrated to another thread already. " +
-                    "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                             "Notifying the thread to trigger a new rebalance immediately.",
task.id());
                 throw new TaskMigratedException(task);
             }
 


Mime
View raw message