kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8972: Need to flush state even on unclean close (#7589)
Date Fri, 25 Oct 2019 04:29:18 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 1c3d08f  KAFKA-8972: Need to flush state even on unclean close (#7589)
1c3d08f is described below

commit 1c3d08f4fca3c786fcaca1dee9ffd6e2dcd74713
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Thu Oct 24 21:24:55 2019 -0700

    KAFKA-8972: Need to flush state even on unclean close (#7589)
    
    In the case of unclean close we still need to make sure all the stores are flushed before closing any.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Boyang Chen <boyang@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    | 23 ++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e859df8..f167e1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -673,12 +673,17 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                 throw taskMigratedException;
             }
         } else {
-            maybeAbortTransactionAndCloseRecordCollector(isZombie);
+            // In the case of unclean close we still need to make sure all the stores are flushed before closing any
+            super.flushState();
+
+            if (eosEnabled) {
+                maybeAbortTransactionAndCloseRecordCollector(isZombie);
+            }
         }
     }
 
     private void maybeAbortTransactionAndCloseRecordCollector(final boolean isZombie) {
-        if (eosEnabled && !isZombie) {
+        if (!isZombie) {
             try {
                 if (transactionInFlight) {
                     producer.abortTransaction();
@@ -696,14 +701,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             }
         }
 
-        if (eosEnabled) {
-            try {
-                recordCollector.close();
-            } catch (final Throwable e) {
-                log.error("Failed to close producer due to the following error:", e);
-            } finally {
-                producer = null;
-            }
+        try {
+            recordCollector.close();
+        } catch (final Throwable e) {
+            log.error("Failed to close producer due to the following error:", e);
+        } finally {
+            producer = null;
         }
     }
 


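The ordering described in the commit message (flush every store, then close) can be illustrated with a minimal, self-contained Java sketch. This is not the Kafka Streams implementation: the Store interface, the LoggingStore class, and the flushAllThenCloseAll method are hypothetical names invented for illustration. The sketch also assumes that the point of the ordering is that flushing one store may still need another store to be open, which the commit message does not spell out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlushBeforeClose {

    // Hypothetical stand-in for a state store; not a Kafka Streams interface.
    interface Store {
        void flush();
        void close();
    }

    // Hypothetical store that records each call in a shared log.
    static final class LoggingStore implements Store {
        private final String name;
        private final List<String> log;

        LoggingStore(final String name, final List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void flush() {
            log.add("flush:" + name);
        }

        @Override
        public void close() {
            log.add("close:" + name);
        }
    }

    // Phase 1 flushes every store; phase 2 closes every store.
    // A failure in one store does not stop the others from being handled.
    // Returns the first exception seen, or null if there was none.
    static RuntimeException flushAllThenCloseAll(final List<Store> stores) {
        RuntimeException first = null;
        for (final Store store : stores) {
            try {
                store.flush();
            } catch (final RuntimeException e) {
                if (first == null) {
                    first = e;
                }
            }
        }
        for (final Store store : stores) {
            try {
                store.close();
            } catch (final RuntimeException e) {
                if (first == null) {
                    first = e;
                }
            }
        }
        return first;
    }

    public static void main(final String[] args) {
        final List<String> log = new ArrayList<>();
        final List<Store> stores = Arrays.asList(
            new LoggingStore("a", log),
            new LoggingStore("b", log));
        flushAllThenCloseAll(stores);
        System.out.println(log); // prints [flush:a, flush:b, close:a, close:b]
    }
}
```

Both flush calls appear in the log before either close call. That is the property the patch restores on the unclean-close path by calling super.flushState() before closing the record collector.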