kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (#8668)
Date Mon, 01 Jun 2020 23:01:59 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 480e354  KAFKA-9987: optimize sticky assignment algorithm for same-subscription case
(#8668)
480e354 is described below

commit 480e354558d542095f33ff599e9e14db88c8adbc
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Mon Jun 1 15:57:15 2020 -0700

    KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (#8668)
    
    Motivation and pseudo code algorithm in the ticket.
    
    Added a scale test with a large number of topic partitions and consumers and a 30s timeout.
    With these changes, assignment with 2,000 consumers and 200 topics with 2,000 partitions each completes
within a few seconds.
    
    When the same test was ported to trunk, it took 2 minutes even with a 100x reduction in the number
of topics (i.e., 2 minutes for 2,000 consumers and 2 topics with 2,000 partitions each).
    
    Should be cherry-picked to 2.6, 2.5, and 2.4
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 checkstyle/suppressions.xml                        |   4 +-
 .../consumer/CooperativeStickyAssignor.java        |  46 +--
 .../consumer/internals/AbstractStickyAssignor.java | 319 ++++++++++++++-------
 .../kafka/clients/consumer/StickyAssignorTest.java |   5 +-
 .../internals/AbstractStickyAssignorTest.java      |  82 +++---
 5 files changed, 283 insertions(+), 173 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 034fbc8..4c1d74e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -59,13 +59,13 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor).java"/>
 
     <suppress checks="JavaNCSS"
               files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>
 
     <suppress checks="NPathComplexity"
-              files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer).java"/>
+              files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor).java"/>
 
     <suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
               files="CoordinatorClient.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index bef32bf..c7c0679 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -62,16 +61,26 @@ public class CooperativeStickyAssignor extends AbstractStickyAssignor
{
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions)
{
+        Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic,
subscriptions);
 
-        final Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic,
subscriptions);
-        adjustAssignment(subscriptions, assignments);
+        Map<TopicPartition, String> partitionsTransferringOwnership = super.partitionsTransferringOwnership
== null ?
+            computePartitionsTransferringOwnership(subscriptions, assignments) :
+            super.partitionsTransferringOwnership;
+
+        adjustAssignment(assignments, partitionsTransferringOwnership);
         return assignments;
     }
 
     // Following the cooperative rebalancing protocol requires removing partitions that must
