kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Break up StreamsPartitionAssignor's gargantuan #assign (#8245)
Date Sat, 07 Mar 2020 16:47:03 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4b03d67  MINOR: Break up StreamsPartitionAssignor's gargantuan #assign (#8245)
4b03d67 is described below

commit 4b03d67e106c6d3b9dd465a308a62b400ead70a4
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Sat Mar 7 08:46:38 2020 -0800

    MINOR: Break up StreamsPartitionAssignor's gargantuan #assign (#8245)
    
    Just a minor refactoring of StreamsPartitionAssignor's endless assign
    method into logical chunks to hopefully improve readability. No logical
    changes, literally just moving code around and adding docs.
    
    The hope is to make it easier to write and review KIP-441 PRs that dig
    into the assignment logic.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../internals/StreamsPartitionAssignor.java        | 434 ++++++++++++++-------
 1 file changed, 287 insertions(+), 147 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 639509f..e858e86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -153,6 +153,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         }
     }
 
+    // keep track of any future consumers in a "dummy" Client since we can't decipher their subscription
+    private static final UUID FUTURE_ID = randomUUID();
 
     protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
         Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);
@@ -231,10 +233,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
     }
 
     private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
-                                                    final String topic,
                                                     final int errorCode) {
-        log.error("{} is unknown yet during rebalance," +
-            " please make sure they have been pre-created before starting the Streams application.", topic);
         final Map<String, Assignment> assignment = new HashMap<>();
         for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
             for (final String consumerId : clientMetadata.consumers) {
@@ -275,14 +274,14 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
     @Override
     public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) {
         final Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
+
+        // ---------------- Step Zero ---------------- //
+
         // construct the client metadata from the decoded subscription info
+
         final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
         final Set<TopicPartition> allOwnedPartitions = new HashSet<>();
 
-        // keep track of any future consumers in a "dummy" Client since we can't decipher their subscription
-        final UUID futureId = randomUUID();
-        final ClientMetadata futureClient = new ClientMetadata(null);
-
         int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION;
         int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
 
@@ -299,9 +298,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             final UUID processId;
             if (usedVersion > LATEST_SUPPORTED_VERSION) {
                 futureMetadataVersion = usedVersion;
-                processId = futureId;
-                if (!clientMetadataMap.containsKey(futureId)) {
-                    clientMetadataMap.put(futureId, futureClient);
+                processId = FUTURE_ID;
+                if (!clientMetadataMap.containsKey(FUTURE_ID)) {
+                    clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null));
                 }
             } else {
                 processId = info.processId();
@@ -321,13 +320,107 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             clientMetadata.addPreviousTasks(info);
         }
 
+        final boolean versionProbing =
+            checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
+
+        log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
+
+        // ---------------- Step One ---------------- //
+
+        // parse the topology to determine the repartition source topics,
+        // making sure they are created with the number of partitions as
+        // the maximum of the depending sub-topologies source topics' number of partitions
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = taskManager.builder().topicGroups();
+
+        final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
+        try {
+            allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata);
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap,
+                    AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        }
+
+        final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
+
+        log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
+
+        // ---------------- Step Two ---------------- //
+
+        // construct the assignment of tasks to clients
+
+        final Set<String> allSourceTopics = new HashSet<>();
+        final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            allSourceTopics.addAll(entry.getValue().sourceTopics);
+            sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
+        }
+
+        // get the tasks as partition groups from the partition grouper
+        final Map<TaskId, Set<TopicPartition>> partitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
+
+
+        assignTasksToClients(allSourceTopics, partitionsForTask, topicGroups, clientMetadataMap, fullMetadata);
+
+        // ---------------- Step Three ---------------- //
+
+        // construct the global partition assignment per host map
+
+        final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
+        final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
+        if (minReceivedMetadataVersion >= 2) {
+            populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
+        }
+        streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);
+
+        // ---------------- Step Four ---------------- //
+
+        // compute the assignment of tasks to threads within each client and build the final group assignment
+
+        final Map<String, Assignment> assignment;
+        if (versionProbing) {
+            assignment = versionProbingAssignment(
+                clientMetadataMap,
+                partitionsForTask,
+                partitionsByHost,
+                standbyPartitionsByHost,
+                allOwnedPartitions,
+                minReceivedMetadataVersion,
+                minSupportedMetadataVersion
+            );
+        } else {
+            assignment = computeNewAssignment(
+                clientMetadataMap,
+                partitionsForTask,
+                partitionsByHost,
+                standbyPartitionsByHost,
+                allOwnedPartitions,
+                minReceivedMetadataVersion,
+                minSupportedMetadataVersion
+            );
+        }
+
+        return new GroupAssignment(assignment);
+    }
+
+    /**
+     * Verify the subscription versions are within the expected bounds and check for version probing.
+     *
+     * @return whether this was a version probing rebalance
+     */
+    private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
+                                          final int minSupportedMetadataVersion,
+                                          final int futureMetadataVersion) {
         final boolean versionProbing;
+
         if (futureMetadataVersion == UNKNOWN) {
             versionProbing = false;
         } else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
             versionProbing = true;
             log.info("Received a future (version probing) subscription (version: {})."
-                    + " Sending assignment back (with supported version {}).",
+                         + " Sending assignment back (with supported version {}).",
                 futureMetadataVersion,
                 minSupportedMetadataVersion);
 
@@ -349,35 +442,76 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 minSupportedMetadataVersion,
                 LATEST_SUPPORTED_VERSION);
         }
+        return versionProbing;
+    }
 
-        log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
-
-        // ---------------- Step Zero ---------------- //
-
-        // parse the topology to determine the repartition source topics,
-        // making sure they are created with the number of partitions as
-        // the maximum of the depending sub-topologies source topics' number of partitions
-        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups =
-            taskManager.builder().topicGroups();
-
+    /**
+     * @return a map of repartition topics and their metadata
+     * @throws TaskAssignmentException if there is incomplete source topic metadata due to missing source topic(s)
+     */
+    private Map<String, InternalTopicConfig> computeRepartitionTopicMetadata(final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                                                             final Cluster metadata) throws TaskAssignmentException {
         final Map<String, InternalTopicConfig> repartitionTopicMetadata = new HashMap<>();
         for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
             for (final String topic : topicsInfo.sourceTopics) {
                 if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
-                    !metadata.topics().contains(topic)) {
-                    log.error("Missing source topic {} during assignment. Returning error {}.",
-                        topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
-                    return new GroupAssignment(
-                        errorAssignment(clientMetadataMap, topic,
-                            AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
-                    );
+                        !metadata.topics().contains(topic)) {
+                    log.error("Source topic {} is missing/unknown during rebalance, please make sure all source topics " +
+                                  "have been pre-created before starting the Streams application. Returning error {}",
+                                  topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
+                    throw new TaskAssignmentException("Missing source topic during assignment.");
                 }
             }
             for (final InternalTopicConfig topic : topicsInfo.repartitionSourceTopics.values()) {
                 repartitionTopicMetadata.put(topic.name(), topic);
             }
         }
+        return repartitionTopicMetadata;
+    }
 
+    /**
+     * Computes and assembles all repartition topic metadata then creates the topics if necessary.
+     *
+     * @return map from repartition topic to its partition info
+     */
+    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                                                        final Cluster metadata) {
+        final Map<String, InternalTopicConfig> repartitionTopicMetadata = computeRepartitionTopicMetadata(topicGroups, metadata);
+
+        setRepartitionTopicMetadataNumberOfPartitions(repartitionTopicMetadata, topicGroups, metadata);
+
+        // ensure the co-partitioning topics within the group have the same number of partitions,
+        // and enforce the number of partitions for those repartition topics to be the same if they
+        // are co-partitioned as well.
+        ensureCopartitioning(taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
+
+        // make sure the repartition source topics exist with the right number of partitions,
+        // create these topics if necessary
+        prepareTopic(repartitionTopicMetadata);
+
+        // augment the metadata with the newly computed number of partitions for all the
+        // repartition source topics
+        final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
+        for (final Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) {
+            final String topic = entry.getKey();
+            final int numPartitions = entry.getValue().numberOfPartitions().orElse(-1);
+
+            for (int partition = 0; partition < numPartitions; partition++) {
+                allRepartitionTopicPartitions.put(
+                    new TopicPartition(topic, partition),
+                    new PartitionInfo(topic, partition, null, new Node[0], new Node[0])
+                );
+            }
+        }
+        return allRepartitionTopicPartitions;
+    }
+
+    /**
+     * Computes the number of partitions and sets it for each repartition topic in repartitionTopicMetadata
+     */
+    private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                                               final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                                               final Cluster metadata) {
         boolean numPartitionsNeeded;
         do {
             numPartitionsNeeded = false;
@@ -385,7 +519,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
                 for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
                     final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                        .numberOfPartitions();
+                                                                     .numberOfPartitions();
                     Integer numPartitions = null;
 
                     if (!maybeNumPartitions.isPresent()) {
@@ -437,53 +571,25 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 }
             }
         } while (numPartitionsNeeded);
+    }
 
-        // ensure the co-partitioning topics within the group have the same number of partitions,
-        // and enforce the number of partitions for those repartition topics to be the same if they
-        // are co-partitioned as well.
-        ensureCopartitioning(taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
-
-        // make sure the repartition source topics exist with the right number of partitions,
-        // create these topics if necessary
-        prepareTopic(repartitionTopicMetadata);
-
-        // augment the metadata with the newly computed number of partitions for all the
-        // repartition source topics
-        final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
-        for (final Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) {
-            final String topic = entry.getKey();
-            final int numPartitions = entry.getValue().numberOfPartitions().orElse(-1);
-
-            for (int partition = 0; partition < numPartitions; partition++) {
-                allRepartitionTopicPartitions.put(
-                    new TopicPartition(topic, partition),
-                    new PartitionInfo(topic, partition, null, new Node[0], new Node[0])
-                );
-            }
-        }
-
-        final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
-
-        log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
-
-        // ---------------- Step One ---------------- //
-
-        // get the tasks as partition groups from the partition grouper
-        final Set<String> allSourceTopics = new HashSet<>();
-        final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
-        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
-            allSourceTopics.addAll(entry.getValue().sourceTopics);
-            sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
-        }
-
-        final Map<TaskId, Set<TopicPartition>> partitionsForTask =
-            partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
-
-        final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
-
+    /**
+     * Populates the taskForPartition and tasksForTopicGroup maps, and checks that partitions are assigned to exactly
+     * one task.
+     *
+     * @param taskForPartition a map from partition to the corresponding task. Populated here.
+     * @param tasksForTopicGroup a map from the topicGroupId to the set of corresponding tasks. Populated here.
+     * @param allSourceTopics a set of all source topics in the topology
+     * @param partitionsForTask a map from task to the set of input partitions
+     * @param fullMetadata the cluster metadata
+     */
+    private void populateTasksForMaps(final Map<TopicPartition, TaskId> taskForPartition,
+                                      final Map<Integer, Set<TaskId>> tasksForTopicGroup,
+                                      final Set<String> allSourceTopics,
+                                      final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                      final Cluster fullMetadata) {
         // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
         final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
-        final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
         for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
             final TaskId id = entry.getKey();
             final Set<TopicPartition> partitions = entry.getValue();
@@ -496,8 +602,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             }
             allAssignedPartitions.addAll(partitions);
 
-            tasksByTopicGroup.computeIfAbsent(id.topicGroupId, k -> new HashSet<>()).add(id);
+            tasksForTopicGroup.computeIfAbsent(id.topicGroupId, k -> new HashSet<>()).add(id);
         }
+
+        checkAllPartitions(allSourceTopics, partitionsForTask, allAssignedPartitions, fullMetadata);
+    }
+
+    // Logs a warning if any partitions are not assigned to a task, or a task has no assigned partitions
+    private void checkAllPartitions(final Set<String> allSourceTopics,
+                                    final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                    final Set<TopicPartition> allAssignedPartitions,
+                                    final Cluster fullMetadata) {
         for (final String topic : allSourceTopics) {
             final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
             if (partitionInfoList.isEmpty()) {
@@ -508,17 +623,24 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                         partitionInfo.partition());
                     if (!allAssignedPartitions.contains(partition)) {
                         log.warn("Partition {} is not assigned to any tasks: {}"
-                                + " Possible causes of a partition not getting assigned"
-                                + " is that another topic defined in the topology has not been"
-                                + " created when starting your streams application,"
-                                + " resulting in no tasks created for this topology at all.", partition,
+                                     + " Possible causes of a partition not getting assigned"
+                                     + " is that another topic defined in the topology has not been"
+                                     + " created when starting your streams application,"
+                                     + " resulting in no tasks created for this topology at all.", partition,
                             partitionsForTask);
                     }
                 }
             }
         }
+    }
 
-        // We only create a standby for tasks that are stateful and have at least one changelog
+    /**
+     * Resolve changelog topic metadata and create them if necessary.
+     *
+     * @return set of standby task ids (any task that is stateful and has logging enabled)
+     */
+    private Set<TaskId> prepareChangelogTopics(final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                               final Map<Integer, Set<TaskId>> tasksForTopicGroup) {
         final Set<TaskId> standbyTaskIds = new HashSet<>();
 
         // add tasks to state change log topic subscribers
@@ -527,7 +649,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             final int topicGroupId = entry.getKey();
             final InternalTopologyBuilder.TopicsInfo topicsInfo = entry.getValue();
 
-            final Set<TaskId> topicGroupTasks = tasksByTopicGroup.get(topicGroupId);
+            final Set<TaskId> topicGroupTasks = tasksForTopicGroup.get(topicGroupId);
             if (topicGroupTasks == null) {
                 log.debug("No tasks found for topic group {}", topicGroupId);
                 continue;
@@ -551,10 +673,23 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         }
 
         prepareTopic(changelogTopicMetadata);
-
         log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values());
+        return standbyTaskIds;
+    }
 
-        // ---------------- Step Two ---------------- //
+    /**
+     * Assigns a set of tasks to each client (Streams instance) using the sticky assignor
+     */
+    private void assignTasksToClients(final Set<String> allSourceTopics,
+                                      final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                      final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups,
+                                      final Map<UUID, ClientMetadata> clientMetadataMap,
+                                      final Cluster fullMetadata) {
+        final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
+        final Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
+        populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata);
+
+        final Set<TaskId> standbyTaskIds = prepareChangelogTopics(topicGroups, tasksForTopicGroup);
 
         final Map<UUID, ClientState> states = new HashMap<>();
         for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
@@ -565,7 +700,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             // there are two cases where we need to construct the prevTasks from the ownedPartitions:
             // 1) COOPERATIVE clients on version 2.4-2.5 do not encode active tasks and rely on ownedPartitions instead
             // 2) future client during version probing, when we can't decode the future subscription info's prev tasks
-            if (!state.ownedPartitions().isEmpty() && (uuid == futureId || state.prevActiveTasks().isEmpty())) {
+            if (!state.ownedPartitions().isEmpty() && (uuid == FUTURE_ID || state.prevActiveTasks().isEmpty())) {
                 final Set<TaskId> previousActiveTasks = new HashSet<>();
                 for (final Map.Entry<TopicPartition, String> partitionEntry : state.ownedPartitions().entrySet()) {
                     final TopicPartition tp = partitionEntry.getKey();
@@ -589,64 +724,49 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         taskAssignor.assign(numStandbyReplicas);
 
         log.info("Assigned tasks to clients as {}{}.", Utils.NL, states.entrySet().stream()
-            .map(Map.Entry::toString).collect(Collectors.joining(Utils.NL)));
-
-        // ---------------- Step Three ---------------- //
-
-        // construct the global partition assignment per host map
-        final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
-        final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
-        if (minReceivedMetadataVersion >= 2) {
-            for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
-                final HostInfo hostInfo = entry.getValue().hostInfo;
+                                                                     .map(Map.Entry::toString).collect(Collectors.joining(Utils.NL)));
+    }
 
-                // if application server is configured, also include host state map
-                if (hostInfo != null) {
-                    final Set<TopicPartition> topicPartitions = new HashSet<>();
-                    final Set<TopicPartition> standbyPartitions = new HashSet<>();
-                    final ClientState state = entry.getValue().state;
+    /**
+     * Populates the global partitionsByHost and standbyPartitionsByHost maps that are sent to each member
+     *
+     * @param partitionsByHost a map from host to the set of partitions hosted there. Populated here.
+     * @param standbyPartitionsByHost a map from host to the set of standby partitions hosted there. Populated here.
+     * @param partitionsForTask a map from task to its set of assigned partitions
+     * @param clientMetadataMap a map from client to its metadata and state
+     */
+    private void populatePartitionsByHostMaps(final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
+                                              final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
+                                              final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                              final Map<UUID, ClientMetadata> clientMetadataMap) {
+        for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
+            final HostInfo hostInfo = entry.getValue().hostInfo;
 
-                    for (final TaskId id : state.activeTasks()) {
-                        topicPartitions.addAll(partitionsForTask.get(id));
-                    }
+            // if application server is configured, also include host state map
+            if (hostInfo != null) {
+                final Set<TopicPartition> topicPartitions = new HashSet<>();
+                final Set<TopicPartition> standbyPartitions = new HashSet<>();
+                final ClientState state = entry.getValue().state;
 
-                    for (final TaskId id : state.standbyTasks()) {
-                        standbyPartitions.addAll(partitionsForTask.get(id));
-                    }
+                for (final TaskId id : state.activeTasks()) {
+                    topicPartitions.addAll(partitionsForTask.get(id));
+                }
 
-                    partitionsByHost.put(hostInfo, topicPartitions);
-                    standbyPartitionsByHost.put(hostInfo, standbyPartitions);
+                for (final TaskId id : state.standbyTasks()) {
+                    standbyPartitions.addAll(partitionsForTask.get(id));
                 }
-            }
-        }
-        streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);
 
-        final Map<String, Assignment> assignment;
-        if (versionProbing) {
-            assignment = versionProbingAssignment(
-                clientMetadataMap,
-                partitionsForTask,
-                partitionsByHost,
-                standbyPartitionsByHost,
-                allOwnedPartitions,
-                minReceivedMetadataVersion,
-                minSupportedMetadataVersion
-            );
-        } else {
-            assignment = computeNewAssignment(
-                clientMetadataMap,
-                partitionsForTask,
-                partitionsByHost,
-                standbyPartitionsByHost,
-                allOwnedPartitions,
-                minReceivedMetadataVersion,
-                minSupportedMetadataVersion
-            );
+                partitionsByHost.put(hostInfo, topicPartitions);
+                standbyPartitionsByHost.put(hostInfo, standbyPartitions);
+            }
         }
-
-        return new GroupAssignment(assignment);
     }
 
+    /**
+     * Computes the assignment of tasks to threads within each client and assembles the final assignment to send out.
+     *
+     * @return the final assignment for each StreamThread consumer
+     */
     private Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
                                                          final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                                          final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
@@ -695,6 +815,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         return assignment;
     }
 
+    /**
+     * Computes the assignment of tasks to threads within each client and assembles the final assignment to send out,
+     * in the special case of version probing where some members are on different versions and have sent different
+     * subscriptions.
+     *
+     * @return the final assignment for each StreamThread consumer
+     */
     private Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
                                                              final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                                              final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
@@ -732,6 +859,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         return assignment;
     }
 
+    /**
+     * Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
+     */
     private void addClientAssignments(final Map<String, Assignment> assignment,
                                       final ClientMetadata clientMetadata,
                                       final Map<TaskId, Set<TopicPartition>> partitionsForTask,
@@ -747,17 +877,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         for (final String consumer : clientMetadata.consumers) {
             final List<TaskId> activeTasksForConsumer = activeTaskAssignments.get(consumer);
 
-            // These will be filled in by buildAssignedActiveTaskAndPartitionsList below
+            // These will be filled in by populateActiveTaskAndPartitionsLists below
             final List<TopicPartition> activePartitionsList = new ArrayList<>();
             final List<TaskId> assignedActiveList = new ArrayList<>();
 
-            buildAssignedActiveTaskAndPartitionsList(consumer,
-                                                     clientMetadata.state,
-                                                     activeTasksForConsumer,
-                                                     partitionsForTask,
-                                                     allOwnedPartitions,
-                                                     activePartitionsList,
-                                                     assignedActiveList);
+            populateActiveTaskAndPartitionsLists(
+                activePartitionsList,
+                assignedActiveList,
+                consumer,
+                clientMetadata.state,
+                activeTasksForConsumer,
+                partitionsForTask,
+                allOwnedPartitions
+            );
 
             final Map<TaskId, Set<TopicPartition>> standbyTaskMap =
                 buildStandbyTaskMap(standbyTaskAssignments.get(consumer), partitionsForTask);
@@ -781,13 +913,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         }
     }
 
-    private void buildAssignedActiveTaskAndPartitionsList(final String consumer,
-                                                          final ClientState clientState,
-                                                          final List<TaskId> activeTasksForConsumer,
-                                                          final Map<TaskId, Set<TopicPartition>> partitionsForTask,
-                                                          final Set<TopicPartition> allOwnedPartitions,
-                                                          final List<TopicPartition> activePartitionsList,
-                                                          final List<TaskId> assignedActiveList) {
+    /**
+     * Populates the lists of active tasks and active task partitions for the consumer with a 1:1 mapping between them
+     * such that the nth task corresponds to the nth partition in the list. This means tasks with multiple partitions
+     * will be repeated in the list.
+     */
+    private void populateActiveTaskAndPartitionsLists(final List<TopicPartition> activePartitionsList,
+                                                      final List<TaskId> assignedActiveList,
+                                                      final String consumer,
+                                                      final ClientState clientState,
+                                                      final List<TaskId> activeTasksForConsumer,
+                                                      final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                                      final Set<TopicPartition> allOwnedPartitions) {
         final List<AssignedPartition> assignedPartitions = new ArrayList<>();
 
         // Build up list of all assigned partition-task pairs
@@ -822,6 +959,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         }
     }
 
+    /**
+     * @return map from task id to its assigned partitions for all standby tasks
+     */
     private static Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final Collection<TaskId> standbys,
                                                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask) {
         final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<>();
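
The net effect of the refactoring is easiest to see as a condensed skeleton of the new assign() flow. The sketch below is illustrative only: the step comments and helper names (checkMetadataVersions, prepareRepartitionTopics, assignTasksToClients, populatePartitionsByHostMaps, versionProbingAssignment, computeNewAssignment) mirror the diff above, while the types and stub bodies are simplified stand-ins so the example compiles on its own.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Self-contained sketch of the control flow this commit factors out of
// StreamsPartitionAssignor#assign. Helper names mirror the diff; the types
// and stub bodies below are simplified stand-ins, not the real Streams classes.
public class AssignFlowSketch {

    public Map<String, List<Integer>> assign(final Map<String, List<Integer>> subscriptions) {
        // ---------------- Step Zero ---------------- //
        // construct the client metadata from the decoded subscription info
        final Map<String, List<Integer>> clientMetadataMap = new HashMap<>(subscriptions);
        final boolean versionProbing = checkMetadataVersions(clientMetadataMap);

        // ---------------- Step One ---------------- //
        // parse the topology and create any missing repartition topics
        final Map<String, Integer> repartitionTopicPartitions = prepareRepartitionTopics(clientMetadataMap);

        // ---------------- Step Two ---------------- //
        // construct the assignment of tasks to clients (sticky task assignor)
        assignTasksToClients(clientMetadataMap, repartitionTopicPartitions);

        // ---------------- Step Three ---------------- //
        // construct the global partition assignment per host map
        final Map<String, List<Integer>> partitionsByHost = populatePartitionsByHostMaps(clientMetadataMap);

        // ---------------- Step Four ---------------- //
        // compute per-thread assignments and build the final group assignment
        return versionProbing
            ? versionProbingAssignment(clientMetadataMap, partitionsByHost)
            : computeNewAssignment(clientMetadataMap, partitionsByHost);
    }

    // Stubs standing in for the extracted helpers; the real logic lives in the diff above.
    private boolean checkMetadataVersions(final Map<String, List<Integer>> clients) { return false; }

    private Map<String, Integer> prepareRepartitionTopics(final Map<String, List<Integer>> clients) { return new HashMap<>(); }

    private void assignTasksToClients(final Map<String, List<Integer>> clients, final Map<String, Integer> topics) { }

    private Map<String, List<Integer>> populatePartitionsByHostMaps(final Map<String, List<Integer>> clients) { return new HashMap<>(); }

    private Map<String, List<Integer>> versionProbingAssignment(final Map<String, List<Integer>> clients, final Map<String, List<Integer>> byHost) { return new HashMap<>(); }

    private Map<String, List<Integer>> computeNewAssignment(final Map<String, List<Integer>> clients, final Map<String, List<Integer>> byHost) { return new HashMap<>(); }
}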

