kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4534: StreamPartitionAssignor only ever updates the partitionsByHostState and metadataWithInternalTopics on first assignment
Date Mon, 19 Dec 2016 20:22:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d6b0b520f -> 65acff32d


KAFKA-4534: StreamPartitionAssignor only ever updates the partitionsByHostState and metadataWithInternalTopics
on first assignment

partitionsByHostState and metadataWithInternalTopics need to be updated on each call to onAssignment();
otherwise, they contain invalid/stale metadata.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2256 from dguy/4534


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65acff32
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65acff32
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65acff32

Branch: refs/heads/trunk
Commit: 65acff32d195a2ac6e3d7ca4542702065417ba66
Parents: d6b0b52
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Dec 19 12:22:37 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Dec 19 12:22:37 2016 -0800

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 30 ++++++-------
 .../internals/StreamPartitionAssignorTest.java  | 44 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/65acff32/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 4ae2d33..53607e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -577,26 +577,20 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             assignedPartitions.add(partition);
         }
 
-        // only need to update the host partitions map if it is not leader
-        if (this.partitionsByHostState == null) {
-            this.partitionsByHostState = info.partitionsByHost;
-        }
-
-        // only need to build if it is not leader
-        if (metadataWithInternalTopics == null) {
-            final Collection<Set<TopicPartition>> values = partitionsByHostState.values();
-            final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
-            for (Set<TopicPartition> value : values) {
-                for (TopicPartition topicPartition : value) {
-                    topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(),
-                                                                               topicPartition.partition(),
-                                                                               null,
-                                                                               new Node[0],
-                                                                               new Node[0]));
-                }
+        this.partitionsByHostState = info.partitionsByHost;
+
+        final Collection<Set<TopicPartition>> values = partitionsByHostState.values();
+        final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
+        for (Set<TopicPartition> value : values) {
+            for (TopicPartition topicPartition : value) {
+                topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(),
+                                                                           topicPartition.partition(),
+                                                                           null,
+                                                                           new Node[0],
+                                                                           new Node[0]));
             }
-            metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo);
         }
+        metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/65acff32/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 0e0620d..82e9d49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -890,6 +890,50 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
+    public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception {
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        final TopicPartition partitionOne = new TopicPartition("topic", 1);
+        final TopicPartition partitionTwo = new TopicPartition("topic", 2);
+        final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
+                new HostInfo("localhost", 9090), Utils.mkSet(partitionOne, partitionTwo));
+
+        final Map<HostInfo, Set<TopicPartition>> secondHostState = new HashMap<>();
+        secondHostState.put(new HostInfo("localhost", 9090), Utils.mkSet(partitionOne));
+        secondHostState.put(new HostInfo("other", 9090), Utils.mkSet(partitionTwo));
+
+        partitionAssignor.onAssignment(createAssignment(firstHostState));
+        assertEquals(firstHostState, partitionAssignor.getPartitionsByHostState());
+        partitionAssignor.onAssignment(createAssignment(secondHostState));
+        assertEquals(secondHostState, partitionAssignor.getPartitionsByHostState());
+    }
+
+    @Test
+    public void shouldUpdateClusterMetadataOnAssignment() throws Exception {
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        final TopicPartition topicOne = new TopicPartition("topic", 1);
+        final TopicPartition topicTwo = new TopicPartition("topic2", 2);
+        final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
+                new HostInfo("localhost", 9090), Utils.mkSet(topicOne));
+
+        final Map<HostInfo, Set<TopicPartition>> secondHostState = Collections.singletonMap(
+                new HostInfo("localhost", 9090), Utils.mkSet(topicOne, topicTwo));
+
+        partitionAssignor.onAssignment(createAssignment(firstHostState));
+        assertEquals(Utils.mkSet("topic"), partitionAssignor.clusterMetadata().topics());
+        partitionAssignor.onAssignment(createAssignment(secondHostState));
+        assertEquals(Utils.mkSet("topic", "topic2"), partitionAssignor.clusterMetadata().topics());
+    }
+
+    private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
+        final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
+                                                       Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+                                                       firstHostState);
+
+        return new PartitionAssignor.Assignment(
+                Collections.<TopicPartition>emptyList(), info.encode());
+    }
+
+    @Test
     public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
         final Properties props = configProps();
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");


Mime
View raw message