kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
Date Tue, 02 Jun 2020 16:53:02 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 8ecfe6d  KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
8ecfe6d is described below

commit 8ecfe6dffa67e027b1fd3bdac5c7a3f80c12cff6
Author: showuon <43372967+showuon@users.noreply.github.com>
AuthorDate: Wed Jun 3 00:50:20 2020 +0800

    KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
    
    Fix the failed testMultiConsumerStickyAssignment by modifying the logic error in allSubscriptionsEqual
method.
    
    We will create the consumerToOwnedPartitions to keep the set of previously owned partitions
encoded in the Subscription. It's our basis to do the reassignment. In the allSubscriptionsEqual,
we'll get the member generation of the subscription, and remove all previously owned partitions
as invalid if the current generation is higher. However, the logic before my fix, will remove
the current highest member out of the consumerToOwnedPartitions, which should be kept because
it's the curren [...]
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../clients/consumer/internals/AbstractStickyAssignor.java | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 353a225..9743688 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -121,6 +121,13 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor
{
             if (memberData.generation.isPresent() && memberData.generation.get()
>= maxGeneration
                 || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION)
{
 
+                // If the current member's generation is higher, all the previously owned
partitions are invalid
+                if (memberData.generation.isPresent() && memberData.generation.get()
> maxGeneration) {
+                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();
+                    maxGeneration = memberData.generation.get();
+                }
+
                 membersOfCurrentHighestGeneration.add(consumer);
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't part of the current
subscription
@@ -128,13 +135,6 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor
{
                         ownedPartitions.add(tp);
                     }
                 }
-
-                // If the current member's generation is higher, all the previous owned partitions
are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get()
> maxGeneration) {
-                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
             }
         }
 


Mime
View raw message