first be revoked from the assignment
-    private void adjustAssignment(final Map<String, Subscription> subscriptions,
-                                  final Map<String, List<TopicPartition>> assignments)
{
+    private void adjustAssignment(Map<String, List<TopicPartition>> assignments,
+                                  Map<TopicPartition, String> partitionsTransferringOwnership)
{
+        for (Map.Entry<TopicPartition, String> partitionEntry : partitionsTransferringOwnership.entrySet())
{
+            assignments.get(partitionEntry.getValue()).remove(partitionEntry.getKey());
+        }
+    }
 
+    private Map<TopicPartition, String> computePartitionsTransferringOwnership(Map<String,
Subscription> subscriptions,
+                                                                               Map<String,
List<TopicPartition>> assignments) {
         Map<TopicPartition, String> allAddedPartitions = new HashMap<>();
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
@@ -81,25 +90,20 @@ public class CooperativeStickyAssignor extends AbstractStickyAssignor
{
             List<TopicPartition> ownedPartitions = subscriptions.get(consumer).ownedPartitions();
             List<TopicPartition> assignedPartitions = entry.getValue();
 
-            List<TopicPartition> addedPartitions = new ArrayList<>(assignedPartitions);
-            addedPartitions.removeAll(ownedPartitions);
-            for (TopicPartition tp : addedPartitions) {
-                allAddedPartitions.put(tp, consumer);
+            Set<TopicPartition> ownedPartitionsSet = new HashSet<>(ownedPartitions);
+            for (TopicPartition tp : assignedPartitions) {
+                if (!ownedPartitionsSet.contains(tp))
+                    allAddedPartitions.put(tp, consumer);
             }
 
-            final Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
-            revokedPartitions.removeAll(assignedPartitions);
-            allRevokedPartitions.addAll(revokedPartitions);
-        }
-
-        // remove any partitions to be revoked from the current assignment
-        for (TopicPartition tp : allRevokedPartitions) {
-            // if partition is being migrated to another consumer, don't assign it there
yet
-            if (allAddedPartitions.containsKey(tp)) {
-                String assignedConsumer = allAddedPartitions.get(tp);
-                assignments.get(assignedConsumer).remove(tp);
+            Set<TopicPartition> assignedPartitionsSet = new HashSet<>(assignedPartitions);
+            for (TopicPartition tp : ownedPartitions) {
+                if (!assignedPartitionsSet.contains(tp))
+                    allRevokedPartitions.add(tp);
             }
         }
-    }
 
+        allAddedPartitions.keySet().retainAll(allRevokedPartitions);
+        return allAddedPartitions;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 12864de..d4e023c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -18,19 +18,22 @@ package org.apache.kafka.clients.consumer.internals;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +43,11 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor
{
 
     public static final int DEFAULT_GENERATION = -1;
 
-    private PartitionMovements partitionMovements;
+    private PartitionMovements partitionMovements = new PartitionMovements();
+
+    // Keep track of the partitions being migrated from one consumer to another during assignment
+    // so the cooperative assignor can adjust the assignment
+    protected Map<TopicPartition, String> partitionsTransferringOwnership = new HashMap<>();
 
     static final class ConsumerGenerationPair {
         final String consumer;
@@ -65,9 +72,206 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor
{
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions)
{
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions))
{
+            log.debug("Detected that all consumers were subscribed to same set of topics,
invoking the "
+                          + "optimized assignment algorithm");
+            partitionsTransferringOwnership = new HashMap<>();
+            return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics,
falling back to the "
+                          + "general case assignment algorithm");
+            partitionsTransferringOwnership = null;
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    /**
+     * Returns true iff all consumers have an identical subscription. Also fills out the
passed in
+     * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed
partitions
+     */
+    private boolean allSubscriptionsEqual(Set<String> allTopics,
+                                          Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions)
{
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = DEFAULT_GENERATION;
+
+        Set<String> subscribedTopics = new HashSet<>();
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())
{
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            List<TopicPartition> ownedPartitions = new ArrayList<>();
+            consumerToOwnedPartitions.put(consumer, ownedPartitions);
+
+            // Only consider this consumer's owned partitions as valid if it is a member
of the current highest
+            // generation, or its generation is not present but we have not seen any known
generation so far
+            if (memberData.generation.isPresent() && memberData.generation.get()
>= maxGeneration
+                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION)
{
+
+                membersOfCurrentHighestGeneration.add(consumer);
+                for (final TopicPartition tp : memberData.partitions) {
+                    // filter out any topics that no longer exist or aren't part of the current
subscription
+                    if (allTopics.contains(tp.topic())) {
+                        ownedPartitions.add(tp);
+                    }
+                }
+
+                // If the current member's generation is higher, all the previous owned partitions
are invalid
+                if (memberData.generation.isPresent() && memberData.generation.get()
> maxGeneration) {
+                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();
+                    maxGeneration = memberData.generation.get();
+                }
+            }
+        }
+
+        for (String consumer : membersWithOldGeneration) {
+            consumerToOwnedPartitions.get(consumer).clear();
+        }
+        return true;
+    }
+
+    private Map<String, List<TopicPartition>> constrainedAssign(Map<String,
Integer> partitionsPerTopic,
+                                                                Map<String, List<TopicPartition>>
consumerToOwnedPartitions) {
+        SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+
+        Set<TopicPartition> allRevokedPartitions = new HashSet<>();
+
+        // Each consumer should end up in exactly one of the below
+        List<String> unfilledMembers = new LinkedList<>();
+        Queue<String> maxCapacityMembers = new LinkedList<>();
+        Queue<String> minCapacityMembers = new LinkedList<>();
+
+        int numberOfConsumers = consumerToOwnedPartitions.size();
+        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+
+        // initialize the assignment map with an empty array of size minQuota for all members
+        Map<String, List<TopicPartition>> assignment = new HashMap<>(
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c ->
c, c -> new ArrayList<>(minQuota))));
+
+        for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet())
{
+            String consumer = consumerEntry.getKey();
+            List<TopicPartition> ownedPartitions = consumerEntry.getValue();
+
+            List<TopicPartition> consumerAssignment = assignment.get(consumer);
+            int i = 0;
+            // assign the first N partitions up to the max quota, and mark the remaining
as being revoked
+            for (TopicPartition tp : ownedPartitions) {
+                if (i < maxQuota) {
+                    consumerAssignment.add(tp);
+                    unassignedPartitions.remove(tp);
+                } else {
+                    allRevokedPartitions.add(tp);
+                }
+                ++i;
+            }
+
+            if (ownedPartitions.size() < minQuota) {
+                unfilledMembers.add(consumer);
+            } else {
+                // It's possible for a consumer to be at both min and max capacity if minQuota
== maxQuota
+                if (consumerAssignment.size() == minQuota)
+                    minCapacityMembers.add(consumer);
+                if (consumerAssignment.size() == maxQuota)
+                    maxCapacityMembers.add(consumer);
+            }
+        }
+
+        Collections.sort(unfilledMembers);
+        Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
+
+        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
+            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+
+            while (unfilledConsumerIter.hasNext()) {
+                String consumer = unfilledConsumerIter.next();
+                List<TopicPartition> consumerAssignment = assignment.get(consumer);
+
+                if (unassignedPartitionsIter.hasNext()) {
+                    TopicPartition tp = unassignedPartitionsIter.next();
+                    consumerAssignment.add(tp);
+                    unassignedPartitionsIter.remove();
+                    // We already assigned all possible ownedPartitions, so we know this
must be newly assigned to this consumer
+                    if (allRevokedPartitions.contains(tp))
+                        partitionsTransferringOwnership.put(tp, consumer);
+                } else {
+                    break;
+                }
+
+                if (consumerAssignment.size() == minQuota) {
+                    minCapacityMembers.add(consumer);
+                    unfilledConsumerIter.remove();
+                }
+            }
+        }
+
+        // If we ran out of unassigned partitions before filling all consumers, we need to
start stealing partitions
+        // from the over-full consumers at max capacity
+        for (String consumer : unfilledMembers) {
+            List<TopicPartition> consumerAssignment = assignment.get(consumer);
+            int remainingCapacity = minQuota - consumerAssignment.size();
+            while (remainingCapacity > 0) {
+                String overloadedConsumer = maxCapacityMembers.poll();
+                if (overloadedConsumer == null) {
+                    throw new IllegalStateException("Some consumers are under capacity but
all partitions have been assigned");
+                }
+                TopicPartition swappedPartition = assignment.get(overloadedConsumer).remove(0);
+                consumerAssignment.add(swappedPartition);
+                --remainingCapacity;
+                // This partition is by definition transferring ownership, the swapped partition
must have come from
+                // the max capacity member's owned partitions since it can only reach max
capacity with owned partitions
+                partitionsTransferringOwnership.put(swappedPartition, consumer);
+            }
+            minCapacityMembers.add(consumer);
+        }
+
+        // Otherwise we may have run out of unfilled consumers before assigning all partitions,
in which case we
+        // should just distribute one partition each to all consumers at min capacity
+        for (TopicPartition unassignedPartition : unassignedPartitions) {
+            String underCapacityConsumer = minCapacityMembers.poll();
+            if (underCapacityConsumer == null) {
+                throw new IllegalStateException("Some partitions are unassigned but all consumers
are at maximum capacity");
+            }
+            // We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers
here since we are at the end
+            assignment.get(underCapacityConsumer).add(unassignedPartition);
+
+            if (allRevokedPartitions.contains(unassignedPartition))
+                partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer);
+        }
+
+        return assignment;
+    }
+
+    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer>
partitionsPerTopic) {
+        SortedSet<TopicPartition> allPartitions =
+            new TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
+            String topic = entry.getKey();
+            for (int i = 0; i < entry.getValue(); ++i) {
+                allPartitions.add(new TopicPartition(topic, i));
+            }
+        }
+        return allPartitions;
+    }
+
+    private Map<String, List<TopicPartition>> generalAssign(Map<String, Integer>
partitionsPerTopic,
+                                                            Map<String, Subscription>
subscriptions) {
         Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
         Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new HashMap<>();
-        partitionMovements = new PartitionMovements();
 
         prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment);
         boolean isFreshAssignment = currentAssignment.isEmpty();
