kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
Date Wed, 02 Oct 2019 14:26:42 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new d3bf76b  KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
d3bf76b is described below

commit d3bf76b54c5a7ed6ca9f5b027e6defbaf37d4b53
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Fri May 17 14:20:04 2019 +0100

    KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
    
    Initiate `unreachable server` scenario before starting admin client to avoid timing issues
if node is disconnected from the test thread while admin client network thread is processing
a metadata request.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/clients/admin/AdminClientUnitTestEnv.java    | 18 ++++++++++++------
 .../kafka/clients/admin/KafkaAdminClientTest.java      |  5 +++--
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 6023c63..42166b4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -53,14 +54,18 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
     }
 
     public AdminClientUnitTestEnv(Time time, Cluster cluster, String... vals) {
-        this(time, cluster, newStrMap(vals));
+        this(time, cluster, clientConfigs(vals));
     }
 
     public AdminClientUnitTestEnv(Time time, Cluster cluster) {
-        this(time, cluster, newStrMap());
+        this(time, cluster, clientConfigs());
     }
 
     public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config)
{
+        this(time, cluster, config, Collections.emptyMap());
+    }
+
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config,
Map<Node, Long> unreachableNodes) {
         this.time = time;
         this.cluster = cluster;
         AdminClientConfig adminClientConfig = new AdminClientConfig(config);
@@ -86,6 +91,7 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
         });
 
         metadataManager.update(cluster, time.milliseconds());
+        unreachableNodes.forEach(mockClient::setUnreachable);
         this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, metadataManager,
mockClient, time);
     }
 
@@ -110,15 +116,15 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
         this.adminClient.close();
     }
 
-    private static Map<String, Object> newStrMap(String... vals) {
+    static Map<String, Object> clientConfigs(String... overrides) {
         Map<String, Object> map = new HashMap<>();
         map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
         map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
-        if (vals.length % 2 != 0) {
+        if (overrides.length % 2 != 0) {
             throw new IllegalStateException();
         }
-        for (int i = 0; i < vals.length; i += 2) {
-            map.put(vals[i], vals[i + 1]);
+        for (int i = 0; i < overrides.length; i += 2) {
+            map.put(overrides[i], overrides[i + 1]);
         }
         return map;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 7e9f4c4..388aa88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -259,10 +259,11 @@ public class KafkaAdminClientTest {
         // which prevents AdminClient from being able to send the initial metadata request
 
         Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost",
8121)));
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster))
{
+        Map<Node, Long> unreachableNodes = Collections.singletonMap(cluster.nodes().get(0),
200L);
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
+                AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) {
             Cluster discoveredCluster = mockCluster(0);
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200);
             env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
                     new  MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
                             1, Collections.emptyList()));


Mime
View raw message