kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch trunk updated: MINOR: Reduce in-memory copies of partition objects in onJoinComplete() and onJoinPrepare()
Date Wed, 03 Oct 2018 12:25:18 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 044350f  MINOR: Reduce in-memory copies of partition objects in onJoinComplete()
and onJoinPrepare()
044350f is described below

commit 044350f28e4d0edcc0503b2bc2593c0d25948ed0
Author: radai-rosenblatt <radai.rosenblatt@gmail.com>
AuthorDate: Wed Oct 3 05:24:51 2018 -0700

    MINOR: Reduce in-memory copies of partition objects in onJoinComplete() and onJoinPrepare()
    
    `ConsumerCoordinator.onJoinPrepare()` currently makes multiple copies of the set of assigned
partitions. We can let `subscriptions.assignedPartitions()` return a view of the underlying
partition set, copy it only once and re-use the copied value.
    
    Author: radai-rosenblatt <radai.rosenblatt@gmail.com>
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Dong Lin <lindong28@gmail.com>
    
    Closes #5124 from radai-rosenblatt/copy-all-the-things
---
 .../clients/consumer/internals/ConsumerCoordinator.java    | 14 ++++++++------
 .../clients/consumer/internals/SubscriptionState.java      |  3 +++
 .../org/apache/kafka/common/internals/PartitionStates.java |  8 +++++---
 3 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3b0a5fa..afb17f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -255,7 +255,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         //
         // TODO this part of the logic should be removed once we allow regex on leader assign
         Set<String> addedTopics = new HashSet<>();
-        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+        //this is a copy because its handed to listener below
+        Set<TopicPartition> assignedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+        for (TopicPartition tp : assignedPartitions) {
             if (!joinedSubscription.contains(tp.topic()))
                 addedTopics.add(tp.topic());
         }
@@ -284,10 +286,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // execute the user's callback after rebalance
         ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
-        log.info("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
+        log.info("Setting newly assigned partitions {}", assignedPartitions);
         try {
-            Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
-            listener.onPartitionsAssigned(assigned);
+            listener.onPartitionsAssigned(assignedPartitions);
         } catch (WakeupException | InterruptException e) {
             throw e;
         } catch (Exception e) {
@@ -452,9 +453,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // execute the user's callback before rebalance
         ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
-        log.info("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
+        // copy since about to be handed to user code
+        Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
+        log.info("Revoking previously assigned partitions {}", revoked);
         try {
-            Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsRevoked(revoked);
         } catch (WakeupException | InterruptException e) {
             throw e;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 542c413..1a5dd92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -264,6 +264,9 @@ public class SubscriptionState {
         assignedState(tp).seek(offset);
     }
 
+    /**
+     * @return an unmodifiable view of the currently assigned partitions
+     */
     public Set<TopicPartition> assignedPartitions() {
         return this.assignment.partitionSet();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
index 2918dd6..5339a28 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -19,7 +19,7 @@ package org.apache.kafka.common.internals;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -43,6 +43,7 @@ import java.util.Set;
 public class PartitionStates<S> {
 
     private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
+    private final Set<TopicPartition> partitionSetView = Collections.unmodifiableSet(map.keySet());
 
     /* the number of partitions that are currently assigned available in a thread safe manner
*/
     private volatile int size = 0;
@@ -67,10 +68,11 @@ public class PartitionStates<S> {
     }
 
     /**
-     * Returns the partitions in random order.
+     * Returns an unmodifiable view of the partitions in random order.
+     * changes to this PartitionStates instance will be reflected in this view.
      */
     public Set<TopicPartition> partitionSet() {
-        return new HashSet<>(map.keySet());
+        return partitionSetView;
     }
 
     public void clear() {


Mime
View raw message