kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8435: replace delete groups request/response with automated protocol (#6860)
Date Sat, 20 Jul 2019 00:05:00 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3524ef  KAFKA-8435: replace delete groups request/response with automated protocol (#6860)
e3524ef is described below

commit e3524ef350830e3e1fa918ec0ae93b09d51fcf37
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Fri Jul 19 17:04:23 2019 -0700

    KAFKA-8435: replace delete groups request/response with automated protocol (#6860)
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   6 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   6 +-
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../kafka/common/requests/DeleteGroupsRequest.java |  89 +++++---------
 .../common/requests/DeleteGroupsResponse.java      | 129 +++++++--------------
 .../kafka/common/requests/DeleteTopicsRequest.java |  33 ------
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  57 ++++++---
 .../common/requests/DeleteGroupsResponseTest.java  |  79 +++++++++++++
 .../kafka/common/requests/RequestResponseTest.java |  20 +++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  20 +++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  10 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   6 +-
 12 files changed, 248 insertions(+), 209 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7cbbd08..eebaf3d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -67,6 +67,7 @@ import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
@@ -2996,7 +2997,10 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new DeleteGroupsRequest.Builder(Collections.singleton(context.getGroupId()));
+                return new DeleteGroupsRequest.Builder(
+                    new DeleteGroupsRequestData()
+                        .setGroupsNames(Collections.singletonList(context.getGroupId()))
+                );
             }
 
             @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 22705a1..171a3cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
 import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
@@ -73,8 +75,6 @@ import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
-import org.apache.kafka.common.requests.DeleteGroupsRequest;
-import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DescribeAclsRequest;
@@ -189,7 +189,7 @@ public enum ApiKeys {
     RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(),
RenewDelegationTokenResponse.schemaVersions()),
     EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(),
ExpireDelegationTokenResponse.schemaVersions()),
     DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(),
DescribeDelegationTokenResponse.schemaVersions()),
-    DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()),
+    DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequestData.SCHEMAS, DeleteGroupsResponseData.SCHEMAS),
     ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,
             ElectLeadersResponseData.SCHEMAS),
     INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index b0aafcb..9eddf66 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -155,7 +155,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse
{
             case DESCRIBE_DELEGATION_TOKEN:
                 return new DescribeDelegationTokenResponse(struct);
             case DELETE_GROUPS:
-                return new DeleteGroupsResponse(struct);
+                return new DeleteGroupsResponse(struct, version);
             case ELECT_LEADERS:
                 return new ElectLeadersResponse(struct, version);
             case INCREMENTAL_ALTER_CONFIGS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
index f2a5d92..d3061c4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
@@ -16,108 +16,79 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class DeleteGroupsRequest extends AbstractRequest {
-    private static final String GROUPS_KEY_NAME = "groups";
-
-    /* DeleteGroups api */
-    private static final Schema DELETE_GROUPS_REQUEST_V0 = new Schema(
-            new Field(GROUPS_KEY_NAME, new ArrayOf(STRING), "An array of groups to be deleted."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out
responses before throttling.
-     */
-    private static final Schema DELETE_GROUPS_REQUEST_V1 = DELETE_GROUPS_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_GROUPS_REQUEST_V0, DELETE_GROUPS_REQUEST_V1};
-    }
-
-    private final Set<String> groups;
-
     public static class Builder extends AbstractRequest.Builder<DeleteGroupsRequest>
{
-        private final Set<String> groups;
+        private final DeleteGroupsRequestData data;
 
-        public Builder(Set<String> groups) {
+        public Builder(DeleteGroupsRequestData data) {
             super(ApiKeys.DELETE_GROUPS);
-            this.groups = groups;
+            this.data = data;
         }
 
         @Override
         public DeleteGroupsRequest build(short version) {
-            return new DeleteGroupsRequest(groups, version);
+            return new DeleteGroupsRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=DeleteGroupsRequest").
-                append(", groups=(").append(Utils.join(groups, ", ")).append(")").
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private DeleteGroupsRequest(Set<String> groups, short version) {
+    public final DeleteGroupsRequestData data;
+
+    public DeleteGroupsRequest(DeleteGroupsRequestData data, short version) {
         super(ApiKeys.DELETE_GROUPS, version);
-        this.groups = groups;
+        this.data = data;
     }
 
     public DeleteGroupsRequest(Struct struct, short version) {
         super(ApiKeys.DELETE_GROUPS, version);
-        Object[] groupsArray = struct.getArray(GROUPS_KEY_NAME);
-        Set<String> groups = new HashSet<>(groupsArray.length);
-        for (Object group : groupsArray)
-            groups.add((String) group);
-
-        this.groups = groups;
-    }
-
-    @Override
-    protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.DELETE_GROUPS.requestSchema(version()));
-        struct.set(GROUPS_KEY_NAME, groups.toArray());
-        return struct;
+        this.data = new DeleteGroupsRequestData(struct, version);
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
-        Map<String, Errors> groupErrors = new HashMap<>(groups.size());
-        for (String group : groups)
-            groupErrors.put(group, error);
 
         switch (version()) {
             case 0:
             case 1:
-                return new DeleteGroupsResponse(throttleTimeMs, groupErrors);
+                DeletableGroupResultCollection groupResults = new DeletableGroupResultCollection();
+                for (String groupId : data.groupsNames()) {
+                    groupResults.add(new DeletableGroupResult()
+                                         .setGroupId(groupId)
+                                         .setErrorCode(error.code()));
+                }
+
+                return new DeleteGroupsResponse(
+                    new DeleteGroupsResponseData()
+                        .setResults(groupResults)
+                        .setThrottleTimeMs(throttleTimeMs)
+                );
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid.
Valid versions for %s are 0 to %d",
                     version(), ApiKeys.DELETE_GROUPS.name, ApiKeys.DELETE_GROUPS.latestVersion()));
         }
     }
 
-    public Set<String> groups() {
-        return groups;
-    }
-
     public static DeleteGroupsRequest parse(ByteBuffer buffer, short version) {
         return new DeleteGroupsRequest(ApiKeys.DELETE_GROUPS.parseRequest(version, buffer),
version);
     }
 
+    @Override
+    protected Struct toStruct() {
+        return data.toStruct(version());
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
index 032f15d..5fe86eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
@@ -16,121 +16,82 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-
+/**
+ * Possible error codes:
+ *
+ * COORDINATOR_LOAD_IN_PROGRESS (14)
+ * COORDINATOR_NOT_AVAILABLE(15)
+ * NOT_COORDINATOR (16)
+ * INVALID_GROUP_ID(24)
+ * GROUP_AUTHORIZATION_FAILED(30)
+ * NON_EMPTY_GROUP(68)
+ * GROUP_ID_NOT_FOUND(69)
+ */
 public class DeleteGroupsResponse extends AbstractResponse {
-    private static final String GROUP_ERROR_CODES_KEY_NAME = "group_error_codes";
-
-    private static final Schema GROUP_ERROR_CODE = new Schema(
-            GROUP_ID,
-            ERROR_CODE);
-
-    private static final Schema DELETE_GROUPS_RESPONSE_V0 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(GROUP_ERROR_CODES_KEY_NAME, new ArrayOf(GROUP_ERROR_CODE), "An array
of per group error codes."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out
responses before throttling.
-     */
-    private static final Schema DELETE_GROUPS_RESPONSE_V1 = DELETE_GROUPS_RESPONSE_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_GROUPS_RESPONSE_V0, DELETE_GROUPS_RESPONSE_V1};
-    }
-
 
-    /**
-     * Possible error codes:
-     *
-     * COORDINATOR_LOAD_IN_PROGRESS (14)
-     * COORDINATOR_NOT_AVAILABLE(15)
-     * NOT_COORDINATOR (16)
-     * INVALID_GROUP_ID(24)
-     * GROUP_AUTHORIZATION_FAILED(30)
-     * NON_EMPTY_GROUP(68)
-     * GROUP_ID_NOT_FOUND(69)
-     */
+    public final DeleteGroupsResponseData data;
 
-    private final Map<String, Errors> errors;
-    private final int throttleTimeMs;
-
-    public DeleteGroupsResponse(Map<String, Errors> errors) {
-        this(DEFAULT_THROTTLE_TIME, errors);
-    }
-
-    public DeleteGroupsResponse(int throttleTimeMs, Map<String, Errors> errors) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.errors = errors;
+    public DeleteGroupsResponse(DeleteGroupsResponseData data) {
+        this.data = data;
     }
 
     public DeleteGroupsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        Object[] groupErrorCodesStructs = struct.getArray(GROUP_ERROR_CODES_KEY_NAME);
-        Map<String, Errors> errors = new HashMap<>();
-        for (Object groupErrorCodeStructObj : groupErrorCodesStructs) {
-            Struct groupErrorCodeStruct = (Struct) groupErrorCodeStructObj;
-            String group = groupErrorCodeStruct.get(GROUP_ID);
-            Errors error = Errors.forCode(groupErrorCodeStruct.get(ERROR_CODE));
-            errors.put(group, error);
-        }
-
-        this.errors = errors;
+        short latestVersion = (short) (DeleteGroupsResponseData.SCHEMAS.length - 1);
+        this.data = new DeleteGroupsResponseData(struct, latestVersion);
     }
 
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.DELETE_GROUPS.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-        List<Struct> groupErrorCodeStructs = new ArrayList<>(errors.size());
-        for (Map.Entry<String, Errors> groupError : errors.entrySet()) {
-            Struct groupErrorCodeStruct = struct.instance(GROUP_ERROR_CODES_KEY_NAME);
-            groupErrorCodeStruct.set(GROUP_ID, groupError.getKey());
-            groupErrorCodeStruct.set(ERROR_CODE, groupError.getValue().code());
-            groupErrorCodeStructs.add(groupErrorCodeStruct);
-        }
-        struct.set(GROUP_ERROR_CODES_KEY_NAME, groupErrorCodeStructs.toArray());
-        return struct;
+    public DeleteGroupsResponse(Struct struct, short version) {
+        this.data = new DeleteGroupsResponseData(struct, version);
     }
 
     @Override
-    public int throttleTimeMs() {
-        return throttleTimeMs;
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
     }
 
     public Map<String, Errors> errors() {
-        return errors;
-    }
-
-    public boolean hasError(String group) {
-        return errors.containsKey(group) && errors.get(group) != Errors.NONE;
+        Map<String, Errors> errorMap = new HashMap<>();
+        for (DeletableGroupResult result : data.results()) {
+            errorMap.put(result.groupId(), Errors.forCode(result.errorCode()));
+        }
+        return errorMap;
     }
 
-    public Errors get(String group) {
-        return errors.get(group);
+    public Errors get(String group) throws IllegalArgumentException {
+        DeletableGroupResult result = data.results().find(group);
+        if (result == null) {
+            throw new IllegalArgumentException("could not find group " + group + " in the
delete group response");
+        }
+        return Errors.forCode(result.errorCode());
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(errors);
+        Map<Errors, Integer> counts = new HashMap<>();
+        for (DeletableGroupResult result : data.results()) {
+            Errors error = Errors.forCode(result.errorCode());
+            counts.put(error, counts.getOrDefault(error, 0) + 1);
+        }
+        return counts;
     }
 
     public static DeleteGroupsResponse parse(ByteBuffer buffer, short version) {
-        return new DeleteGroupsResponse(ApiKeys.DELETE_GROUPS.responseSchema(version).read(buffer));
+        return new DeleteGroupsResponse(ApiKeys.DELETE_GROUPS.parseResponse(version, buffer));
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index 978d1c0..440acfd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -20,44 +20,11 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
 public class DeleteTopicsRequest extends AbstractRequest {
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String TIMEOUT_KEY_NAME = "timeout";
-
-    /* DeleteTopic api */
-    private static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema(
-            new Field(TOPICS_KEY_NAME, new ArrayOf(STRING), "An array of topics to be deleted."),
-            new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a topic to be
completely deleted on the " +
-                    "controller node. Values <= 0 will trigger topic deletion and return
immediately"));
-
-    /* v1 request is the same as v0. Throttle time has been added to the response */
-    private static final Schema DELETE_TOPICS_REQUEST_V1 = DELETE_TOPICS_REQUEST_V0;
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out
responses before throttling.
-     */
-    private static final Schema DELETE_TOPICS_REQUEST_V2 = DELETE_TOPICS_REQUEST_V1;
-
-    /**
-     * v3 request is the same that as v2. The response is different based on the request
version.
-     * In v3 version a TopicDeletionDisabledException is returned
-     */
-    private static final Schema DELETE_TOPICS_REQUEST_V3 = DELETE_TOPICS_REQUEST_V2;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1,
-            DELETE_TOPICS_REQUEST_V2, DELETE_TOPICS_REQUEST_V3};
-    }
 
     private DeleteTopicsRequestData data;
     private final short version;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 2b7070e..711c8f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -54,8 +54,11 @@ import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
-import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
@@ -1438,9 +1441,14 @@ public class KafkaAdminClientTest {
 
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
 
-            final Map<String, Errors> validResponse = new HashMap<>();
-            validResponse.put("group-0", Errors.NONE);
-            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
+            final DeletableGroupResultCollection validResponse = new DeletableGroupResultCollection();
+            validResponse.add(new DeletableGroupResult()
+                                  .setGroupId("group-0")
+                                  .setErrorCode(Errors.NONE.code()));
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
+                new DeleteGroupsResponseData()
+                    .setResults(validResponse)
+            ));
 
             final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds);
 
@@ -1453,28 +1461,45 @@ public class KafkaAdminClientTest {
             final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
             TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
 
-            //Retriable  errors should be retried
+            //Retriable errors should be retried
             env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
env.cluster().controller()));
 
-            final Map<String, Errors> errorResponse1 = new HashMap<>();
-            errorResponse1.put("group-0", Errors.COORDINATOR_NOT_AVAILABLE);
-            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse1));
-
-            final Map<String, Errors> errorResponse2 = new HashMap<>();
-            errorResponse2.put("group-0", Errors.COORDINATOR_LOAD_IN_PROGRESS);
-            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse2));
+            final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection();
+            errorResponse1.add(new DeletableGroupResult()
+                                   .setGroupId("group-0")
+                                   .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            );
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
+                new DeleteGroupsResponseData()
+                    .setResults(errorResponse1)));
+
+            final DeletableGroupResultCollection errorResponse2 = new DeletableGroupResultCollection();
+            errorResponse2.add(new DeletableGroupResult()
+                                   .setGroupId("group-0")
+                                   .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+            );
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
+                new DeleteGroupsResponseData()
+                    .setResults(errorResponse2)));
 
             /*
              * We need to return two responses here, one for NOT_COORDINATOR call when calling
delete a consumer group
              * api using coordinator that has moved. This will retry whole operation. So
we need to again respond with a
              * FindCoordinatorResponse.
              */
