kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: KAFKA-4060 and KAFKA-4476 follow up
Date Mon, 23 Jan 2017 17:35:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 f0449d324 -> afde87804


HOTFIX: KAFKA-4060 and KAFKA-4476 follow up

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2418 from mjsax/kafka-4060-zk-test-follow-up

(cherry picked from commit dd897bdb2e9cc8790d1e0494fb7867a5cd09ecc6)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/afde8780
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/afde8780
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/afde8780

Branch: refs/heads/0.10.2
Commit: afde87804211dfea2b9ee87c5f6470e8460dcd33
Parents: f0449d3
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Mon Jan 23 09:34:52 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jan 23 09:35:00 2017 -0800

----------------------------------------------------------------------
 .../processor/DefaultPartitionGrouper.java      |  2 +-
 .../processor/DefaultPartitionGrouperTest.java  | 72 +++++++++++++-------
 2 files changed, 49 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/afde8780/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 2f354d8..25efcd6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -84,7 +84,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
 
             if (partitions == null) {
                 log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic);
-                maxNumPartitions = StreamPartitionAssignor.NOT_AVAILABLE;
+                return StreamPartitionAssignor.NOT_AVAILABLE;
             } else {
                 int numPartitions = partitions.size();
                 if (numPartitions > maxNumPartitions)

http://git-wip-us.apache.org/repos/asf/kafka/blob/afde8780/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index e36bde4..e26453d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-
-import static org.apache.kafka.common.utils.Utils.mkSet;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -32,11 +30,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.Assert.assertEquals;
 
 public class DefaultPartitionGrouperTest {
 
-    private List<PartitionInfo> infos = Arrays.asList(
+    private final List<PartitionInfo> infos = Arrays.asList(
             new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
@@ -44,39 +43,64 @@ public class DefaultPartitionGrouperTest {
             new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(),
+    private final Cluster metadata = new Cluster(
+        "cluster",
+        Collections.singletonList(Node.noNode()),
+        infos,
+        Collections.<String>emptySet(),
         Collections.<String>emptySet());
 
     @Test
-    public void testGrouping() {
-        PartitionGrouper grouper = new DefaultPartitionGrouper();
-        int topicGroupId;
-        Map<TaskId, Set<TopicPartition>> expectedPartitionsForTask;
-        Map<Integer, Set<String>> topicGroups;
-
-        topicGroups = new HashMap<>();
-        topicGroups.put(0, mkSet("topic1"));
-        topicGroups.put(1, mkSet("topic2"));
-
-        expectedPartitionsForTask = new HashMap<>();
-        topicGroupId = 0;
+    public void shouldComputeGroupingForTwoGroups() {
+        final PartitionGrouper grouper = new DefaultPartitionGrouper();
+        final Map<TaskId, Set<TopicPartition>> expectedPartitionsForTask = new HashMap<>();
+        final Map<Integer, Set<String>> topicGroups = new HashMap<>();
+
+        int topicGroupId = 0;
+
+        topicGroups.put(topicGroupId, mkSet("topic1"));
         expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
         expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
         expectedPartitionsForTask.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
-        topicGroupId++;
+
+        topicGroups.put(++topicGroupId, mkSet("topic2"));
         expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
         expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
 
         assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata));
+    }
 
-        topicGroups = new HashMap<>();
-        topicGroups.put(0, mkSet("topic1", "topic2"));
+    @Test
+    public void shouldComputeGroupingForSingleGroupWithMultipleTopics() {
+        final PartitionGrouper grouper = new DefaultPartitionGrouper();
+        final Map<TaskId, Set<TopicPartition>> expectedPartitionsForTask = new HashMap<>();
+        final Map<Integer, Set<String>> topicGroups = new HashMap<>();
 
-        expectedPartitionsForTask = new HashMap<>();
-        topicGroupId = 0;
-        expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
-        expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
-        expectedPartitionsForTask.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
+        final int topicGroupId = 0;
+
+        topicGroups.put(topicGroupId, mkSet("topic1", "topic2"));
+        expectedPartitionsForTask.put(
+            new TaskId(topicGroupId, 0),
+            mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
+        expectedPartitionsForTask.put(
+            new TaskId(topicGroupId, 1),
+            mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
+        expectedPartitionsForTask.put(
+            new TaskId(topicGroupId, 2),
+            mkSet(new TopicPartition("topic1", 2)));
+
+        assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata));
+    }
+
+    @Test
+    public void shouldNotCreateAnyTasksBecauseOneTopicHasUnknownPartitions() {
+        final PartitionGrouper grouper = new DefaultPartitionGrouper();
+        final Map<TaskId, Set<TopicPartition>> expectedPartitionsForTask = new HashMap<>();
+        final Map<Integer, Set<String>> topicGroups = new HashMap<>();
+
+        final int topicGroupId = 0;
+
+        topicGroups.put(topicGroupId, mkSet("topic1", "unknownTopic", "topic2"));
 
         assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata));
     }


Mime
View raw message