kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-8290: Close producer for zombie task (#6636)
Date Mon, 20 May 2019 13:34:53 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2fec972  KAFKA-8290: Close producer for zombie task (#6636)
2fec972 is described below

commit 2fec972803c03ce6d1e6208c4d4dcecb58e109ed
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Mon May 20 09:02:25 2019 -0400

    KAFKA-8290: Close producer for zombie task (#6636)
    
    When we close a task and EOS is enabled we should always close the producer regardless
if the task is in a zombie state (the broker fenced the producer) or not.
    
    I've added tests that fail without this change.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Jason Gustafson <jason@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    |  2 ++
 .../processor/internals/StreamTaskTest.java        | 35 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 67d457f..e78cd36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -604,7 +604,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
                 // can be ignored: transaction got already aborted by brokers/transactional-coordinator
if this happens
             }
+        }
 
+        if (eosEnabled) {
             try {
                 recordCollector.close();
             } catch (final Throwable e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index ac6f9d4..6aaa1f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1093,6 +1093,19 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldOnlyCloseFencedProducerOnUncleanClosedWithEosEnabled() {
+        task = createStatelessTask(createConfig(true));
+        task.initializeTopology();
+        producer.fenceProducer();
+
+        task.close(false, true);
+        task = null;
+
+        assertFalse(producer.transactionAborted());
+        assertTrue(producer.closed());
+    }
+
+    @Test
     public void shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled()
{
         task = createStatelessTask(createConfig(true));
         task.initializeTopology();
@@ -1147,7 +1160,7 @@ public class StreamTaskTest {
     public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() {
         task = createStatelessTask(createConfig(true));
 
-        assertTrue(!producer.transactionInFlight());
+        assertFalse(producer.transactionInFlight());
         task.close(false, false);
     }
 
@@ -1305,6 +1318,26 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldCloseProducerOnUncleanCloseNotZombieWhenEosEnabled() {
+        task = createStatelessTask(createConfig(true));
+        task.initializeTopology();
+        task.close(false, false);
+        task = null;
+
+        assertTrue(producer.closed());
+    }
+
+    @Test
+    public void shouldCloseProducerOnUncleanCloseIsZombieWhenEosEnabled() {
+        task = createStatelessTask(createConfig(true));
+        task.initializeTopology();
+        task.close(false, true);
+        task = null;
+
+        assertTrue(producer.closed());
+    }
+
+    @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() {
         task = createTaskThatThrowsException(false);
         task.initializeStateStores();


Mime
View raw message