kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8972 (2.4 blocker): TaskManager state should always be updated after rebalance (#7620)
Date Fri, 01 Nov 2019 23:15:01 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new c46dded  KAFKA-8972 (2.4 blocker): TaskManager state should always be updated after
rebalance (#7620)
c46dded is described below

commit c46dded7dad23af3d52a86ed37e3f1f27ce070f3
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Fri Nov 1 16:10:43 2019 -0700

    KAFKA-8972 (2.4 blocker): TaskManager state should always be updated after rebalance (#7620)
    
    Currently when we identify version probing we return early from onAssignment and never
get to updating the TaskManager and general state with the new assignment. Since we do actually
give out "real" assignments even during version probing, a StreamThread should take real ownership
of its tasks/partitions including cleaning them up in onPartitionsRevoked which gets invoked
when we call onLeavePrepare as part of triggering the follow-up rebalance.
    
    Every member will always get an assignment encoded with the lowest common version, so
there should be no problem decoding a VP assignment. We should just allow onAssignment to
proceed as usual so that the TaskManager is in a consistent state, and knows what all its
tasks/partitions are when the first rebalance completes and the next one is triggered.
    
    Reviewers: Boyang Chen <boyang@confluent.io>, Matthias J. Sax <mjsax@apache.org>,
Guozhang Wang <wangguoz@gmail.com>
---
 .../clients/consumer/internals/ConsumerCoordinator.java     |  6 ++++++
 .../streams/processor/internals/AssignedStandbyTasks.java   |  4 ++--
 .../streams/processor/internals/AssignedStreamsTasks.java   |  4 ++--
 .../processor/internals/StreamsPartitionAssignor.java       |  1 -
 .../processor/internals/StreamsRebalanceListener.java       | 10 +---------
 .../kafka/streams/processor/internals/TaskManager.java      | 13 +++++++------
 .../org/apache/kafka/streams/tests/StreamsUpgradeTest.java  |  1 -
 7 files changed, 18 insertions(+), 21 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index d5b3061..61bd48a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -692,6 +692,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     @Override
     public void onLeavePrepare() {
+        // Save the current Generation and use that to get the memberId, as the hb thread
can change it at any time
+        final Generation currentGeneration = generation();
+        final String memberId = currentGeneration.memberId;
+
+        log.debug("Executing onLeavePrepare with generation {} and memberId {}", currentGeneration,
memberId);
+
         // we should reset assignment and trigger the callback before leaving group
         Set<TopicPartition> droppedPartitions = new HashSet<>(subscriptions.assignedPartitions());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index f217a55..0f8896e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -33,10 +33,10 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
     @Override
     public void shutdown(final boolean clean) {
         final String shutdownType = clean ? "Clean" : "Unclean";
-        log.debug(shutdownType + " shutdown of all standby tasks" + "\n" +
+        log.debug("{} shutdown of all standby tasks" + "\n" +
                       "non-initialized standby tasks to close: {}" + "\n" +
                       "running standby tasks to close: {}",
-            clean, created.keySet(), running.keySet());
+            shutdownType, created.keySet(), running.keySet());
         super.shutdown(clean);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 161714e..1400d5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -494,12 +494,12 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements
Restorin
     @Override
     public void shutdown(final boolean clean) {
         final String shutdownType = clean ? "Clean" : "Unclean";
-        log.debug(shutdownType + " shutdown of all active tasks" + "\n" +
+        log.debug("{} shutdown of all active tasks" + "\n" +
                       "non-initialized stream tasks to close: {}" + "\n" +
                       "restoring tasks to close: {}" + "\n" +
                       "running stream tasks to close: {}" + "\n" +
                       "suspended stream tasks to close: {}",
-            clean, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet());
+            shutdownType, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet());
         super.shutdown(clean);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 78ee40c..38a150b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -1109,7 +1109,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
         // Check if this was a version probing rebalance and check the error code to trigger
another rebalance if so
         if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion))
{
             setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
-            return;
         }
 
         // version 1 field
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index f2c75b2..a4f1f6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -68,15 +68,7 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener
{
             if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
                 log.debug(
                     "Skipping task creation in rebalance because we are already in {} state.",
-                    streamThread.state()
-                );
-            } else if (streamThread.getAssignmentErrorCode() != AssignorError.NONE.code())
{
-                log.debug(
-                    "Encountered assignment error during partition assignment: {}. Skipping
task initialization and "
-                        + "pausing any partitions we may have been assigned.",
-                    streamThread.getAssignmentErrorCode()
-                );
-                taskManager.pausePartitions();
+                    streamThread.state());
             } else {
                 // Close non-reassigned tasks before initializing new ones as we may have
suspended active
                 // tasks that become standbys or vice versa
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 72cff77..4d6dd4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -467,21 +467,22 @@ public class TaskManager {
         }
 
         log.debug("Assigning metadata with: " +
-                      "\tactiveTasks: {},\n" +
-                      "\tstandbyTasks: {}\n" +
-                      "The updated active task states are: \n" +
+                      "\tpreviousAssignedActiveTasks: {},\n" +
+                      "\tpreviousAssignedStandbyTasks: {}\n" +
+                      "The updated task states are: \n" +
                       "\tassignedActiveTasks {},\n" +
                       "\tassignedStandbyTasks {},\n" +
                       "\taddedActiveTasks {},\n" +
                       "\taddedStandbyTasks {},\n" +
                       "\trevokedActiveTasks {},\n" +
                       "\trevokedStandbyTasks {}",
-                  activeTasks, standbyTasks,
                   assignedActiveTasks, assignedStandbyTasks,
+                  activeTasks, standbyTasks,
                   addedActiveTasks, addedStandbyTasks,
                   revokedActiveTasks, revokedStandbyTasks);
-        this.assignedActiveTasks = activeTasks;
-        this.assignedStandbyTasks = standbyTasks;
+
+        assignedActiveTasks = activeTasks;
+        assignedStandbyTasks = standbyTasks;
     }
 
     public void updateSubscriptionsFromAssignment(final List<TopicPartition> partitions)
{
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 185fa7c..bcb6d82 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -173,7 +173,6 @@ public class StreamsUpgradeTest {
 
             if (super.maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion()))
{
                 setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
-                return;
             }
 
             final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());


Mime
View raw message