@@ -105,8 +309,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor
{
             for (TopicPartition topicPartition: entry.getValue())
                 currentPartitionConsumer.put(topicPartition, entry.getKey());
 
-        List<TopicPartition> sortedPartitions = sortPartitions(
-            currentAssignment, prevAssignment.keySet(), isFreshAssignment, partition2AllPotentialConsumers,
consumer2AllPotentialPartitions);
+        List<TopicPartition> sortedPartitions = sortPartitions(partition2AllPotentialConsumers);
 
         // all partitions that need to be assigned (initially set to all partitions but adjusted
in the following loop)
         List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions);
@@ -287,96 +490,16 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor
{
      * Sort valid partitions so they are processed in the potential reassignment phase in
the proper order
      * that causes minimal partition movement among consumers (hence honoring maximal stickiness)
      *
-     * @param currentAssignment the calculated assignment so far
-     * @param partitionsWithADifferentPreviousAssignment partitions that had a different
consumer before (for every
-     *                                                   such partition there should also
be a mapping in
-     *                                                   @currentAssignment to a different
consumer)
-     * @param isFreshAssignment whether this is a new assignment, or a reassignment of an
existing one
      * @param partition2AllPotentialConsumers a mapping of partitions to their potential
consumers
-     * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions
they can consumer from
-     * @return sorted list of valid partitions
+     * @return  an ascending sorted list of topic partitions based on how many consumers
can potentially use them
      */
-    private List<TopicPartition> sortPartitions(Map<String, List<TopicPartition>>
currentAssignment,
-                                                Set<TopicPartition> partitionsWithADifferentPreviousAssignment,
-                                                boolean isFreshAssignment,
-                                                Map<TopicPartition, List<String>>
partition2AllPotentialConsumers,
-                                                Map<String, List<TopicPartition>>
consumer2AllPotentialPartitions) {
-        List<TopicPartition> sortedPartitions = new ArrayList<>();
-
-        if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers,
consumer2AllPotentialPartitions)) {
-            // if this is a reassignment and the subscriptions are identical (all consumers
can consumer from all topics)
-            // then we just need to simply list partitions in a round robin fashion (from
consumers with
-            // most assigned partitions to those with least)
-            Map<String, List<TopicPartition>> assignments = deepCopy(currentAssignment);
-            for (Entry<String, List<TopicPartition>> entry: assignments.entrySet())
{
-                List<TopicPartition> toRemove = new ArrayList<>();
-                for (TopicPartition partition: entry.getValue())
-                    if (!partition2AllPotentialConsumers.keySet().contains(partition))
-                        toRemove.add(partition);
-                for (TopicPartition partition: toRemove)
-                    entry.getValue().remove(partition);
-            }
-            TreeSet<String> sortedConsumers = new TreeSet<>(new SubscriptionComparator(assignments));
-            sortedConsumers.addAll(assignments.keySet());
-            // at this point, sortedConsumers contains an ascending-sorted list of consumers
based on
-            // how many valid partitions are currently assigned to them
-
-            while (!sortedConsumers.isEmpty()) {
-                // take the consumer with the most partitions
-                String consumer = sortedConsumers.pollLast();
-                // currently assigned partitions to this consumer
-                List<TopicPartition> remainingPartitions = assignments.get(consumer);
-                // partitions that were assigned to a different consumer last time
-                List<TopicPartition> prevPartitions = new ArrayList<>(partitionsWithADifferentPreviousAssignment);
-                // from partitions that had a different consumer before, keep only those
that are
-                // assigned to this consumer now
-                prevPartitions.retainAll(remainingPartitions);
-                if (!prevPartitions.isEmpty()) {
-                    // if there is a partition of this consumer that was assigned to another
consumer before
-                    // mark it as good options for reassignment
-                    TopicPartition partition = prevPartitions.remove(0);
-                    remainingPartitions.remove(partition);
-                    sortedPartitions.add(partition);
-                    sortedConsumers.add(consumer);
-                } else if (!remainingPartitions.isEmpty()) {
-                    // otherwise, mark any other one of the current partitions as a reassignment
candidate
-                    sortedPartitions.add(remainingPartitions.remove(0));
-                    sortedConsumers.add(consumer);
-                }
-            }
-
-            for (TopicPartition partition: partition2AllPotentialConsumers.keySet()) {
-                if (!sortedPartitions.contains(partition))
-                    sortedPartitions.add(partition);
-            }
-
-        } else {
-            // an ascending sorted set of topic partitions based on how many consumers can
potentially use them
-            TreeSet<TopicPartition> sortedAllPartitions = new TreeSet<>(new PartitionComparator(partition2AllPotentialConsumers));
-            sortedAllPartitions.addAll(partition2AllPotentialConsumers.keySet());
-
-            while (!sortedAllPartitions.isEmpty())
-                sortedPartitions.add(sortedAllPartitions.pollFirst());
-        }
-
+    private List<TopicPartition> sortPartitions(Map<TopicPartition, List<String>>
partition2AllPotentialConsumers) {
+        List<TopicPartition> sortedPartitions = new ArrayList<>(partition2AllPotentialConsumers.keySet());
+        Collections.sort(sortedPartitions, new PartitionComparator(partition2AllPotentialConsumers));
         return sortedPartitions;
     }
 
     /**
-     * @param partition2AllPotentialConsumers a mapping of partitions to their potential
consumers
-     * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions
they can consumer from
-     * @return true if potential consumers of partitions are the same, and potential partitions
consumers can
-     * consumer from are the same too
-     */
-    private boolean areSubscriptionsIdentical(Map<TopicPartition, List<String>>
partition2AllPotentialConsumers,
-        Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
-        if (!hasIdenticalListElements(partition2AllPotentialConsumers.values()))
-            return false;
-
-        return hasIdenticalListElements(consumer2AllPotentialPartitions.values());
-    }
-
-    /**
      * The assignment should improve the overall balance of the partition assignments to
consumers.
      */
     private void assignPartition(TopicPartition partition,
@@ -601,24 +724,6 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor
{
         return partitionMovements.isSticky();
     }
 
-    /**
-     * @param col a collection of elements of type list
-     * @return true if all lists in the collection have the same members; false otherwise
-     */
-    private <T> boolean hasIdenticalListElements(Collection<List<T>> col)
{
-        Iterator<List<T>> it = col.iterator();
-        if (!it.hasNext())
-            return true;
-        List<T> cur = it.next();
-        while (it.hasNext()) {
-            List<T> next = it.next();
-            if (!(cur.containsAll(next) && next.containsAll(cur)))
-                return false;
-            cur = next;
-        }
-        return true;
-    }
-
     private void deepCopy(Map<String, List<TopicPartition>> source, Map<String,
List<TopicPartition>> dest) {
         dest.clear();
         for (Entry<String, List<TopicPartition>> entry: source.entrySet())
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index 01a8f3e..fb89944 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -169,10 +169,10 @@ public class StickyAssignorTest extends AbstractStickyAssignorTest {
         TopicPartition tp5 = new TopicPartition(topic, 5);
 
         List<TopicPartition> c1partitions0 = partitions(tp0, tp1, tp4);
-        List<TopicPartition> c2partitions0 = partitions(tp0, tp2, tp3);
+        List<TopicPartition> c2partitions0 = partitions(tp0, tp1, tp2);
         List<TopicPartition> c3partitions0 = partitions(tp3, tp4, tp5);
         subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic), c1partitions0,
1));
-        subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic), c2partitions0,
1));
+        subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic), c2partitions0,
2));
         subscriptions.put(consumer3, buildSubscriptionWithGeneration(topics(topic), c3partitions0,
2));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
subscriptions);
@@ -181,7 +181,6 @@ public class StickyAssignorTest extends AbstractStickyAssignorTest {
         List<TopicPartition> c3partitions = assignment.get(consumer3);
 
         assertTrue(c1partitions.size() == 2 && c2partitions.size() == 2 &&
c3partitions.size() == 2);
-        assertTrue(c1partitions0.containsAll(c1partitions));
         assertTrue(c2partitions0.containsAll(c2partitions));
         assertTrue(c3partitions0.containsAll(c3partitions));
         verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index 52f6747..c7b4523 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -33,12 +33,13 @@ import org.apache.kafka.common.utils.Utils;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public abstract class AbstractStickyAssignorTest {
-
     protected AbstractStickyAssignor assignor;
     protected String consumerId = "consumer";
     protected Map<String, Subscription> subscriptions;
@@ -105,12 +106,16 @@ public abstract class AbstractStickyAssignorTest {
         String otherTopic = "other";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
-        partitionsPerTopic.put(topic, 3);
-        partitionsPerTopic.put(otherTopic, 3);
-        subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));
+        partitionsPerTopic.put(topic, 2);
+        subscriptions = mkMap(
+                mkEntry(consumerId, buildSubscription(
+                        topics(topic),
+                        Arrays.asList(tp(topic, 0), tp(topic, 1), tp(otherTopic, 0), tp(otherTopic, 1)))
+                )
+        );
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId));
+        assertEquals(partitions(tp(topic, 0), tp(topic, 1)), assignment.get(consumerId));
 
         verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
         assertTrue(isFullyBalanced(assignment));
