kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
Date Tue, 13 Mar 2018 15:44:06 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95ad035  KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
95ad035 is described below

commit 95ad03540f0d15ae47fd73bae935ab1cb3e8f4b9
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Tue Mar 13 08:43:58 2018 -0700

    KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
    
    As titled: do not start a new transaction when the task is created, since during
restoration the producer would have no activity and the transaction may therefore expire.
Also delay starting a new transaction on resume until the topology is initialized.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bill@confluent.io>
---
 .../streams/processor/internals/AssignedTasks.java | 11 ++++--
 .../streams/processor/internals/StreamTask.java    | 41 ++++++++++++----------
 .../streams/processor/internals/StreamThread.java  |  6 ----
 .../streams/processor/internals/TaskManager.java   | 12 +------
 .../internals/AssignedStreamsTasksTest.java        |  3 +-
 .../processor/internals/StreamTaskTest.java        |  7 ++++
 6 files changed, 41 insertions(+), 39 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 8529c9e..c806bfd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -90,6 +90,7 @@ abstract class AssignedTasks<T extends Task> {
      * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already
finished
      * @throws StreamsException if the store's change log does not contain the partition
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     Set<TopicPartition> initializeNewTasks() {
         final Set<TopicPartition> readyPartitions = new HashSet<>();
@@ -240,18 +241,21 @@ abstract class AssignedTasks<T extends Task> {
             log.trace("found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
+                task.resume();
                 try {
-                    task.resume();
+                    transitionToRunning(task, new HashSet<TopicPartition>());
                 } catch (final TaskMigratedException e) {
+                    // we need to catch migration exception internally since this function
+                    // is triggered in the rebalance callback
                     log.info("Failed to resume {} {} since it got migrated to another thread
already. " +
                             "Closing it as zombie before triggering a new rebalance.", taskTypeName,
task.id());
                     final RuntimeException fatalException = closeZombieTask(task);
+                    running.remove(task.id());
                     if (fatalException != null) {
                         throw fatalException;
                     }
                     throw e;
                 }
-                transitionToRunning(task, new HashSet<TopicPartition>());
                 log.trace("resuming suspended {} {}", taskTypeName, task.id());
                 return true;
             } else {
@@ -271,6 +275,9 @@ abstract class AssignedTasks<T extends Task> {
         }
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions)
{
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b8777ad..8d6e56a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -100,7 +100,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @param cache                 the {@link ThreadCache} created by the thread
      * @param time                  the system {@link Time} of the thread
      * @param producer              the instance of {@link Producer} used to produce records
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public StreamTask(final TaskId id,
                       final Collection<TopicPartition> partitions,
@@ -149,14 +148,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         partitionGroup = new PartitionGroup(partitionQueues);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+        // initialize transactions if eos is turned on, which will block if the previous
transaction has not
+        // completed yet; do not start the first transaction until the topology has been
initialized later
         if (eosEnabled) {
-            try {
-                this.producer.initTransactions();
-                this.producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
+            this.producer.initTransactions();
         }
     }
 
@@ -167,31 +163,38 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         return changelogPartitions().isEmpty();
     }
 
+    /**
+     * <pre>
+     * - (re-)initialize the topology of the task
+     * </pre>
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     @Override
     public void initializeTopology() {
         initTopology();
+
+        if (eosEnabled) {
+            try {
+                this.producer.beginTransaction();
+            } catch (final ProducerFencedException fatal) {
+                throw new TaskMigratedException(this, fatal);
+            }
+            transactionInFlight = true;
+        }
+
         processorContext.initialized();
         taskInitialized = true;
     }
 
     /**
      * <pre>
-     * - re-initialize the task
-     * - if (eos) begin new transaction
+     * - resume the task
      * </pre>
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     @Override
     public void resume() {
+        // nothing to do; new transaction will be started only after topology is initialized
         log.debug("Resuming");
-        if (eosEnabled) {
-            try {
-                producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
-        }
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e1c45b4..9bbc0da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -346,9 +346,6 @@ public class StreamThread extends Thread {
             return stateDirectory;
         }
 
-        /**
-         * @throws TaskMigratedException if the task producer got fenced (EOS only)
-         */
         Collection<T> createTasks(final Consumer<byte[], byte[]> consumer, final
Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
             final List<T> createdTasks = new ArrayList<>();
             for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions
: tasksToBeCreated.entrySet()) {
@@ -401,9 +398,6 @@ public class StreamThread extends Thread {
             this.threadClientId = threadClientId;
         }
 
-        /**
-         * @throws TaskMigratedException if the task producer got fenced (EOS only)
-         */
         @Override
         StreamTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId
taskId, final Set<TopicPartition> partitions) {
             taskCreatedSensor.record();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9f02834..80df517 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -94,9 +94,6 @@ class TaskManager {
         this.adminClient = adminClient;
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     void createTasks(final Collection<TopicPartition> assignment) {
         if (consumer == null) {
             throw new IllegalStateException(logPrefix + "consumer has not been initialized
while adding stream tasks. This should not happen.");
@@ -114,9 +111,6 @@ class TaskManager {
         consumer.pause(partitions);
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
         if (assignedActiveTasks.isEmpty()) {
             return;
@@ -156,9 +150,6 @@ class TaskManager {
         }
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     private void addStandbyTasks() {
         final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = this.assignedStandbyTasks;
         if (assignedStandbyTasks.isEmpty()) {
@@ -173,7 +164,6 @@ class TaskManager {
             if (!standby.maybeResumeSuspendedTask(taskId, partitions)) {
                 newStandbyTasks.put(taskId, partitions);
             }
-
         }
 
         if (newStandbyTasks.isEmpty()) {
@@ -319,7 +309,7 @@ class TaskManager {
     /**
      * @throws IllegalStateException If store gets registered after initialized is already
finished
      * @throws StreamsException if the store's change log does not contain the partition
-     * @throws TaskMigratedException if another thread wrote to the changelog topic that
is currently restored
+     * @throws TaskMigratedException if the task producer got fenced or consumer discovered
changelog offset changes (EOS only)
      */
     boolean updateNewAndRestoringTasks() {
         final Set<TopicPartition> resumed = active.initializeNewTasks();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 4bb7828..246d047 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -256,9 +256,10 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnResumeIfTaskMigratedException() {
+    public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() {
         mockRunningTaskSuspension();
         t1.resume();
+        t1.initializeTopology();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
         t1.close(false, true);
         EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1165d76..a305829 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -776,6 +776,7 @@ public class StreamTaskTest {
     @Test
     public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         assertTrue(producer.transactionInitialized());
         assertTrue(producer.transactionInFlight());
@@ -792,6 +793,7 @@ public class StreamTaskTest {
     @Test
     public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled()
{
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L,
TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -806,6 +808,7 @@ public class StreamTaskTest {
     @Test
     public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
         task.suspend();
 
         assertTrue(producer.transactionCommitted());
@@ -828,6 +831,7 @@ public class StreamTaskTest {
     @Test
     public void shouldStartNewTransactionOnResumeIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L,
TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -835,6 +839,7 @@ public class StreamTaskTest {
         task.suspend();
 
         task.resume();
+        task.initializeTopology();
         assertTrue(producer.transactionInFlight());
     }
 
@@ -854,6 +859,7 @@ public class StreamTaskTest {
     @Test
     public void shouldStartNewTransactionOnCommitIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L,
TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -878,6 +884,7 @@ public class StreamTaskTest {
     @Test
     public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
         task.close(false, false);
         task = null;
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message