kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)
Date Sat, 01 Sep 2018 01:40:52 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f38c8c  KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)
4f38c8c is described below

commit 4f38c8cd6079100548ca8056c358a2fea57986c2
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Fri Aug 31 18:40:42 2018 -0700

    KAFKA-7369; Handle retriable errors in AdminClient list groups API (#5595)
    
    We should retry when possible if ListGroups fails due to a retriable error (e.g. coordinator loading).
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>,  Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  7 ++--
 .../kafka/common/requests/ListGroupsResponse.java  |  1 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 40 +++++++++++++++++++---
 3 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8ddb0c0..904cd06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2567,8 +2567,11 @@ public class KafkaAdminClient extends AdminClient {
                         void handleResponse(AbstractResponse abstractResponse) {
                             final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
                             synchronized (results) {
-                                if (response.error() != Errors.NONE) {
-                                    results.addError(response.error().exception(), node);
+                                Errors error = response.error();
+                                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
+                                    throw error.exception();
+                                } else if (error != Errors.NONE) {
+                                    results.addError(error.exception(), node);
                                 } else {
                                     for (ListGroupsResponse.Group group : response.groups()) {
                                         maybeAddConsumerGroup(group);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index b108803..af6f721 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -62,6 +62,7 @@ public class ListGroupsResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
+     * COORDINATOR_LOADING_IN_PROGRESS (14)
      * COORDINATOR_NOT_AVAILABLE (15)
      * AUTHORIZATION_FAILED (29)
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0245cbd..c0dc542 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
@@ -46,6 +45,7 @@ import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
@@ -863,9 +863,11 @@ public class KafkaAdminClientTest {
         Node node0 = new Node(0, "localhost", 8121);
         Node node1 = new Node(1, "localhost", 8122);
         Node node2 = new Node(2, "localhost", 8123);
+        Node node3 = new Node(3, "localhost", 8124);
         nodes.put(0, node0);
         nodes.put(1, node1);
         nodes.put(2, node2);
+        nodes.put(3, node3);
 
         final Cluster cluster = new Cluster(
                 "mockClusterId",
@@ -902,13 +904,19 @@ public class KafkaAdminClientTest {
                             )),
                     node0);
 
+            // handle retriable errors
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
                             Errors.COORDINATOR_NOT_AVAILABLE,
                             Collections.emptyList()
                     ),
                     node1);
-
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.COORDINATOR_LOAD_IN_PROGRESS,
+                            Collections.emptyList()
+                    ),
+                    node1);
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
                             Errors.NONE,
@@ -916,15 +924,37 @@ public class KafkaAdminClientTest {
                                     new ListGroupsResponse.Group("group-2", ConsumerProtocol.PROTOCOL_TYPE),
                                     new ListGroupsResponse.Group("group-connect-2", "connector")
                             )),
+                    node1);
+
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.NONE,
+                            asList(
+                                    new ListGroupsResponse.Group("group-3", ConsumerProtocol.PROTOCOL_TYPE),
+                                    new ListGroupsResponse.Group("group-connect-3", "connector")
+                            )),
                     node2);
 
+            // fatal error
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.UNKNOWN_SERVER_ERROR,
+                            Collections.emptyList()),
+                    node3);
+
+
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-            TestUtils.assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+            TestUtils.assertFutureError(result.all(), UnknownServerException.class);
+
             Collection<ConsumerGroupListing> listings = result.valid().get();
-            assertEquals(2, listings.size());
+            assertEquals(3, listings.size());
+
+            Set<String> groupIds = new HashSet<>();
             for (ConsumerGroupListing listing : listings) {
-                assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
+                groupIds.add(listing.groupId());
             }
+
+            assertEquals(Utils.mkSet("group-1", "group-2", "group-3"), groupIds);
             assertEquals(1, result.errors().get().size());
         }
     }


Mime
View raw message