kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5541: Follow-up; Try to clean uncleanly upon clean close failure before throwing the exception
Date Tue, 10 Oct 2017 22:16:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a5a9a901e -> c22c1775a


KAFKA-5541: Follow-up; Try to clean uncleanly upon clean close failure before throwing the exception

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4046 from mjsax/kafka-5541-minor-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c22c1775
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c22c1775
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c22c1775

Branch: refs/heads/trunk
Commit: c22c1775a550dbefe6bd4cdcf8820404351257a8
Parents: a5a9a90
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Oct 10 15:16:53 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Oct 10 15:16:53 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      | 34 +++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c22c1775/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 680bbd3..7426d6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -507,12 +507,17 @@ class AssignedTasks implements RestoringTasks {
             } catch (final TaskMigratedException e) {
                 firstException.compareAndSet(null, closeZombieTask(task));
             } catch (final RuntimeException t) {
-                firstException.compareAndSet(null, t);
                 log.error("Failed while closing {} {} due to the following error:",
                           task.getClass().getSimpleName(),
                           task.id(),
                           t);
-                firstException.compareAndSet(null, closeUncleanIfRequired(task, clean));
+                if (clean) {
+                    if (!closeUnclean(task)) {
+                        firstException.compareAndSet(null, t);
+                    }
+                } else {
+                    firstException.compareAndSet(null, t);
+                }
             }
         }
 
@@ -524,21 +529,18 @@ class AssignedTasks implements RestoringTasks {
         }
     }
 
-    private RuntimeException closeUncleanIfRequired(final Task task,
-                                                    final boolean triedToCloseCleanlyPreviously) {
-        if (triedToCloseCleanlyPreviously) {
-            log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
-            try {
-                task.close(false, false);
-            } catch (final RuntimeException fatalException) {
-                log.error("Failed while closing {} {} due to the following error:",
-                    task.getClass().getSimpleName(),
-                    task.id(),
-                    fatalException);
-                return fatalException;
-            }
+    private boolean closeUnclean(final Task task) {
+        log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
+        try {
+            task.close(false, false);
+        } catch (final RuntimeException fatalException) {
+            log.error("Failed while closing {} {} due to the following error:",
+                task.getClass().getSimpleName(),
+                task.id(),
+                fatalException);
+            return false;
         }
 
-        return null;
+        return true;
     }
 }


Mime
View raw message