kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch trunk updated: KAFKA-10147 MockAdminClient#describeConfigs(Collection<ConfigResource>) is unable to handle broker resource (#8853)
Date Wed, 17 Jun 2020 14:56:48 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 26e238c  KAFKA-10147 MockAdminClient#describeConfigs(Collection<ConfigResource>) is unable to handle broker resource (#8853)
26e238c is described below

commit 26e238c6f5a242459f52bfefa97a6b0c247b2d5e
Author: Chia-Ping Tsai <chia7712@gmail.com>
AuthorDate: Wed Jun 17 22:56:07 2020 +0800

    KAFKA-10147 MockAdminClient#describeConfigs(Collection<ConfigResource>) is unable to handle broker resource (#8853)
    
    Author: Chia-Ping Tsai <chia7712@gmail.com>
    Reviewers: Boyang Chen <boyang@confluent.io>, Randall Hauch <rhauch@gmail.com>
---
 .../kafka/clients/admin/MockAdminClient.java       | 72 +++++++---------------
 1 file changed, 23 insertions(+), 49 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 2b86d4f..7c6a955 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class MockAdminClient extends AdminClient {
     public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@@ -366,51 +367,6 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
-    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources)
{
-        Map<ConfigResource, KafkaFuture<Config>> topicConfigs = new HashMap<>();
-
-        if (timeoutNextRequests > 0) {
-            for (ConfigResource requestedResource : resources) {
-                KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
-                future.completeExceptionally(new TimeoutException());
-                topicConfigs.put(requestedResource, future);
-            }
-
-            --timeoutNextRequests;
-            return new DescribeConfigsResult(topicConfigs);
-        }
-
-        for (ConfigResource requestedResource : resources) {
-            if (requestedResource.type() != ConfigResource.Type.TOPIC) {
-                continue;
-            }
-            for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet())
{
-                String topicName = topicDescription.getKey();
-                if (topicName.equals(requestedResource.name()) && !topicDescription.getValue().markedForDeletion)
{
-                    if (topicDescription.getValue().fetchesRemainingUntilVisible > 0)
{
-                        topicDescription.getValue().fetchesRemainingUntilVisible--;
-                    } else {
-                        TopicMetadata topicMetadata = topicDescription.getValue();
-                        KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
-                        Collection<ConfigEntry> entries = new ArrayList<>();
-                        topicMetadata.configs.forEach((k, v) -> entries.add(new ConfigEntry(k,
v)));
-                        future.complete(new Config(entries));
-                        topicConfigs.put(requestedResource, future);
-                        break;
-                    }
-                }
-            }
-            if (!topicConfigs.containsKey(requestedResource)) {
-                KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
-                future.completeExceptionally(new UnknownTopicOrPartitionException("Resource
" + requestedResource + " not found."));
-                topicConfigs.put(requestedResource, future);
-            }
-        }
-
-        return new DescribeConfigsResult(topicConfigs);
-    }
-
-    @Override
     synchronized public DeleteTopicsResult deleteTopics(Collection<String> topicsToDelete,
DeleteTopicsOptions options) {
         Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
 
@@ -535,6 +491,19 @@ public class MockAdminClient extends AdminClient {
 
     @Override
     synchronized public DescribeConfigsResult describeConfigs(Collection<ConfigResource>
resources, DescribeConfigsOptions options) {
+
+        if (timeoutNextRequests > 0) {
+            Map<ConfigResource, KafkaFuture<Config>> configs = new HashMap<>();
+            for (ConfigResource requestedResource : resources) {
+                KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new TimeoutException());
+                configs.put(requestedResource, future);
+            }
+
+            --timeoutNextRequests;
+            return new DescribeConfigsResult(configs);
+        }
+
         Map<ConfigResource, KafkaFuture<Config>> results = new HashMap<>();
         for (ConfigResource resource : resources) {
             KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
@@ -551,7 +520,7 @@ public class MockAdminClient extends AdminClient {
     synchronized private Config getResourceDescription(ConfigResource resource) {
         switch (resource.type()) {
             case BROKER: {
-                int brokerId = Integer.valueOf(resource.name());
+                int brokerId = Integer.parseInt(resource.name());
                 if (brokerId >= brokerConfigs.size()) {
                     throw new InvalidRequestException("Broker " + resource.name() +
                         " not found.");
@@ -560,10 +529,15 @@ public class MockAdminClient extends AdminClient {
             }
             case TOPIC: {
                 TopicMetadata topicMetadata = allTopics.get(resource.name());
-                if (topicMetadata == null) {
-                    throw new UnknownTopicOrPartitionException();
+                if (topicMetadata != null && !topicMetadata.markedForDeletion) {
+                    if (topicMetadata.fetchesRemainingUntilVisible > 0)
+                        topicMetadata.fetchesRemainingUntilVisible = Math.max(0, topicMetadata.fetchesRemainingUntilVisible - 1);
+                    else return new Config(topicMetadata.configs.entrySet().stream()
+                                .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
+                                .collect(Collectors.toList()));
+
                 }
-                return toConfigObject(topicMetadata.configs);
+                throw new UnknownTopicOrPartitionException("Resource " + resource + " not found.");
             }
             default:
                 throw new UnsupportedOperationException("Not implemented yet");


Mime
View raw message