kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4355: Skip topics that have no partitions
Date Tue, 22 Nov 2016 17:54:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 44f61ba4f -> dcea5f838


KAFKA-4355: Skip topics that have no partitions

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #2133 from enothereska/KAFKA-4355-topic-not-found


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dcea5f83
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dcea5f83
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dcea5f83

Branch: refs/heads/trunk
Commit: dcea5f83888bb346c8ae20887b57904e046463a7
Parents: 44f61ba
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Tue Nov 22 09:54:54 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Nov 22 09:54:54 2016 -0800

----------------------------------------------------------------------
 .../processor/DefaultPartitionGrouper.java      | 19 +++--
 .../internals/StreamPartitionAssignor.java      | 13 +++-
 .../internals/StreamPartitionAssignorTest.java  | 79 +++++++++++++++++---
 3 files changed, 91 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dcea5f83/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index f0fb38c..1da1209 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,6 +40,7 @@ import java.util.Set;
  */
 public class DefaultPartitionGrouper implements PartitionGrouper {
 
+    private static final Logger log = LoggerFactory.getLogger(DefaultPartitionGrouper.class);
     /**
      * Generate tasks with the assigned topic partitions.
      *
@@ -58,7 +61,8 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
                 Set<TopicPartition> group = new HashSet<>(topicGroup.size());
 
                 for (String topic : topicGroup) {
-                    if (partitionId < metadata.partitionsForTopic(topic).size()) {
+                    List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
+                    if (partitions != null && partitionId < partitions.size()) {
                         group.add(new TopicPartition(topic, partitionId));
                     }
                 }
@@ -77,12 +81,13 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
         for (String topic : topics) {
             List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
 
-            if (partitions == null)
-                throw new StreamsException("Topic not found during partition assignment: " + topic);
-
-            int numPartitions = partitions.size();
-            if (numPartitions > maxNumPartitions)
-                maxNumPartitions = numPartitions;
+            if (partitions == null) {
+                log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic);
+            } else {
+                int numPartitions = partitions.size();
+                if (numPartitions > maxNumPartitions)
+                    maxNumPartitions = numPartitions;
+            }
         }
         return maxNumPartitions;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dcea5f83/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 009ba1b..b06d7f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -408,11 +408,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             ids.add(id);
         }
         for (String topic : allSourceTopics) {
-            for (PartitionInfo partitionInfo : metadataWithInternalTopics.partitionsForTopic(topic)) {
-                TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
-                if (!allAssignedPartitions.contains(partition)) {
-                    log.warn("stream-thread [{}] Partition {} is not assigned to any tasks: {}", streamThread.getName(), partition, partitionsForTask);
+            List<PartitionInfo> partitionInfoList = metadataWithInternalTopics.partitionsForTopic(topic);
+            if (partitionInfoList != null) {
+                for (PartitionInfo partitionInfo : partitionInfoList) {
+                    TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+                    if (!allAssignedPartitions.contains(partition)) {
+                        log.warn("stream-thread [{}] Partition {} is not assigned to any tasks: {}", streamThread.getName(), partition, partitionsForTask);
+                    }
                 }
+            } else {
+                log.warn("stream-thread [{}] No partitions found for topic {}", streamThread.getName(), topic);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dcea5f83/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index cd9b7a5..65cc628 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -194,17 +194,17 @@ public class StreamPartitionAssignorTest {
         Set<TaskId> allActiveTasks = new HashSet<>();
 
         // the first consumer
-        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
         allActiveTasks.addAll(info10.activeTasks);
 
         // the second consumer
-        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
         allActiveTasks.addAll(info11.activeTasks);
 
         assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
 
         // the third consumer
-        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
         allActiveTasks.addAll(info20.activeTasks);
 
         assertEquals(3, allActiveTasks.size());
@@ -215,6 +215,67 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
+    public void testAssignEmptyMetadata() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+        List<String> topics = Utils.mkList("topic1", "topic2");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+        final  Cluster emptyMetadata = new Cluster("cluster", Arrays.asList(Node.noNode()),
+            Collections.<PartitionInfo>emptySet(),
+            Collections.<String>emptySet(),
+            Collections.<String>emptySet());
+        UUID uuid1 = UUID.randomUUID();
+        String client1 = "client1";
+
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
+
+        // initially metadata is empty
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions);
+
+        // check assigned partitions
+        assertEquals(Collections.<TopicPartition>emptySet(),
+            new HashSet<>(assignments.get("consumer10").partitions()));
+
+        // check assignment info
+        Set<TaskId> allActiveTasks = new HashSet<>();
+        AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
+
+        assertEquals(0, allActiveTasks.size());
+        assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
+
+        // then metadata gets populated
+        assignments = partitionAssignor.assign(metadata, subscriptions);
+        // check assigned partitions
+        assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)),
+            Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions())));
+
+        // the first consumer
+        info10 = checkAssignment(allTopics, assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
+
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, new HashSet<>(allActiveTasks));
+
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, allActiveTasks);
+    }
+
+    @Test
     public void testAssignWithNewTasks() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
 
@@ -406,12 +467,12 @@ public class StreamPartitionAssignorTest {
         Set<TaskId> allStandbyTasks = new HashSet<>();
 
         // the first consumer
-        AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+        AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
         allActiveTasks.addAll(info10.activeTasks);
         allStandbyTasks.addAll(info10.standbyTasks.keySet());
 
         // the second consumer
-        AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+        AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
         allActiveTasks.addAll(info11.activeTasks);
         allStandbyTasks.addAll(info11.standbyTasks.keySet());
 
@@ -422,7 +483,7 @@ public class StreamPartitionAssignorTest {
         assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
 
         // the third consumer
-        AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+        AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
         allActiveTasks.addAll(info20.activeTasks);
         allStandbyTasks.addAll(info20.standbyTasks.keySet());
 
@@ -714,7 +775,7 @@ public class StreamPartitionAssignorTest {
 
     }
 
-    private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+    private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {
 
         // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.
 
@@ -734,7 +795,7 @@ public class StreamPartitionAssignorTest {
         assertEquals(activeTasks, info.activeTasks);
 
         // check if active partitions cover all topics
-        assertEquals(allTopics, activeTopics);
+        assertEquals(expectedTopics, activeTopics);
 
         // check if standby tasks are consistent
         Set<String> standbyTopics = new HashSet<>();
@@ -751,7 +812,7 @@ public class StreamPartitionAssignorTest {
 
         if (info.standbyTasks.size() > 0)
             // check if standby partitions cover all topics
-            assertEquals(allTopics, standbyTopics);
+            assertEquals(expectedTopics, standbyTopics);
 
         return info;
     }


Mime
View raw message