kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4104: Queryable state metadata is sometimes invalid
Date Fri, 02 Sep 2016 04:21:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 268cff704 -> 8f3462552


KAFKA-4104: Queryable state metadata is sometimes invalid

If the thread or process is not the coordinator the Cluster instance in StreamPartitionAssignor
will always be null. This builds an instance of the Cluster with the metadata associated with
the Assignment

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1804 from dguy/kafka-4104


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f346255
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f346255
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f346255

Branch: refs/heads/trunk
Commit: 8f3462552fa4d6a6d70a837c2ef7439bba512657
Parents: 268cff7
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Sep 1 21:21:42 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Sep 1 21:21:42 2016 -0700

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 18 +++++++++++
 .../internals/StreamPartitionAssignorTest.java  | 32 ++++++++++++++++++++
 2 files changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8f346255/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index fd70a01..09e192d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -532,6 +532,21 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
         this.partitionToTaskIds = partitionToTaskIds;
         this.partitionsByHostState = info.partitionsByHostState;
+        // only need to build when not coordinator
+        if (metadataWithInternalTopics == null) {
+            final Collection<Set<TopicPartition>> values = partitionsByHostState.values();
+            final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
+            for (Set<TopicPartition> value : values) {
+                for (TopicPartition topicPartition : value) {
+                    topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(),
+                                                                               topicPartition.partition(),
+                                                                               null,
+                                                                               new Node[0],
+                                                                               new Node[0]));
+                }
+            }
+            metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo);
+        }
     }
 
     public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
@@ -542,6 +557,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     }
 
     public Cluster clusterMetadata() {
+        if (metadataWithInternalTopics == null) {
+            return Cluster.empty();
+        }
         return metadataWithInternalTopics;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f346255/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 9d261bb..e300966 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -52,6 +52,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class StreamPartitionAssignorTest {
 
@@ -691,6 +692,37 @@ public class StreamPartitionAssignorTest {
         assertEquals(hostState, partitionAssignor.getPartitionsByHostState());
     }
 
+    @Test
+    public void shouldSetClusterMetadataOnAssignment() throws Exception {
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+
+        final List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic",
0));
+        final Map<HostInfo, Set<TopicPartition>> hostState =
+                Collections.singletonMap(new HostInfo("localhost", 80),
+                                         Collections.singleton(new TopicPartition("topic",
0)));
+        final AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new
TaskId(0, 0)),
+                                                                 Collections.<TaskId,
Set<TopicPartition>>emptyMap(),
+                                                                 hostState);
+
+
+        partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
+        final Cluster cluster = partitionAssignor.clusterMetadata();
+        final List<PartitionInfo> partitionInfos = cluster.partitionsForTopic("topic");
+        final PartitionInfo partitionInfo = partitionInfos.get(0);
+        assertEquals(1, partitionInfos.size());
+        assertEquals("topic", partitionInfo.topic());
+        assertEquals(0, partitionInfo.partition());
+    }
+
+    @Test
+    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        final Cluster cluster = partitionAssignor.clusterMetadata();
+        assertNotNull(cluster);
+
+    }
+
+
     private class MockInternalTopicManager extends InternalTopicManager {
 
         public Map<String, Integer> readyTopics = new HashMap<>();


Mime
View raw message