kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8424: replace ListGroups request/response with automated protocol (#6805)
Date Wed, 10 Jul 2019 21:48:59 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f5a5eb  KAFKA-8424: replace ListGroups request/response with automated protocol (#6805)
4f5a5eb is described below

commit 4f5a5eb579a20cf86eaf5018663d44e5592d0401
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Wed Jul 10 14:48:38 2019 -0700

    KAFKA-8424: replace ListGroups request/response with automated protocol (#6805)
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  12 ++-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   6 +-
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../kafka/common/requests/ListGroupsRequest.java   |  55 +++++-----
 .../kafka/common/requests/ListGroupsResponse.java  | 116 +++------------------
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  62 +++++++----
 .../kafka/common/requests/RequestResponseTest.java |  14 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  19 +++-
 .../KafkaMetricReporterExceptionHandlingTest.scala |  15 +--
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   8 +-
 10 files changed, 137 insertions(+), 172 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f7c022e..7cbbd08 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -73,11 +73,13 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
-import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -2848,10 +2850,10 @@ public class KafkaAdminClient extends AdminClient {
                     runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id()))
{
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new ListGroupsRequest.Builder();
+                            return new ListGroupsRequest.Builder(new ListGroupsRequestData());
                         }
 
-                        private void maybeAddConsumerGroup(ListGroupsResponse.Group group)
{
+                        private void maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup
group) {
                             String protocolType = group.protocolType();
                             if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty())
{
                                 final String groupId = group.groupId();
@@ -2864,13 +2866,13 @@ public class KafkaAdminClient extends AdminClient {
                         void handleResponse(AbstractResponse abstractResponse) {
                             final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
                             synchronized (results) {
-                                Errors error = response.error();
+                                Errors error = Errors.forCode(response.data().errorCode());
                                 if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error
== Errors.COORDINATOR_NOT_AVAILABLE) {
                                     throw error.exception();
                                 } else if (error != Errors.NONE) {
                                     results.addError(error.exception(), node);
                                 } else {
-                                    for (ListGroupsResponse.Group group : response.groups())
{
+                                    for (ListGroupsResponseData.ListedGroup group : response.data().groups())
{
                                         maybeAddConsumerGroup(group);
                                     }
                                 }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 257393b..22705a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -40,6 +40,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
@@ -91,8 +93,6 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.requests.LeaderAndIsrResponse;
-import org.apache.kafka.common.requests.ListGroupsRequest;
-import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -143,7 +143,7 @@ public enum ApiKeys {
     SYNC_GROUP(14, "SyncGroup", SyncGroupRequestData.SCHEMAS, SyncGroupResponseData.SCHEMAS),
     DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
             DescribeGroupsResponseData.SCHEMAS),
-    LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()),
+    LIST_GROUPS(16, "ListGroups", ListGroupsRequestData.SCHEMAS, ListGroupsResponseData.SCHEMAS),
     SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS),
     API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), ApiVersionsResponse.schemaVersions())
{
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 8a7be33..b0aafcb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -103,7 +103,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse
{
             case DESCRIBE_GROUPS:
                 return new DescribeGroupsResponse(struct, version);
             case LIST_GROUPS:
-                return new ListGroupsResponse(struct);
+                return new ListGroupsResponse(struct, version);
             case SASL_HANDSHAKE:
                 return new SaslHandshakeResponse(struct, version);
             case API_VERSIONS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index baab1e1..b0e8860 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -16,54 +16,54 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
+/**
+ * Possible error codes:
+ *
+ * COORDINATOR_LOAD_IN_PROGRESS (14)
+ * COORDINATOR_NOT_AVAILABLE (15)
+ * AUTHORIZATION_FAILED (29)
+ */
 public class ListGroupsRequest extends AbstractRequest {
 
-    /* List groups api */
-    private static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
-
-    /* v1 request is the same as v0. Throttle time has been added to response */
-    private static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0;
+    public static class Builder extends AbstractRequest.Builder<ListGroupsRequest>
{
 
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out
responses before throttling.
-     */
-    private static final Schema LIST_GROUPS_REQUEST_V2 = LIST_GROUPS_REQUEST_V1;
+        private final ListGroupsRequestData data;
 
-    public static Schema[] schemaVersions() {
-        return new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1,
-            LIST_GROUPS_REQUEST_V2};
-    }
-
-    public static class Builder extends AbstractRequest.Builder<ListGroupsRequest>
{
-        public Builder() {
+        public Builder(ListGroupsRequestData data) {
             super(ApiKeys.LIST_GROUPS);
+            this.data = data;
         }
 
         @Override
         public ListGroupsRequest build(short version) {
-            return new ListGroupsRequest(version);
+            return new ListGroupsRequest(data, version);
         }
 
         @Override
         public String toString() {
-            return "(type=ListGroupsRequest)";
+            return data.toString();
         }
     }
 
-    public ListGroupsRequest(short version) {
+    private final ListGroupsRequestData data;
+
+    public ListGroupsRequest(ListGroupsRequestData data, short version) {
         super(ApiKeys.LIST_GROUPS, version);
+        this.data = data;
     }
 
-    public ListGroupsRequest(Struct struct, short versionId) {
-        super(ApiKeys.LIST_GROUPS, versionId);
+    public ListGroupsRequest(Struct struct, short version) {
+        super(ApiKeys.LIST_GROUPS, version);
+        this.data = new ListGroupsRequestData(struct, version);
     }
 
     @Override
@@ -71,10 +71,17 @@ public class ListGroupsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new ListGroupsResponse(Errors.forException(e), Collections.emptyList());
+                return new ListGroupsResponse(new ListGroupsResponseData()
+                        .setGroups(Collections.emptyList())
+                        .setErrorCode(Errors.forException(e).code())
+                );
             case 1:
             case 2:
-                return new ListGroupsResponse(throttleTimeMs, Errors.forException(e), Collections.emptyList());
+                return new ListGroupsResponse(new ListGroupsResponseData()
+                        .setGroups(Collections.emptyList())
+                        .setErrorCode(Errors.forException(e).code())
+                        .setThrottleTimeMs(throttleTimeMs)
+                );
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid.
Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.LIST_GROUPS.latestVersion()));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index de16998..f3a67ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -16,138 +16,48 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
 public class ListGroupsResponse extends AbstractResponse {
 
-    private static final String GROUPS_KEY_NAME = "groups";
-    private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
-
-    private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(
-            GROUP_ID,
-            new Field(PROTOCOL_TYPE_KEY_NAME, STRING));
-    private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(
-            ERROR_CODE,
-            new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
-    private static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE,
-            new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out
responses before throttling.
-     */
-    private static final Schema LIST_GROUPS_RESPONSE_V2 = LIST_GROUPS_RESPONSE_V1;
+    private final ListGroupsResponseData data;
 
-    public static Schema[] schemaVersions() {
-        return new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1,
-            LIST_GROUPS_RESPONSE_V2};
+    public ListGroupsResponse(ListGroupsResponseData data) {
+        this.data = data;
     }
 
-    /**
-     * Possible error codes:
-     *
-     * COORDINATOR_LOAD_IN_PROGRESS (14)
-     * COORDINATOR_NOT_AVAILABLE (15)
-     * AUTHORIZATION_FAILED (29)
-     */
-
-    private final Errors error;
-    private final int throttleTimeMs;
-    private final List<Group> groups;
-
-    public ListGroupsResponse(Errors error, List<Group> groups) {
-        this(DEFAULT_THROTTLE_TIME, error, groups);
+    public ListGroupsResponse(Struct struct, short version) {
+        this.data = new ListGroupsResponseData(struct, version);
     }
 
-    public ListGroupsResponse(int throttleTimeMs, Errors error, List<Group> groups)
{
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.groups = groups;
-    }
-
-    public ListGroupsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        this.error = Errors.forCode(struct.get(ERROR_CODE));
-        this.groups = new ArrayList<>();
-        for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
-            Struct groupStruct = (Struct) groupObj;
-            String groupId = groupStruct.get(GROUP_ID);
-            String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
-            this.groups.add(new Group(groupId, protocolType));
-        }
+    public ListGroupsResponseData data() {
+        return data;
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public List<Group> groups() {
-        return groups;
-    }
-
-    public Errors error() {
-        return error;
+        return data.throttleTimeMs();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
-    }
-
-    public static class Group {
-        private final String groupId;
-        private final String protocolType;
-
-        public Group(String groupId, String protocolType) {
-            this.groupId = groupId;
-            this.protocolType = protocolType;
-        }
-
-        public String groupId() {
-            return groupId;
-        }
-
-        public String protocolType() {
-            return protocolType;
-        }
-
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.LIST_GROUPS.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-        struct.set(ERROR_CODE, error.code());
-        List<Struct> groupList = new ArrayList<>();
-        for (Group group : groups) {
-            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
-            groupStruct.set(GROUP_ID, group.groupId);
-            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
-            groupList.add(groupStruct);
-        }
-        struct.set(GROUPS_KEY_NAME, groupList.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 
     public static ListGroupsResponse parse(ByteBuffer buffer, short version) {
-        return new ListGroupsResponse(ApiKeys.LIST_GROUPS.parseResponse(version, buffer));
+        return new ListGroupsResponse(ApiKeys.LIST_GROUPS.responseSchema(version).read(buffer),
version);
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 693538d..10722e3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -62,6 +62,7 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
@@ -1030,52 +1031,69 @@ public class KafkaAdminClientTest {
 
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
-                            Errors.NONE,
-                            asList(
-                                    new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE),
-                                    new ListGroupsResponse.Group("group-connect-1", "connector")
-                            )),
+                            new ListGroupsResponseData()
+                            .setErrorCode(Errors.NONE.code())
+                            .setGroups(Arrays.asList(
+                                    new ListGroupsResponseData.ListedGroup()
+                                            .setGroupId("group-1")
+                                            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+                                    new ListGroupsResponseData.ListedGroup()
+                                            .setGroupId("group-connect-1")
+                                            .setProtocolType("connector")
+                            ))),
                     node0);
 
             // handle retriable errors
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
-                        Errors.COORDINATOR_NOT_AVAILABLE,
-                        Collections.emptyList()
+                            new ListGroupsResponseData()
+                                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                                    .setGroups(Collections.emptyList())
                     ),
                     node1);
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
-                            Errors.COORDINATOR_LOAD_IN_PROGRESS,
-                            Collections.emptyList()
+                            new ListGroupsResponseData()
+                                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                                    .setGroups(Collections.emptyList())
                     ),
                     node1);
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
-                            Errors.NONE,
-                            asList(
-                                    new ListGroupsResponse.Group("group-2", ConsumerProtocol.PROTOCOL_TYPE),
-                                    new ListGroupsResponse.Group("group-connect-2", "connector")
-                            )),
+                            new ListGroupsResponseData()
+                                    .setErrorCode(Errors.NONE.code())
+                                    .setGroups(Arrays.asList(
+                                            new ListGroupsResponseData.ListedGroup()
+                                                    .setGroupId("group-2")
+                                                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+                                            new ListGroupsResponseData.ListedGroup()
+                                                    .setGroupId("group-connect-2")
+                                                    .setProtocolType("connector")
+                            ))),
                     node1);
 
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
-                            Errors.NONE,
-                            asList(
-                                    new ListGroupsResponse.Group("group-3", ConsumerProtocol.PROTOCOL_TYPE),
-                                    new ListGroupsResponse.Group("group-connect-3", "connector")
-                            )),
+                            new ListGroupsResponseData()
+                                    .setErrorCode(Errors.NONE.code())
+                                    .setGroups(Arrays.asList(
+                                            new ListGroupsResponseData.ListedGroup()
+                                                    .setGroupId("group-3")
+                                                    .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
+                                            new ListGroupsResponseData.ListedGroup()
+                                                    .setGroupId("group-connect-3")
+                                                    .setProtocolType("connector")
+                                    ))),
                     node2);
 
             // fatal error
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
-                            Errors.UNKNOWN_SERVER_ERROR,
-                            Collections.emptyList()),
+                            new ListGroupsResponseData()
+                                    .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+                                    .setGroups(Collections.emptyList())),
                     node3);
 