-            Map<String, Errors> coordinatorMoved = new HashMap<>();
-            coordinatorMoved.put("UnitTestError", Errors.NOT_COORDINATOR);
-            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(coordinatorMoved));
+            final DeletableGroupResultCollection coordinatorMoved = new DeletableGroupResultCollection();
+            coordinatorMoved.add(new DeletableGroupResult()
+                                     .setGroupId("UnitTestError")
+                                     .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
+                new DeleteGroupsResponseData()
+                    .setResults(coordinatorMoved)));
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
 
-            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
+                new DeleteGroupsResponseData()
+                    .setResults(validResponse)));
 
             final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds);
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsResponseTest.java
new file mode 100644
index 0000000..37e8a52
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsResponseTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class DeleteGroupsResponseTest {
+
+    private static final String GROUP_ID_1 = "groupId1";
+    private static final String GROUP_ID_2 = "groupId2";
+    private static final int THROTTLE_TIME_MS = 10;
+    private static DeleteGroupsResponse deleteGroupsResponse;
+
+    static {
+        deleteGroupsResponse = new DeleteGroupsResponse(
+            new DeleteGroupsResponseData()
+                .setResults(
+                    new DeletableGroupResultCollection(Arrays.asList(
+                        new DeletableGroupResult()
+                            .setGroupId(GROUP_ID_1)
+                            .setErrorCode(Errors.NONE.code()),
+                        new DeletableGroupResult()
+                            .setGroupId(GROUP_ID_2)
+                            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())).iterator()
+                    )
+                )
+                .setThrottleTimeMs(THROTTLE_TIME_MS));
+    }
+
+    @Test
+    public void testGetErrorWithExistingGroupIds() {
+        assertEquals(Errors.NONE, deleteGroupsResponse.get(GROUP_ID_1));
+        assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, deleteGroupsResponse.get(GROUP_ID_2));
+
+        Map<String, Errors> expectedErrors = new HashMap<>();
+        expectedErrors.put(GROUP_ID_1, Errors.NONE);
+        expectedErrors.put(GROUP_ID_2, Errors.GROUP_AUTHORIZATION_FAILED);
+        assertEquals(expectedErrors, deleteGroupsResponse.errors());
+
+        Map<Errors, Integer> expectedErrorCounts = new HashMap<>();
+        expectedErrorCounts.put(Errors.NONE, 1);
+        expectedErrorCounts.put(Errors.GROUP_AUTHORIZATION_FAILED, 1);
+        assertEquals(expectedErrorCounts, deleteGroupsResponse.errorCounts());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGetErrorWithInvalidGroupId() {
+        deleteGroupsResponse.get("invalid-group-id");
+    }
+
+    @Test
+    public void testGetThrottleTimeMs() {
+        assertEquals(THROTTLE_TIME_MS, deleteGroupsResponse.throttleTimeMs());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 75d9720..10da9f5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -46,6 +46,10 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicCo
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
+import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
@@ -907,13 +911,21 @@ public class RequestResponseTest {
     }
 
     private DeleteGroupsRequest createDeleteGroupsRequest() {
-        return new DeleteGroupsRequest.Builder(Collections.singleton("test-group")).build();
+        return new DeleteGroupsRequest.Builder(
+            new DeleteGroupsRequestData()
+                .setGroupsNames(Collections.singletonList("test-group"))
+        ).build();
     }
 
     private DeleteGroupsResponse createDeleteGroupsResponse() {
-        Map<String, Errors> result = new HashMap<>();
-        result.put("test-group", Errors.NONE);
-        return new DeleteGroupsResponse(result);
+        DeletableGroupResultCollection result = new DeletableGroupResultCollection();
+        result.add(new DeletableGroupResult()
+                       .setGroupId("test-group")
+                       .setErrorCode(Errors.NONE.code()));
+        return new DeleteGroupsResponse(
+            new DeleteGroupsResponseData()
+                .setResults(result)
+        );
     }
 
     @SuppressWarnings("deprecation")
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ea8fcb1..cec7470 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -50,6 +50,8 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.message.CreateTopicsResponseData
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
+import org.apache.kafka.common.message.DeleteGroupsResponseData
+import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
 import org.apache.kafka.common.message.DeleteTopicsResponseData
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
 import org.apache.kafka.common.message.DescribeGroupsResponseData
@@ -1480,7 +1482,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDeleteGroupsRequest(request: RequestChannel.Request): Unit = {
     val deleteGroupsRequest = request.body[DeleteGroupsRequest]
-    val groups = deleteGroupsRequest.groups.asScala.toSet
+    val groups = deleteGroupsRequest.data.groupsNames.asScala.toSet
 
     val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
       authorize(request.session, Delete, Resource(Group, group, LITERAL))
@@ -1489,8 +1491,20 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++
       unauthorizedGroups.map(_ -> Errors.GROUP_AUTHORIZATION_FAILED)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs =>
-      new DeleteGroupsResponse(requestThrottleMs, groupDeletionResult.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => {
+      val deletionCollections = new DeletableGroupResultCollection()
+      groupDeletionResult.foreach { case (groupId, error) =>
+        deletionCollections.add(new DeletableGroupResult()
+          .setGroupId(groupId)
+          .setErrorCode(error.code)
+        )
+      }
+
+      new DeleteGroupsResponse(new DeleteGroupsResponseData()
+        .setResults(deletionCollections)
+        .setThrottleTimeMs(requestThrottleMs)
+      )
+    })
   }
 
   def handleHeartbeatRequest(request: RequestChannel.Request) {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d7823ac..5b73db0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConf
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.ElectionType
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
@@ -37,6 +37,7 @@ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.message.ControlledShutdownRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.DeleteGroupsRequestData
 import org.apache.kafka.common.message.DeleteTopicsRequestData
 import org.apache.kafka.common.message.DescribeGroupsRequestData
 import org.apache.kafka.common.message.FindCoordinatorRequestData
@@ -50,7 +51,7 @@ import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.apache.kafka.common.message.SyncGroupRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, RecordBatch, SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, Records, SimpleRecord}
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.PatternType.LITERAL
@@ -401,7 +402,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       .setGroupId(group)
       .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)).build()
 
