kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9582: Do not abort transaction in unclean close (#8143)
Date Fri, 21 Feb 2020 18:28:29 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 121df46  KAFKA-9582: Do not abort transaction in unclean close (#8143)
121df46 is described below

commit 121df465fad0bc062537c027bf9ed4755112d10c
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Fri Feb 21 10:27:57 2020 -0800

    KAFKA-9582: Do not abort transaction in unclean close (#8143)
    
    In order to avoid hitting the fatal exception during an unclean close, we should not call
    abortTransaction().
    
    Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/processor/internals/StreamTask.java    | 49 ++++++----------------
 .../processor/internals/StreamTaskTest.java        | 12 +++---
 2 files changed, 18 insertions(+), 43 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 54da00d..9aa8e79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -94,7 +94,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     private long idleStartTime;
     private Producer<byte[], byte[]> producer;
     private boolean commitRequested = false;
-    private boolean transactionInFlight = false;
 
     private final String threadId;
 
@@ -294,7 +293,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             } catch (final ProducerFencedException | UnknownProducerIdException e) {
                 throw new TaskMigratedException(this, e);
             }
-            transactionInFlight = true;
         }
 
         processorContext.initialize();
@@ -522,10 +520,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             if (eosEnabled) {
                 producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
                 producer.commitTransaction();
-                transactionInFlight = false;
                 if (startNewTransaction) {
                     producer.beginTransaction();
-                    transactionInFlight = true;
                 }
             } else {
                 consumer.commitSync(consumedOffsetsAndMetadata);
@@ -602,7 +598,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      */
     public void suspend() {
         log.debug("Suspending");
-        suspend(true, false);
+        suspend(true);
     }
 
     /**
@@ -618,8 +614,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      *                               or if the task producer got fenced (EOS)
      */
     // visible for testing
-    void suspend(final boolean clean,
-                 final boolean isZombie) {
+    void suspend(final boolean clean) {
         // this is necessary because all partition times are reset to -1 during close
         // we need to preserve the original partitions times before calling commit
         final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
@@ -640,14 +635,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
                 if (eosEnabled) {
                     stateMgr.checkpoint(activeTaskCheckpointableOffsets());
-
-                    try {
-                        recordCollector.close();
-                    } catch (final RecoverableClientException e) {
-                        taskMigratedException = new TaskMigratedException(this, e);
-                    } finally {
-                        producer = null;
-                    }
+                    taskMigratedException = closeRecordCollector();
                 }
             }
             if (taskMigratedException != null) {
@@ -662,37 +650,26 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             }
 
             if (eosEnabled) {
-                maybeAbortTransactionAndCloseRecordCollector(isZombie);
+                // Ignore any exceptions while closing the record collector, i.e. the task producer.
+                closeRecordCollector();
             }
         }
     }
 
-    private void maybeAbortTransactionAndCloseRecordCollector(final boolean isZombie) {
-        if (!isZombie) {
-            try {
-                if (transactionInFlight) {
-                    producer.abortTransaction();
-                }
-                transactionInFlight = false;
-            } catch (final ProducerFencedException ignore) {
-                /* TODO
-                 * this should actually never happen atm as we guard the call to #abortTransaction
-                 * -> the reason for the guard is a "bug" in the Producer -- it throws IllegalStateException
-                 * instead of ProducerFencedException atm. We can remove the isZombie flag after KAFKA-5604 got
-                 * fixed and fall-back to this catch-and-swallow code
-                 */
-
-                // can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens
-            }
-        }
+    private TaskMigratedException closeRecordCollector() {
+        TaskMigratedException taskMigratedException = null;
 
         try {
             recordCollector.close();
+        } catch (final RecoverableClientException e) {
+            taskMigratedException = new TaskMigratedException(this, e);
         } catch (final Throwable e) {
             log.error("Failed to close producer due to the following error:", e);
         } finally {
             producer = null;
         }
+
+        return taskMigratedException;
     }
 
     private void closeTopology() {
@@ -742,7 +719,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
     /**
      * <pre>
-     * - {@link #suspend(boolean, boolean) suspend(clean)}
+     * - {@link #suspend(boolean) suspend(clean)}
      *   - close topology
      *   - if (clean) {@link #commit()}
      *     - flush state and producer
@@ -765,7 +742,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
         RuntimeException firstException = null;
         try {
-            suspend(clean, isZombie);
+            suspend(clean);
         } catch (final RuntimeException e) {
             clean = false;
             firstException = e;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1d0ca4f..2832291 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1317,26 +1317,25 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldAbortTransactionAndCloseProducerOnUncleanCloseWithEosEnabled() {
+    public void shouldCloseProducerOnUncleanCloseWithEosEnabled() {
         task = createStatelessTask(createConfig(true), StreamsConfig.METRICS_LATEST);
         task.initializeTopology();
 
         task.close(false, false);
         task = null;
 
-        assertTrue(producer.transactionAborted());
-        assertFalse(producer.transactionInFlight());
+        // Make sure no method is called on the producer during an unclean close (such as abortTransaction()).
+        assertTrue(producer.transactionInFlight());
         assertTrue(producer.closed());
     }
 
     @Test
-    public void shouldAbortTransactionAndCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
+    public void shouldCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
         task = createTaskThatThrowsException(true);
         task.initializeTopology();
 
         task.close(false, false);
 
-        assertTrue(producer.transactionAborted());
         assertTrue(producer.closed());
     }
 
@@ -1553,7 +1552,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
+    public void shouldNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
         task = createStatelessTask(createConfig(true), StreamsConfig.METRICS_LATEST);
         task.initializeTopology();
         producer.fenceProducerOnClose();
@@ -1561,7 +1560,6 @@ public class StreamTaskTest {
         task.close(false, false);
         task = null;
 
-        assertTrue(producer.transactionAborted());
         assertFalse(producer.closed());
     }
 


Mime
View raw message