-
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
             TestUtils.assertFutureError(result.all(), UnknownServerException.class);
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5fd1518..11c19b9 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -67,6 +67,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.SaslAuthenticateRequestData;
@@ -874,12 +876,18 @@ public class RequestResponseTest {
     }
 
     private ListGroupsRequest createListGroupsRequest() {
-        return new ListGroupsRequest.Builder().build();
+        return new ListGroupsRequest.Builder(new ListGroupsRequestData()).build();
     }
 
     private ListGroupsResponse createListGroupsResponse() {
-        List<ListGroupsResponse.Group> groups = Collections.singletonList(new ListGroupsResponse.Group("test-group",
"consumer"));
-        return new ListGroupsResponse(Errors.NONE, groups);
+        return new ListGroupsResponse(
+                new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(Collections.singletonList(
+                                new ListGroupsResponseData.ListedGroup()
+                                        .setGroupId("test-group")
+                                        .setProtocolType("consumer")
+                )));
     }
 
     private DescribeGroupsRequest createDescribeGroupRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 698e5a9..ea8fcb1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -60,6 +60,7 @@ import org.apache.kafka.common.message.HeartbeatResponseData
 import org.apache.kafka.common.message.InitProducerIdResponseData
 import org.apache.kafka.common.message.JoinGroupResponseData
 import org.apache.kafka.common.message.LeaveGroupResponseData
