kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3432; Cluster.update() thread-safety
Date Wed, 23 Mar 2016 20:53:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 f5e1ca625 -> 63e9d246b


KAFKA-3432; Cluster.update() thread-safety

Replace `update` with `withPartitions`, which returns a copy instead of mutating the instance.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1118 from ijuma/kafka-3432-cluster-update-thread-safety

(cherry picked from commit d4d5920ed40736d21f056188efa8a86c93e22506)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63e9d246
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63e9d246
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63e9d246

Branch: refs/heads/0.10.0
Commit: 63e9d246b7978152e12aa4ea8b1ba6bcc40f4498
Parents: f5e1ca6
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed Mar 23 13:53:37 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Mar 23 13:53:49 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/common/Cluster.java   | 40 ++++----------------
 .../internals/StreamPartitionAssignor.java      | 12 +++++-
 2 files changed, 17 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/63e9d246/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 4f37358..8e85df8 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -19,6 +19,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,7 +51,7 @@ public final class Cluster {
         this.nodes = Collections.unmodifiableList(copy);
         
         this.nodesById = new HashMap<>();
-        for (Node node: nodes)
+        for (Node node : nodes)
             this.nodesById.put(node.id(), node);
 
         // index the partitions by topic/partition for quick lookup
@@ -118,39 +119,12 @@ public final class Cluster {
     }
 
     /**
-     * Update the cluster information for specific topic with new partition information
+     * Return a copy of this cluster combined with `partitions`.
      */
-    public Cluster update(String topic, Collection<PartitionInfo> partitions) {
-
-        // re-index the partitions by topic/partition for quick lookup
-        for (PartitionInfo p : partitions)
-            this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()),
p);
-
-        // re-index the partitions by topic and node respectively
-        this.partitionsByTopic.put(topic, Collections.unmodifiableList(new ArrayList<>(partitions)));
-
-        List<PartitionInfo> availablePartitions = new ArrayList<>();
-        for (PartitionInfo part : partitions) {
-            if (part.leader() != null)
-                availablePartitions.add(part);
-        }
-        this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
-
-        HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<>();
-        for (Node n : this.nodes) {
-            partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
-        }
-        for (PartitionInfo p : partitions) {
-            if (p.leader() != null) {
-                List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id()));
-                psNode.add(p);
-            }
-        }
-
-        for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
-            this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
-
-        return this;
+    public Cluster withPartitions(Map<TopicPartition, PartitionInfo> partitions) {
+        Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
+        combinedPartitions.putAll(partitions);
+        return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/63e9d246/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index a6b82af..1dd082d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -226,6 +226,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
 
+        Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>();
+
         // if ZK is specified, prepare the internal source topic before calling partition
grouper
         if (internalTopicManager != null) {
             log.debug("Starting to validate internal source topics in partition assignor.");
@@ -247,15 +249,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     partitions = streamThread.restoreConsumer.partitionsFor(topic);
                 } while (partitions == null || partitions.size() != numPartitions);
 
-                metadata.update(topic, partitions);
+                for (PartitionInfo partition : partitions)
+                    internalPartitionInfos.put(new TopicPartition(partition.topic(), partition.partition()),
partition);
             }
 
             log.info("Completed validating internal source topics in partition assignor.");
         }
         internalSourceTopicToTaskIds.clear();
 
+        Cluster metadataWithInternalTopics = metadata;
+        if (internalTopicManager != null)
+            metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
+
         // get the tasks as partition groups from the partition grouper
-        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups,
metadata);
+        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
+                sourceTopicGroups, metadataWithInternalTopics);
 
         // add tasks to state change log topic subscribers
         stateChangelogTopicToTaskIds = new HashMap<>();


Mime
View raw message