-  private def deleteGroupsRequest = new DeleteGroupsRequest.Builder(Set(group).asJava).build()
+  private def deleteGroupsRequest = new DeleteGroupsRequest.Builder(
+    new DeleteGroupsRequestData()
+      .setGroupsNames(Collections.singletonList(group))
+  ).build()
 
   private def leaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue, Long.MaxValue,
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 0e463a2..7fc3de9 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -27,9 +27,10 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.ControlledShutdownRequestData
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
-import org.apache.kafka.common.message.CreateDelegationTokenRequestData
+import org.apache.kafka.common.message.DeleteGroupsRequestData
 import org.apache.kafka.common.message.DeleteTopicsRequestData
 import org.apache.kafka.common.message.DescribeGroupsRequestData
 import org.apache.kafka.common.message.FindCoordinatorRequestData
@@ -449,7 +450,8 @@ class RequestQuotaTest extends BaseRequestTest {
           new RenewDelegationTokenRequest.Builder("".getBytes, 1000)
 
         case ApiKeys.DELETE_GROUPS =>
-          new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))
+          new DeleteGroupsRequest.Builder(new DeleteGroupsRequestData()
+            .setGroupsNames(Collections.singletonList("test-group")))
 
         case ApiKeys.ELECT_LEADERS =>
           new ElectLeadersRequest.Builder(


Mime
View raw message