kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5603; Don't abort TX for zombie tasks
Date Tue, 05 Sep 2017 22:47:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk facd2c5a8 -> 752e53174


KAFKA-5603; Don't abort TX for zombie tasks

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3719 from mjsax/kafka-5603-dont-abort-tx-for-zombie-tasks-2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/752e5317
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/752e5317
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/752e5317

Branch: refs/heads/trunk
Commit: 752e5317410f05d08f69f302e4ee0cadae183b59
Parents: facd2c5
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Sep 5 15:47:31 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Sep 5 15:47:31 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/MockProducer.java    |  2 --
 .../clients/producer/MockProducerTest.java      | 11 --------
 .../processor/internals/AssignedTasks.java      | 10 +++----
 .../processor/internals/StandbyTask.java        | 10 +++++--
 .../streams/processor/internals/StreamTask.java | 19 +++++++++----
 .../kafka/streams/processor/internals/Task.java |  4 +--
 .../processor/internals/AbstractTaskTest.java   |  6 ++--
 .../processor/internals/AssignedTasksTest.java  | 12 ++++----
 .../processor/internals/StandbyTaskTest.java    |  2 +-
 .../processor/internals/StreamTaskTest.java     | 29 ++++++++++++++------
 .../kafka/test/ProcessorTopologyTestDriver.java |  2 +-
 11 files changed, 58 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 210c2bb..d2a84c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -306,8 +306,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
         if (this.closed) {
             throw new IllegalStateException("MockProducer is already closed.");
         }
-        if (transactionInFlight)
-            abortTransaction();
         this.closed = true;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index d343194..27fac28 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -242,17 +242,6 @@ public class MockProducerTest {
         assertFalse(producer.transactionCommitted());
     }
 
