kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6648; Fetcher.getTopicMetadata() should return all partitions for each requested topic
Date Fri, 03 Aug 2018 17:41:50 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 09fe51f  KAFKA-6648; Fetcher.getTopicMetadata() should return all partitions for
each requested topic
09fe51f is described below

commit 09fe51f3eb7e3ddee54cfb210d8a22327d1b0773
Author: radai-rosenblatt <radai.rosenblatt@gmail.com>
AuthorDate: Fri Aug 3 10:38:46 2018 -0700

    KAFKA-6648; Fetcher.getTopicMetadata() should return all partitions for each requested
topic
    
    Currently Fetcher.getTopicMetadata() will not include offline partitions. Thus
    KafkaConsumer.partitionsFor(topic) will not return all partitions of a topic if
    there if any partition of the topic is offline. This causes problem if user
    tries to query the total number of partitions of the given topic.
    
    Author: radai-rosenblatt <radai.rosenblatt@gmail.com>
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
    
    Closes #4679 from radai-rosenblatt/partition_shenanigans
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  2 +-
 .../clients/consumer/internals/FetcherTest.java    | 46 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index fd52cb6..dd412ab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -313,7 +313,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
                 if (!shouldRetry) {
                     HashMap<String, List<PartitionInfo>> topicsPartitionInfos
= new HashMap<>();
                     for (String topic : cluster.topics())
-                        topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
+                        topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic));
                     return topicsPartitionInfos;
                 }
             }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4169550..f97c266 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -84,6 +84,7 @@ import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -1388,6 +1389,51 @@ public class FetcherTest {
         assertTrue(topicMetadata.containsKey(topicName));
     }
 
+    @Test
+    public void testGetTopicMetadataOfflinePartitions() {
+        MetadataResponse originalResponse = newMetadataResponse(topicName, Errors.NONE);
//baseline ok response
+
+        //create a response based on the above one with all partitions being leaderless
+        List<MetadataResponse.TopicMetadata> altTopics = new ArrayList<>();
+        for (MetadataResponse.TopicMetadata item : originalResponse.topicMetadata()) {
+            List<MetadataResponse.PartitionMetadata> partitions = item.partitionMetadata();
+            List<MetadataResponse.PartitionMetadata> altPartitions = new ArrayList<>();
+            for (MetadataResponse.PartitionMetadata p : partitions) {
+                altPartitions.add(new MetadataResponse.PartitionMetadata(
+                    p.error(),
+                    p.partition(),
+                    null, //no leader
+                    p.replicas(),
+                    p.isr(),
+                    p.offlineReplicas())
+                );
+            }
+            MetadataResponse.TopicMetadata alteredTopic = new MetadataResponse.TopicMetadata(
+                item.error(),
+                item.topic(),
+                item.isInternal(),
+                altPartitions
+            );
+            altTopics.add(alteredTopic);
+        }
+        Node controller = originalResponse.controller();
+        MetadataResponse altered = new MetadataResponse(
+            (List<Node>) originalResponse.brokers(),
+            originalResponse.clusterId(),
+            controller != null ? controller.id() : MetadataResponse.NO_CONTROLLER_ID,
+            altTopics);
+
+        client.prepareResponse(altered);
+
+        Map<String, List<PartitionInfo>> topicMetadata =
+            fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(topicName),
false), 5000L);
+
+        Assert.assertNotNull(topicMetadata);
+        Assert.assertNotNull(topicMetadata.get(topicName));
+        //noinspection ConstantConditions
+        Assert.assertEquals((int) cluster.partitionCountForTopic(topicName), topicMetadata.get(topicName).size());
+    }
+
     /*
      * Send multiple requests. Verify that the client side quota metrics have the right values
      */


Mime
View raw message