kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
Date Tue, 02 Jun 2020 16:52:33 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 2c30619  KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
2c30619 is described below

commit 2c30619c43154b01a1b182755b7385c2394d04d5
Author: showuon <43372967+showuon@users.noreply.github.com>
AuthorDate: Wed Jun 3 00:50:20 2020 +0800

    KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
    
    Fix the failing testMultiConsumerStickyAssignment by correcting a logic error in the
    allSubscriptionsEqual method.
    
    consumerToOwnedPartitions keeps the set of previously owned partitions encoded in each
    Subscription and is the basis for reassignment. In allSubscriptionsEqual, we read the
    member generation from the subscription and, if the current member's generation is higher,
    treat all previously owned partitions as invalid. Before this fix, however, the logic also
    removed the current highest-generation member from consumerToOwnedPartitions, which should
    be kept because it's the curren [...]
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../clients/consumer/internals/AbstractStickyAssignor.java | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 353a225..9743688 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -121,6 +121,13 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
             if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
                 || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
 
+                // If the current member's generation is higher, all the previously owned partitions are invalid
+                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
+                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();
+                    maxGeneration = memberData.generation.get();
+                }
+
                 membersOfCurrentHighestGeneration.add(consumer);
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't part of the current subscription
@@ -128,13 +135,6 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
                         ownedPartitions.add(tp);
                     }
                 }
-
-                // If the current member's generation is higher, all the previous owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
             }
         }
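
The effect of moving the generation check ahead of the add can be illustrated with a small standalone sketch. This is not the Kafka class: GenerationTrackingSketch, Result, classify and the value chosen for DEFAULT_GENERATION are hypothetical names and assumptions made for illustration, and the model tracks only member names and generations, not partitions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified, hypothetical model of the generation bookkeeping shown in the diff.
final class GenerationTrackingSketch {
    // Assumed sentinel meaning "no generation seen yet"; the real value is not shown in the diff.
    static final int DEFAULT_GENERATION = -1;

    /** Members at the highest generation seen so far, and members demoted as stale. */
    static final class Result {
        final List<String> membersOfCurrentHighestGeneration = new ArrayList<>();
        final List<String> membersWithOldGeneration = new ArrayList<>();
        int maxGeneration = DEFAULT_GENERATION;
    }

    static Result classify(Map<String, Optional<Integer>> generationByConsumer) {
        Result r = new Result();
        for (Map.Entry<String, Optional<Integer>> entry : generationByConsumer.entrySet()) {
            String consumer = entry.getKey();
            Optional<Integer> generation = entry.getValue();
            if (generation.isPresent() && generation.get() >= r.maxGeneration
                    || !generation.isPresent() && r.maxGeneration == DEFAULT_GENERATION) {
                // Post-fix ordering: demote the earlier members first ...
                if (generation.isPresent() && generation.get() > r.maxGeneration) {
                    r.membersWithOldGeneration.addAll(r.membersOfCurrentHighestGeneration);
                    r.membersOfCurrentHighestGeneration.clear();
                    r.maxGeneration = generation.get();
                }
                // ... and only then record the current member, so it is never demoted itself.
                r.membersOfCurrentHighestGeneration.add(consumer);
            }
        }
        return r;
    }

    public static void main(String[] args) {
        Map<String, Optional<Integer>> generations = new LinkedHashMap<>();
        generations.put("c1", Optional.of(1));
        generations.put("c2", Optional.of(2));
        generations.put("c3", Optional.of(2));
        Result r = classify(generations);
        System.out.println("highest=" + r.membersOfCurrentHighestGeneration
                + " old=" + r.membersWithOldGeneration
                + " maxGeneration=" + r.maxGeneration);
    }
}
```

With the pre-fix ordering (add first, then check), the same input would add c2 to the highest-generation list and immediately move it to the old-generation list when the list is cleared, leaving only c3 as a current member.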
 

