kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 0.11.0 updated: KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)
Date Tue, 12 Jun 2018 17:45:26 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 9c9657b  KAFKA-6747 Check whether there is in-flight transaction before aborting
transaction (#4826)
9c9657b is described below

commit 9c9657b1026014b595999c3c6dd2b442414c2d8e
Author: tedyu <yuzhihong@gmail.com>
AuthorDate: Thu Apr 5 15:29:04 2018 -0700

    KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)
    
    As Frederic reported on mailing list under the subject "kafka-streams Invalid transition
attempted from state READY to state ABORTING_TRANSACTION", producer#abortTransaction should
only be called when transactionInFlight is true.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
---
 .../java/org/apache/kafka/streams/processor/internals/StreamTask.java | 4 ++--
 .../org/apache/kafka/streams/processor/internals/StreamTaskTest.java  | 1 -
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index de45800..7107fc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -312,8 +312,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 producer.commitTransaction();
                 transactionInFlight = false;
                 if (startNewTransaction) {
-                    transactionInFlight = true;
                     producer.beginTransaction();
+                    transactionInFlight = true;
                 }
             } else {
                 try {
@@ -423,7 +423,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
             if (eosEnabled) {
                 if (!clean) {
                     try {
-                        if (!isZombie) {
+                        if (!isZombie && transactionInFlight) {
                             producer.abortTransaction();
                         }
                     } catch (final ProducerFencedException e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 781979e..0ba3f1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -640,7 +640,6 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
-
         assertFalse(producer.transactionInitialized());
         assertFalse(producer.transactionInFlight());
     }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message