@@ -145,8 +150,6 @@ public abstract class AbstractStickyAssignorTest {
         subscriptions.put(consumer2, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
-        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1));
-        assertEquals(Collections.<TopicPartition>emptyList(), assignment.get(consumer2));
 
         verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
         assertTrue(isFullyBalanced(assignment));
@@ -238,8 +241,8 @@ public abstract class AbstractStickyAssignorTest {
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
 
         verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
-        assertEquals(partitions(tp(topic, 2), tp(topic, 1)), assignment.get(consumer1));
-        assertEquals(partitions(tp(topic, 0)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic, 0), tp(topic, 1)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
         assertTrue(isFullyBalanced(assignment));
         assertTrue(assignor.isSticky());
 
@@ -425,8 +428,37 @@ public abstract class AbstractStickyAssignorTest {
         assertTrue(assignor.isSticky());
     }
 
+    @Test(timeout = 30 * 1000)
+    public void testLargeAssignmentAndGroupWithUniformSubscription() {
+        // 1 million partitions!
+        int topicCount = 500;
+        int partitionCount = 2_000;
+        int consumerCount = 2_000;
+
+        List<String> topics = new ArrayList<>();
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = getTopicName(i, topicCount);
+            topics.add(topicName);
+            partitionsPerTopic.put(topicName, partitionCount);
+        }
+
+        for (int i = 0; i < consumerCount; i++) {
+            subscriptions.put(getConsumerName(i, consumerCount), new Subscription(topics));
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+
+        for (int i = 1; i < consumerCount; i++) {
+            String consumer = getConsumerName(i, consumerCount);
+            subscriptions.put(consumer, buildSubscription(topics, assignment.get(consumer)));
+        }
+
+        assignor.assign(partitionsPerTopic, subscriptions);
+    }
+
     @Test
-    public void testLargeAssignmentWithMultipleConsumersLeaving() {
+    public void testLargeAssignmentWithMultipleConsumersLeavingAndRandomSubscription() {
         Random rand = new Random();
         int topicCount = 40;
         int consumerCount = 200;
@@ -555,7 +587,6 @@ public abstract class AbstractStickyAssignorTest {
         }
     }
 
-
     @Test
     public void testAssignmentUpdatedForDeletedTopic() {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
@@ -583,35 +614,6 @@ public abstract class AbstractStickyAssignorTest {
     }
 
     @Test
-    public void testConflictingPreviousAssignments() {
-        String consumer1 = "consumer1";
-        String consumer2 = "consumer2";
-
-        Map<String, Integer> partitionsPerTopic = new HashMap<>();
-        partitionsPerTopic.put(topic, 2);
-        subscriptions.put(consumer1, new Subscription(topics(topic)));
-        subscriptions.put(consumer2, new Subscription(topics(topic)));
-
-        TopicPartition tp0 = new TopicPartition(topic, 0);
-        TopicPartition tp1 = new TopicPartition(topic, 1);
-
-        // both c1 and c2 have partition 1 assigned to them in generation 1
-        List<TopicPartition> c1partitions0 = partitions(tp0, tp1);
-        List<TopicPartition> c2partitions0 = partitions(tp0, tp1);
-        subscriptions.put(consumer1, buildSubscription(topics(topic), c1partitions0));
-        subscriptions.put(consumer2, buildSubscription(topics(topic), c2partitions0));
-
-        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
-        List<TopicPartition> c1partitions = assignment.get(consumer1);
-        List<TopicPartition> c2partitions = assignment.get(consumer2);
-
-        assertTrue(c1partitions.size() == 1 && c2partitions.size() == 1);
-        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
-        assertTrue(isFullyBalanced(assignment));
-        assertTrue(assignor.isSticky());
-    }
-
-    @Test
     public void testReassignmentWithRandomSubscriptionsAndChanges() {
         final int minNumConsumers = 20;
         final int maxNumConsumers = 40;


Mime
View raw message