+import org.apache.kafka.common.message.ListGroupsResponseData
 import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.apache.kafka.common.message.OffsetCommitResponseData
 import org.apache.kafka.common.message.SaslAuthenticateResponseData
@@ -1344,11 +1345,25 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (authorize(request.session, Describe, Resource.ClusterResource))
       // With describe cluster access all groups are returned. We keep this alternative for
backward compatibility.
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new ListGroupsResponse(requestThrottleMs, error, groups.map { group => new ListGroupsResponse.Group(group.groupId,
group.protocolType) }.asJava))
+        new ListGroupsResponse(new ListGroupsResponseData()
+            .setErrorCode(error.code())
+            .setGroups(groups.map { group => new ListGroupsResponseData.ListedGroup()
+              .setGroupId(group.groupId)
+              .setProtocolType(group.protocolType)}.asJava
+            )
+            .setThrottleTimeMs(requestThrottleMs)
+        ))
     else {
       val filteredGroups = groups.filter(group => authorize(request.session, Describe,
new Resource(Group, group.groupId, LITERAL)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new ListGroupsResponse(requestThrottleMs, error, filteredGroups.map { group =>
new ListGroupsResponse.Group(group.groupId, group.protocolType) }.asJava))
+        new ListGroupsResponse(new ListGroupsResponseData()
+          .setErrorCode(error.code())
+          .setGroups(filteredGroups.map { group => new ListGroupsResponseData.ListedGroup()
+            .setGroupId(group.groupId)
+            .setProtocolType(group.protocolType)}.asJava
+          )
+          .setThrottleTimeMs(requestThrottleMs)
+        ))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
index f3580cf..00b3ebc 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
@@ -15,21 +15,22 @@
 package kafka.server
 
 import java.net.Socket
-import java.util.Properties
+import java.util.{Collections, Properties}
 
 import kafka.utils.TestUtils
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.requests.{ListGroupsRequest,ListGroupsResponse}
+import org.apache.kafka.common.requests.{ListGroupsRequest, ListGroupsResponse}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.metrics.KafkaMetric
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.protocol.Errors
-
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.junit.After
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.kafka.common.message.ListGroupsRequestData
+
 /*
  * this test checks that a reporter that throws an exception will not affect other reporters
  * and will not affect the broker's message handling
@@ -68,8 +69,10 @@ class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest
{
 
     try {
       TestUtils.retry(10000) {
-        val error = new ListGroupsResponse(requestResponse(socket, "clientId", 0, new ListGroupsRequest.Builder())).error()
-        assertEquals(Errors.NONE, error)
+        val error = new ListGroupsResponse(
+          requestResponse(socket, "clientId", 0, new ListGroupsRequest.Builder(new ListGroupsRequestData)),
ApiKeys.LIST_GROUPS.latestVersion)
+          .errorCounts()
+        assertEquals(Collections.singletonMap(Errors.NONE, 1), error)
         assertEquals(KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.get,
KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.get)
         assertTrue(KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.get >
0)
       }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 493b3e3..0e463a2 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.message.InitProducerIdRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
 import org.apache.kafka.common.message.LeaveGroupRequestData
+import org.apache.kafka.common.message.ListGroupsRequestData
 import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.apache.kafka.common.message.SaslAuthenticateRequestData
 import org.apache.kafka.common.message.SaslHandshakeRequestData
@@ -341,7 +342,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List("test-group").asJava))
 
         case ApiKeys.LIST_GROUPS =>
-          new ListGroupsRequest.Builder()
+          new ListGroupsRequest.Builder(new ListGroupsRequestData())
 
         case ApiKeys.SASL_HANDSHAKE =>
           new SaslHandshakeRequest.Builder(new SaslHandshakeRequestData().setMechanism("PLAIN"))
@@ -535,7 +536,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
       case ApiKeys.DESCRIBE_GROUPS =>
         new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
-      case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
+      case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response, ApiKeys.LIST_GROUPS.latestVersion).throttleTimeMs
       case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
       case ApiKeys.CREATE_TOPICS =>
         new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion).throttleTimeMs
@@ -574,8 +575,9 @@ class RequestQuotaTest extends BaseRequestTest {
     val clientId = apiKey.toString
     val client = Client(clientId, apiKey)
 
-    val throttled = client.runUntil(response =>
+    val throttled = client.runUntil(response => {
       responseThrottleTime(apiKey, response) > 0
+    }
     )
 
     assertTrue(s"Response not throttled: $client", throttled)


Mime
View raw message