-    @Test
-    public void shouldAbortInFlightTransactionOnClose() {
-        buildMockProducer(true);
-        producer.initTransactions();
-        producer.beginTransaction();
-        producer.close();
-        assertFalse(producer.transactionInFlight());
-        assertTrue(producer.transactionAborted());
-        assertFalse(producer.transactionCommitted());
-    }
-
     @Test(expected = IllegalStateException.class)
     public void shouldThrowFenceProducerIfTransactionsNotInitialized() {
         buildMockProducer(true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index f09c48e..a1966b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -192,7 +192,7 @@ class AssignedTasks {
         RuntimeException exception = null;
         for (final Task task : tasks) {
             try {
-                task.close(false);
+                task.close(false, false);
             } catch (final RuntimeException e) {
                 log.error("{} Failed to close {}, {}", logPrefix, taskTypeName, task.id(), e);
                 if (exception == null) {
@@ -220,7 +220,7 @@ class AssignedTasks {
             } catch (final RuntimeException e) {
                 log.error("{} Suspending {} {} failed due to the following error:", logPrefix, taskTypeName, task.id(), e);
                 try {
-                    task.close(false);
+                    task.close(false, false);
                 } catch (final Exception f) {
                     log.error("{} After suspending failed, closing the same {} {} failed again due to the following error:", logPrefix, taskTypeName, task.id(), f);
                 }
@@ -235,7 +235,7 @@ class AssignedTasks {
     private void closeZombieTask(final Task task) {
         log.warn("{} Producer of task {} fenced; closing zombie task", logPrefix, task.id());
         try {
-            task.close(false);
+            task.close(false, true);
         } catch (final Exception e) {
             log.warn("{} Failed to close zombie {} due to {}, ignore and proceed", taskTypeName, logPrefix, e);
         }
@@ -420,7 +420,7 @@ class AssignedTasks {
             if (!newAssignment.containsKey(suspendedTask.id()) || !suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) {
                 log.debug("{} Closing suspended and not re-assigned {} {}", logPrefix, taskTypeName, suspendedTask.id());
                 try {
-                    suspendedTask.closeSuspended(true, null);
+                    suspendedTask.closeSuspended(true, false, null);
                 } catch (final Exception e) {
                     log.error("{} Failed to remove suspended {} {} due to the following error:", logPrefix, taskTypeName, suspendedTask.id(), e);
                 } finally {
@@ -439,7 +439,7 @@ class AssignedTasks {
     private void close(final Collection<Task> tasks, final boolean clean) {
         for (final Task task : tasks) {
             try {
-                task.close(clean);
+                task.close(clean, false);
             } catch (final Throwable t) {
                 log.error("{} Failed while closing {} {} due to the following error:",
                           logPrefix,

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index ae857b4..75151a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -116,9 +116,11 @@ public class StandbyTask extends AbstractTask {
      * <pre>
      * @param clean ignored by {@code StandbyTask} as it can always try to close cleanly
      *              (ie, commit, flush, and write checkpoint file)
+     * @param isZombie ignored by {@code StandbyTask} as it can never be a zombie
      */
     @Override
-    public void close(final boolean clean) {
+    public void close(final boolean clean,
+                      final boolean isZombie) {
         if (!taskInitialized) {
             return;
         }
@@ -133,8 +135,10 @@ public class StandbyTask extends AbstractTask {
     }
 
     @Override
-    public void closeSuspended(final boolean clean, final RuntimeException e) {
-        close(clean);
+    public void closeSuspended(final boolean clean,
+                               final boolean isZombie,
+                               final RuntimeException e) {
+        close(clean, isZombie);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ca726e2..1fe567e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -400,7 +400,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     // helper to avoid calling suspend() twice if a suspended task is not reassigned and closed
-    public void closeSuspended(boolean clean, RuntimeException firstException) {
+    @Override
+    public void closeSuspended(boolean clean,
+                               final boolean isZombie,
+                               RuntimeException firstException) {
         try {
             closeStateManager(clean);
         } catch (final RuntimeException e) {
@@ -418,14 +421,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             if (eosEnabled) {
                 if (!clean) {
                     try {
-                        producer.abortTransaction();
+                        if (!isZombie) {
+                            producer.abortTransaction();
+                        }
                         transactionInFlight = false;
                     } catch (final ProducerFencedException e) {
                         // can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens
                     }
                 }
                 try {
-                    recordCollector.close();
+                    if (!isZombie) {
+                        recordCollector.close();
+                    }
                 } catch (final Throwable e) {
                     log.error("{} Failed to close producer due to the following error:", logPrefix, e);
                 }
@@ -455,9 +462,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * </pre>
      * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
      *              otherwise, just close open resources
+     * @param isZombie {@code true} is this task is a zombie or not
      */
     @Override
-    public void close(boolean clean) {
+    public void close(boolean clean,
+                      final boolean isZombie) {
         log.debug("{} Closing", logPrefix);
 
         RuntimeException firstException = null;
@@ -469,7 +478,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             log.error("{} Could not close task due to the following error:", logPrefix, e);
         }
 
-        closeSuspended(clean, firstException);
+        closeSuspended(clean, isZombie, firstException);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index cf09433..a481c31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -34,7 +34,7 @@ public interface Task {
 
     void suspend();
 
-    void close(boolean clean);
+    void close(boolean clean, boolean isZombie);
 
     TaskId id();
 
@@ -48,7 +48,7 @@ public interface Task {
 
     StateStore getStore(String name);
 
-    void closeSuspended(boolean clean, RuntimeException e);
+    void closeSuspended(boolean clean, boolean isZombie, RuntimeException e);
 
     Map<TopicPartition, Long> checkpointedOffsets();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 43fe24f..54726d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -135,12 +135,10 @@ public class AbstractTaskTest {
             public void suspend() {}
 
             @Override
-            public void close(final boolean clean) {}
+            public void close(final boolean clean, final boolean isZombie) {}
 
             @Override
-            public void closeSuspended(final boolean clean, final RuntimeException e) {
-
-            }
+            public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {}
 
             @Override
             public Map<TopicPartition, Long> checkpointedOffsets() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index de2a489..7f961af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -160,7 +160,7 @@ public class AssignedTasksTest {
     @Test
     public void shouldCloseRestoringTasks() {
         EasyMock.expect(t1.initialize()).andReturn(false);
-        t1.close(false);
+        t1.close(false, false);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
@@ -170,7 +170,7 @@ public class AssignedTasksTest {
 
     @Test
     public void shouldClosedUnInitializedTasksOnSuspend() {
-        t1.close(false);
+        t1.close(false, false);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
@@ -196,7 +196,7 @@ public class AssignedTasksTest {
         mockTaskInitialization();
         t1.suspend();
         EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
-        t1.close(false);
+        t1.close(false, false);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
@@ -210,7 +210,7 @@ public class AssignedTasksTest {
         mockTaskInitialization();
         t1.suspend();
         EasyMock.expectLastCall().andThrow(new ProducerFencedException("KABOOM!"));
-        t1.close(false);
+        t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
@@ -259,7 +259,7 @@ public class AssignedTasksTest {
         mockTaskInitialization();
         t1.commit();
         EasyMock.expectLastCall().andThrow(new ProducerFencedException(""));
-        t1.close(false);
+        t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();
@@ -319,7 +319,7 @@ public class AssignedTasksTest {
         EasyMock.expect(t1.commitNeeded()).andReturn(true);
         t1.commit();
         EasyMock.expectLastCall().andThrow(new ProducerFencedException(""));
-        t1.close(false);
+        t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
         addAndInitTask();

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 40a66da..b9455e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -421,7 +421,7 @@ public class StandbyTaskTest {
         };
         task.initialize();
         try {
-            task.close(true);
+            task.close(true, false);
             fail("should have thrown exception");
         } catch (Exception e) {
             // expected

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4246b17..61d7875 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -171,7 +171,7 @@ public class StreamTaskTest {
     public void cleanup() throws IOException {
         try {
             if (task != null) {
-                task.close(true);
+                task.close(true, false);
             }
         } finally {
             Utils.delete(baseDir);
@@ -453,7 +453,7 @@ public class StreamTaskTest {
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
 
-        task.close(true);
+        task.close(true, false);
 
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -740,11 +740,11 @@ public class StreamTaskTest {
 
     @Test
     public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
-        task.close(true);
+        task.close(true, false);
         task = createTaskThatThrowsExceptionOnClose();
         task.initialize();
         try {
-            task.close(true);
+            task.close(true, false);
             fail("should have thrown runtime exception");
         } catch (final RuntimeException e) {
             task = null;
@@ -881,18 +881,29 @@ public class StreamTaskTest {
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
-        task.close(false);
+        task.close(false, false);
         task = null;
         assertTrue(producer.transactionAborted());
     }
 
     @Test
+    public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.close(false, true);
+        task = null;
+        assertFalse(producer.transactionAborted());
+    }
+
+    @Test
     public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
 
-        task.close(false);
+        task.close(false, false);
         assertFalse(producer.transactionAborted());
     }
 
@@ -904,7 +915,7 @@ public class StreamTaskTest {
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
             changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
-        task.close(true);
+        task.close(true, false);
         task = null;
         assertTrue(producer.closed());
     }
@@ -988,7 +999,7 @@ public class StreamTaskTest {
         };
 
         try {
-            streamTask.close(true);
+            streamTask.close(true, false);
             fail("should have thrown an exception");
         } catch (Exception e) {
             // all good
@@ -1000,7 +1011,7 @@ public class StreamTaskTest {
     public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
         final StreamTask task = createTaskThatThrowsExceptionOnClose();
         try {
-            task.close(true);
+            task.close(true, false);
         } catch (Exception e) {
             fail("should have not closed unitialized topology");
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/752e5317/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index deba860..b2dbeb5 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -411,7 +411,7 @@ public class ProcessorTopologyTestDriver {
      */
     public void close() {
         if (task != null) {
-            task.close(true);
+            task.close(true, false);
         }
         if (globalStateTask != null) {
             try {


Mime
View raw message