kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9027, KAFKA-9028: Convert create/delete acls requests/response to use generated protocol (#7725)
Date Mon, 03 Feb 2020 15:13:43 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new b1fc625  KAFKA-9027, KAFKA-9028: Convert create/delete acls requests/response to use generated protocol (#7725)
b1fc625 is described below

commit b1fc625182586785b82ea511e41e1e9f91769b0b
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Mon Feb 3 07:12:00 2020 -0800

    KAFKA-9027, KAFKA-9028: Convert create/delete acls requests/response to use generated protocol (#7725)
    
    Also add support for flexible versions to both protocol types.
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Colin Patrick McCabe <cmccabe@apache.org>
    
    Co-authored-by: Rajini Sivaram <rajinisivaram@googlemail.com>
    Co-authored-by: Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  71 +++---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  12 +-
 .../apache/kafka/common/protocol/CommonFields.java |  27 ---
 .../kafka/common/requests/AbstractResponse.java    |   4 +-
 .../kafka/common/requests/CreateAclsRequest.java   | 188 ++++++---------
 .../kafka/common/requests/CreateAclsResponse.java  |  88 +------
 .../kafka/common/requests/DeleteAclsRequest.java   | 169 ++++++-------
 .../kafka/common/requests/DeleteAclsResponse.java  | 266 ++++++---------------
 .../kafka/common/requests/DescribeAclsRequest.java |  43 ++--
 .../common/requests/DescribeAclsResponse.java      |  43 ++--
 .../apache/kafka/common/requests/RequestUtils.java |  80 -------
 .../common/message/CreateAclsRequest.json          |   7 +-
 .../common/message/CreateAclsResponse.json         |   7 +-
 .../common/message/DeleteAclsRequest.json          |   5 +-
 .../common/message/DeleteAclsResponse.json         |   5 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  72 +++---
 .../kafka/common/protocol/MessageTestUtil.java     |  15 --
 .../common/requests/CreateAclsRequestTest.java     |  23 +-
 .../common/requests/DeleteAclsRequestTest.java     |  35 +--
 .../common/requests/DeleteAclsResponseTest.java    | 133 ++++++-----
 .../common/requests/DescribeAclsRequestTest.java   |  11 +-
 .../common/requests/DescribeAclsResponseTest.java  |   8 +-
 .../kafka/common/requests/RequestResponseTest.java |  90 ++++---
 .../main/scala/kafka/server/DelayedFuture.scala    |   4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  93 +++----
 .../kafka/api/AuthorizerIntegrationTest.scala      |  43 ++--
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  29 ++-
 27 files changed, 652 insertions(+), 919 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 800fab3..6b8e962 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -69,6 +69,9 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
+import org.apache.kafka.common.message.CreateAclsRequestData;
+import org.apache.kafka.common.message.CreateAclsRequestData.AclCreation;
+import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
 import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
@@ -80,6 +83,11 @@ import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
+import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
+import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
+import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
 import org.apache.kafka.common.message.DeleteGroupsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
@@ -126,9 +134,7 @@ import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsRequest;
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
@@ -137,8 +143,6 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
@@ -1765,44 +1769,48 @@ public class KafkaAdminClient extends AdminClient {
         final long now = time.milliseconds();
         final Map<AclBinding, KafkaFutureImpl<Void>> futures = new HashMap<>();
         final List<AclCreation> aclCreations = new ArrayList<>();
+        final List<AclBinding> aclBindingsSent = new ArrayList<>();
         for (AclBinding acl : acls) {
             if (futures.get(acl) == null) {
                 KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
                 futures.put(acl, future);
                 String indefinite = acl.toFilter().findIndefiniteField();
                 if (indefinite == null) {
-                    aclCreations.add(new AclCreation(acl));
+                    aclCreations.add(CreateAclsRequest.aclCreation(acl));
+                    aclBindingsSent.add(acl);
                 } else {
                     future.completeExceptionally(new InvalidRequestException("Invalid ACL creation: " +
                         indefinite));
                 }
             }
         }
+        final CreateAclsRequestData data = new CreateAclsRequestData().setCreations(aclCreations);
         runnable.call(new Call("createAcls", calcDeadlineMs(now, options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
 
             @Override
             CreateAclsRequest.Builder createRequest(int timeoutMs) {
-                return new CreateAclsRequest.Builder(aclCreations);
+                return new CreateAclsRequest.Builder(data);
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 CreateAclsResponse response = (CreateAclsResponse) abstractResponse;
-                List<AclCreationResponse> responses = response.aclCreationResponses();
-                Iterator<AclCreationResponse> iter = responses.iterator();
-                for (AclCreation aclCreation : aclCreations) {
-                    KafkaFutureImpl<Void> future = futures.get(aclCreation.acl());
+                List<AclCreationResult> responses = response.results();
+                Iterator<AclCreationResult> iter = responses.iterator();
+                for (AclBinding aclBinding : aclBindingsSent) {
+                    KafkaFutureImpl<Void> future = futures.get(aclBinding);
                     if (!iter.hasNext()) {
                         future.completeExceptionally(new UnknownServerException(
-                            "The broker reported no creation result for the given ACL."));
+                            "The broker reported no creation result for the given ACL: " + aclBinding));
                     } else {
-                        AclCreationResponse creation = iter.next();
-                        if (creation.error().isFailure()) {
-                            future.completeExceptionally(creation.error().exception());
-                        } else {
+                        AclCreationResult creation = iter.next();
+                        Errors error = Errors.forCode(creation.errorCode());
+                        ApiError apiError = new ApiError(error, creation.errorMessage());
+                        if (apiError.isFailure())
+                            future.completeExceptionally(apiError.exception());
+                        else
                             future.complete(null);
-                        }
                     }
                 }
             }
@@ -1819,39 +1827,46 @@ public class KafkaAdminClient extends AdminClient {
     public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) {
         final long now = time.milliseconds();
         final Map<AclBindingFilter, KafkaFutureImpl<FilterResults>> futures = new HashMap<>();
-        final List<AclBindingFilter> filterList = new ArrayList<>();
+        final List<AclBindingFilter> aclBindingFiltersSent = new ArrayList<>();
+        final List<DeleteAclsFilter> deleteAclsFilters = new ArrayList<>();
         for (AclBindingFilter filter : filters) {
             if (futures.get(filter) == null) {
-                filterList.add(filter);
+                aclBindingFiltersSent.add(filter);
+                deleteAclsFilters.add(DeleteAclsRequest.deleteAclsFilter(filter));
                 futures.put(filter, new KafkaFutureImpl<>());
             }
         }
+        final DeleteAclsRequestData data = new DeleteAclsRequestData().setFilters(deleteAclsFilters);
         runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
 
             @Override
             DeleteAclsRequest.Builder createRequest(int timeoutMs) {
-                return new DeleteAclsRequest.Builder(filterList);
+                return new DeleteAclsRequest.Builder(data);
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 DeleteAclsResponse response = (DeleteAclsResponse) abstractResponse;
-                List<AclFilterResponse> responses = response.responses();
-                Iterator<AclFilterResponse> iter = responses.iterator();
-                for (AclBindingFilter filter : filterList) {
-                    KafkaFutureImpl<FilterResults> future = futures.get(filter);
+                List<DeleteAclsResponseData.DeleteAclsFilterResult> results = response.filterResults();
+                Iterator<DeleteAclsResponseData.DeleteAclsFilterResult> iter = results.iterator();
+                for (AclBindingFilter bindingFilter : aclBindingFiltersSent) {
+                    KafkaFutureImpl<FilterResults> future = futures.get(bindingFilter);
                     if (!iter.hasNext()) {
                         future.completeExceptionally(new UnknownServerException(
                             "The broker reported no deletion result for the given filter."));
                     } else {
-                        AclFilterResponse deletion = iter.next();
-                        if (deletion.error().isFailure()) {
-                            future.completeExceptionally(deletion.error().exception());
+                        DeleteAclsFilterResult filterResult = iter.next();
+                        ApiError error = new ApiError(Errors.forCode(filterResult.errorCode()), filterResult.errorMessage());
+                        if (error.isFailure()) {
+                            future.completeExceptionally(error.exception());
                         } else {
                             List<FilterResult> filterResults = new ArrayList<>();
-                            for (AclDeletionResult deletionResult : deletion.deletions()) {
-                                filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.error().exception()));
+                            for (DeleteAclsMatchingAcl matchingAcl : filterResult.matchingAcls()) {
+                                ApiError aclError = new ApiError(Errors.forCode(matchingAcl.errorCode()),
+                                    matchingAcl.errorMessage());
+                                AclBinding aclBinding = DeleteAclsResponse.aclBinding(matchingAcl);
+                                filterResults.add(new FilterResult(aclBinding, aclError.exception()));
                             }
                             future.complete(new FilterResults(filterResults));
                         }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 000c654..f48f112 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -21,12 +21,16 @@ import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ControlledShutdownRequestData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
+import org.apache.kafka.common.message.CreateAclsRequestData;
+import org.apache.kafka.common.message.CreateAclsResponseData;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
 import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
 import org.apache.kafka.common.message.CreatePartitionsRequestData;
 import org.apache.kafka.common.message.CreatePartitionsResponseData;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsRequestData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
@@ -98,10 +102,6 @@ import org.apache.kafka.common.requests.AlterConfigsRequest;
 import org.apache.kafka.common.requests.AlterConfigsResponse;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
-import org.apache.kafka.common.requests.CreateAclsRequest;
-import org.apache.kafka.common.requests.CreateAclsResponse;
-import org.apache.kafka.common.requests.DeleteAclsRequest;
-import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
@@ -178,8 +178,8 @@ public enum ApiKeys {
     TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequestData.SCHEMAS,
                       TxnOffsetCommitResponseData.SCHEMAS),
     DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequestData.SCHEMAS, DescribeAclsResponseData.SCHEMAS),
-    CREATE_ACLS(30, "CreateAcls", CreateAclsRequest.schemaVersions(), CreateAclsResponse.schemaVersions()),
-    DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequest.schemaVersions(), DeleteAclsResponse.schemaVersions()),
+    CREATE_ACLS(30, "CreateAcls", CreateAclsRequestData.SCHEMAS, CreateAclsResponseData.SCHEMAS),
+    DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequestData.SCHEMAS, DeleteAclsResponseData.SCHEMAS),
     DESCRIBE_CONFIGS(32, "DescribeConfigs", DescribeConfigsRequest.schemaVersions(),
             DescribeConfigsResponse.schemaVersions()),
     ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(),
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index 5da38aa..fe756a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.resource.PatternType;
 
 public class CommonFields {
     public static final Field.Int32 THROTTLE_TIME_MS = new Field.Int32("throttle_time_ms",
@@ -43,30 +42,4 @@ public class CommonFields {
             "The transactional id or null if the producer is not transactional");
     public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id.");
     public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id.");
-
-    // ACL APIs
-    public static final Field.Int8 RESOURCE_TYPE = new Field.Int8("resource_type", "The resource type");
-    public static final Field.Str RESOURCE_NAME = new Field.Str("resource_name", "The resource name");
-    public static final Field.NullableStr RESOURCE_NAME_FILTER = new Field.NullableStr("resource_name", "The resource name filter");
-    public static final Field.Int8 RESOURCE_PATTERN_TYPE = new Field.Int8("resource_pattern_type", "The resource pattern type", PatternType.LITERAL.code());
-    public static final Field.Int8 RESOURCE_PATTERN_TYPE_FILTER = new Field.Int8("resource_pattern_type_filter", "The resource pattern type filter", PatternType.LITERAL.code());
-    public static final Field.Str PRINCIPAL = new Field.Str("principal", "The ACL principal");
-    public static final Field.NullableStr PRINCIPAL_FILTER = new Field.NullableStr("principal", "The ACL principal filter");
-    public static final Field.Str HOST = new Field.Str("host", "The ACL host");
-    public static final Field.NullableStr HOST_FILTER = new Field.NullableStr("host", "The ACL host filter");
-    public static final Field.Int8 OPERATION = new Field.Int8("operation", "The ACL operation");
-    public static final Field.Int8 PERMISSION_TYPE = new Field.Int8("permission_type", "The ACL permission type");
-
-    public static final Field.Str PRINCIPAL_TYPE = new Field.Str("principal_type", "principalType of the Kafka principal");
-    public static final Field.Str PRINCIPAL_NAME = new Field.Str("name", "name of the Kafka principal");
-
-    public static final Field.Int64 COMMITTED_OFFSET = new Field.Int64("offset",
-            "Message offset to be committed");
-    public static final Field.NullableStr COMMITTED_METADATA = new Field.NullableStr("metadata",
-            "Any associated metadata the client wants to keep.");
-    public static final Field.Int32 COMMITTED_LEADER_EPOCH = new Field.Int32("leader_epoch",
-            "The leader epoch, if provided is derived from the last consumed record. " +
-                    "This is used by the consumer to check for log truncation and to ensure partition " +
-                    "metadata is up to date following a group rebalance.");
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 8187bf8..621b6a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -141,9 +141,9 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
             case DESCRIBE_ACLS:
                 return new DescribeAclsResponse(struct, version);
             case CREATE_ACLS:
-                return new CreateAclsResponse(struct);
+                return new CreateAclsResponse(struct, version);
             case DELETE_ACLS:
-                return new DeleteAclsResponse(struct);
+                return new DeleteAclsResponse(struct, version);
             case DESCRIBE_CONFIGS:
                 return new DescribeConfigsResponse(struct);
             case ALTER_CONFIGS:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index 992e3d4..3eb88a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -19,171 +19,123 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.message.CreateAclsRequestData;
+import org.apache.kafka.common.message.CreateAclsRequestData.AclCreation;
+import org.apache.kafka.common.message.CreateAclsResponseData;
+import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.PatternType;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.resource.ResourceType;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
-import static org.apache.kafka.common.protocol.CommonFields.HOST;
-import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
-import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
-
 public class CreateAclsRequest extends AbstractRequest {
-    private final static String CREATIONS_KEY_NAME = "creations";
-
-    private static final Schema CREATE_ACLS_REQUEST_V0 = new Schema(
-            new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema(
-                    RESOURCE_TYPE,
-                    RESOURCE_NAME,
-                    PRINCIPAL,
-                    HOST,
-                    OPERATION,
-                    PERMISSION_TYPE))));
-
-    /**
-     * Version 1 adds RESOURCE_PATTERN_TYPE, to support more than just literal resource patterns.
-     * For more info, see {@link PatternType}.
-     *
-     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
-     */
-    private static final Schema CREATE_ACLS_REQUEST_V1 = new Schema(
-            new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema(
-                    RESOURCE_TYPE,
-                    RESOURCE_NAME,
-                    RESOURCE_PATTERN_TYPE,
-                    PRINCIPAL,
-                    HOST,
-                    OPERATION,
-                    PERMISSION_TYPE))));
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_ACLS_REQUEST_V0, CREATE_ACLS_REQUEST_V1};
-    }
-
-    public static class AclCreation {
-        private final AclBinding acl;
-
-        public AclCreation(AclBinding acl) {
-            this.acl = acl;
-        }
-
-        static AclCreation fromStruct(Struct struct) {
-            ResourcePattern pattern = RequestUtils.resourcePatternromStructFields(struct);
-            AccessControlEntry entry = RequestUtils.aceFromStructFields(struct);
-            return new AclCreation(new AclBinding(pattern, entry));
-        }
-
-        public AclBinding acl() {
-            return acl;
-        }
-
-        void setStructFields(Struct struct) {
-            RequestUtils.resourcePatternSetStructFields(acl.pattern(), struct);
-            RequestUtils.aceSetStructFields(acl.entry(), struct);
-        }
-
-        @Override
-        public String toString() {
-            return "(acl=" + acl + ")";
-        }
-    }
 
     public static class Builder extends AbstractRequest.Builder<CreateAclsRequest> {
-        private final List<AclCreation> creations;
+        private final CreateAclsRequestData data;
 
-        public Builder(List<AclCreation> creations) {
+        public Builder(CreateAclsRequestData data) {
             super(ApiKeys.CREATE_ACLS);
-            this.creations = creations;
+            this.data = data;
         }
 
         @Override
         public CreateAclsRequest build(short version) {
-            return new CreateAclsRequest(version, creations);
+            return new CreateAclsRequest(version, data);
         }
 
         @Override
         public String toString() {
-            return "(type=CreateAclsRequest, creations=" + Utils.join(creations, ", ") + ")";
+            return data.toString();
         }
     }
 
-    private final List<AclCreation> aclCreations;
+    private final CreateAclsRequestData data;
 
-    CreateAclsRequest(short version, List<AclCreation> aclCreations) {
+    CreateAclsRequest(short version, CreateAclsRequestData data) {
         super(ApiKeys.CREATE_ACLS, version);
-        this.aclCreations = aclCreations;
-
-        validate(aclCreations);
+        validate(data);
+        this.data = data;
     }
 
     public CreateAclsRequest(Struct struct, short version) {
-        super(ApiKeys.CREATE_ACLS, version);
-        this.aclCreations = new ArrayList<>();
-        for (Object creationStructObj : struct.getArray(CREATIONS_KEY_NAME)) {
-            Struct creationStruct = (Struct) creationStructObj;
-            aclCreations.add(AclCreation.fromStruct(creationStruct));
-        }
+        this(version, new CreateAclsRequestData(struct, version));
     }
 
-    @Override
-    protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.CREATE_ACLS.requestSchema(version()));
-        List<Struct> requests = new ArrayList<>();
-        for (AclCreation creation : aclCreations) {
-            Struct creationStruct = struct.instance(CREATIONS_KEY_NAME);
-            creation.setStructFields(creationStruct);
-            requests.add(creationStruct);
-        }
-        struct.set(CREATIONS_KEY_NAME, requests.toArray());
-        return struct;
+    public List<AclCreation> aclCreations() {
+        return data.creations();
     }
 
-    public List<AclCreation> aclCreations() {
-        return aclCreations;
+    @Override
+    protected Struct toStruct() {
+        return data.toStruct(version());
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
-        List<CreateAclsResponse.AclCreationResponse> responses = new ArrayList<>();
-        for (int i = 0; i < aclCreations.size(); i++)
-            responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable)));
-        return new CreateAclsResponse(throttleTimeMs, responses);
+        CreateAclsResponseData.AclCreationResult result = CreateAclsRequest.aclResult(throwable);
+        List<CreateAclsResponseData.AclCreationResult> results = Collections.nCopies(data.creations().size(), result);
+        return new CreateAclsResponse(new CreateAclsResponseData()
+            .setThrottleTimeMs(throttleTimeMs)
+            .setResults(results));
     }
 
     public static CreateAclsRequest parse(ByteBuffer buffer, short version) {
         return new CreateAclsRequest(ApiKeys.CREATE_ACLS.parseRequest(version, buffer), version);
     }
 
-    private void validate(List<AclCreation> aclCreations) {
+    private void validate(CreateAclsRequestData data) {
         if (version() == 0) {
-            final boolean unsupported = aclCreations.stream()
-                .map(AclCreation::acl)
-                .map(AclBinding::pattern)
-                .map(ResourcePattern::patternType)
-                .anyMatch(patternType -> patternType != PatternType.LITERAL);
-            if (unsupported) {
+            final boolean unsupported = data.creations().stream().anyMatch(creation ->
+                creation.resourcePatternType() != PatternType.LITERAL.code());
+            if (unsupported)
                 throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
-            }
         }
 
-        final boolean unknown = aclCreations.stream()
-            .map(AclCreation::acl)
-            .anyMatch(AclBinding::isUnknown);
-        if (unknown) {
-            throw new IllegalArgumentException("You can not create ACL bindings with unknown elements");
-        }
+        final boolean unknown = data.creations().stream().anyMatch(creation ->
+            creation.resourcePatternType() == PatternType.UNKNOWN.code()
+                || creation.resourceType() == ResourceType.UNKNOWN.code()
+                || creation.permissionType() == AclPermissionType.UNKNOWN.code()
+                || creation.operation() == AclOperation.UNKNOWN.code());
+        if (unknown)
+            throw new IllegalArgumentException("CreatableAcls contain unknown elements: " + data.creations());
+    }
+
+    public static AclBinding aclBinding(AclCreation acl) {
+        ResourcePattern pattern = new ResourcePattern(
+            ResourceType.fromCode(acl.resourceType()),
+            acl.resourceName(),
+            PatternType.fromCode(acl.resourcePatternType()));
+        AccessControlEntry entry = new AccessControlEntry(
+            acl.principal(),
+            acl.host(),
+            AclOperation.fromCode(acl.operation()),
+            AclPermissionType.fromCode(acl.permissionType()));
+        return new AclBinding(pattern, entry);
+    }
+
+    public static AclCreation aclCreation(AclBinding binding) {
+        return new AclCreation()
+            .setHost(binding.entry().host())
+            .setOperation(binding.entry().operation().code())
+            .setPermissionType(binding.entry().permissionType().code())
+            .setPrincipal(binding.entry().principal())
+            .setResourceName(binding.pattern().name())
+            .setResourceType(binding.pattern().resourceType().code())
+            .setResourcePatternType(binding.pattern().patternType().code());
+    }
+
+    private static AclCreationResult aclResult(Throwable throwable) {
+        ApiError apiError = ApiError.fromThrowable(throwable);
+        return new AclCreationResult()
+            .setErrorCode(apiError.error().code())
+            .setErrorMessage(apiError.message());
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index d5f52dd..f9ef457 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -16,110 +16,48 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.CreateAclsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import java.util.stream.Collectors;
 
 public class CreateAclsResponse extends AbstractResponse {
-    private final static String CREATION_RESPONSES_KEY_NAME = "creation_responses";
-
-    private static final Schema CREATE_ACLS_RESPONSE_V0 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(CREATION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
-                    ERROR_CODE,
-                    ERROR_MESSAGE))));
-
-    /**
-     * The version number is bumped to indicate that, on quota violation, brokers send out responses before throttling.
-     */
-    private static final Schema CREATE_ACLS_RESPONSE_V1 = CREATE_ACLS_RESPONSE_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_ACLS_RESPONSE_V0, CREATE_ACLS_RESPONSE_V1};
-    }
-
-    public static class AclCreationResponse {
-        private final ApiError error;
-
-        public AclCreationResponse(ApiError error) {
-            this.error = error;
-        }
-
-        public ApiError error() {
-            return error;
-        }
-
-        @Override
-        public String toString() {
-            return "(" + error + ")";
-        }
-    }
-
-    private final int throttleTimeMs;
-
-    private final List<AclCreationResponse> aclCreationResponses;
+    private final CreateAclsResponseData data;
 
-    public CreateAclsResponse(int throttleTimeMs, List<AclCreationResponse> aclCreationResponses) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.aclCreationResponses = aclCreationResponses;
+    public CreateAclsResponse(CreateAclsResponseData data) {
+        this.data = data;
     }
 
-    public CreateAclsResponse(Struct struct) {
-        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        this.aclCreationResponses = new ArrayList<>();
-        for (Object responseStructObj : struct.getArray(CREATION_RESPONSES_KEY_NAME)) {
-            Struct responseStruct = (Struct) responseStructObj;
-            ApiError error = new ApiError(responseStruct);
-            this.aclCreationResponses.add(new AclCreationResponse(error));
-        }
+    public CreateAclsResponse(Struct struct, short version) {
+        this.data = new CreateAclsResponseData(struct, version);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.CREATE_ACLS.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
-        List<Struct> responseStructs = new ArrayList<>();
-        for (AclCreationResponse response : aclCreationResponses) {
-            Struct responseStruct = struct.instance(CREATION_RESPONSES_KEY_NAME);
-            response.error.write(responseStruct);
-            responseStructs.add(responseStruct);
-        }
-        struct.set(CREATION_RESPONSES_KEY_NAME, responseStructs.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
-    public List<AclCreationResponse> aclCreationResponses() {
-        return aclCreationResponses;
+    public List<CreateAclsResponseData.AclCreationResult> results() {
+        return data.results();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        Map<Errors, Integer> errorCounts = new HashMap<>();
-        for (AclCreationResponse response : aclCreationResponses)
-            updateErrorCounts(errorCounts, response.error.error());
-        return errorCounts;
+        return errorCounts(results().stream().map(r -> Errors.forCode(r.errorCode())).collect(Collectors.toList()));
     }
 
     public static CreateAclsResponse parse(ByteBuffer buffer, short version) {
-        return new CreateAclsResponse(ApiKeys.CREATE_ACLS.responseSchema(version).read(buffer));
+        return new CreateAclsResponse(ApiKeys.CREATE_ACLS.responseSchema(version).read(buffer), version);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 5a20ceb..b020b2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -16,149 +16,136 @@
  */
 package org.apache.kafka.common.requests;
 
+import java.util.Collections;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
+import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
+import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.resource.ResourceType;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.kafka.common.protocol.ApiKeys.DELETE_ACLS;
-import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
-import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class DeleteAclsRequest extends AbstractRequest {
-    private final static String FILTERS = "filters";
-
-    private static final Schema DELETE_ACLS_REQUEST_V0 = new Schema(
-            new Field(FILTERS, new ArrayOf(new Schema(
-                    RESOURCE_TYPE,
-                    RESOURCE_NAME_FILTER,
-                    PRINCIPAL_FILTER,
-                    HOST_FILTER,
-                    OPERATION,
-                    PERMISSION_TYPE))));
-
-    /**
-     * V1 sees a new `RESOURCE_PATTERN_TYPE_FILTER` that controls how the filter handles different resource pattern types.
-     * For more info, see {@link PatternType}.
-     *
-     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
-     */
-    private static final Schema DELETE_ACLS_REQUEST_V1 = new Schema(
-            new Field(FILTERS, new ArrayOf(new Schema(
-                    RESOURCE_TYPE,
-                    RESOURCE_NAME_FILTER,
-                    RESOURCE_PATTERN_TYPE_FILTER,
-                    PRINCIPAL_FILTER,
-                    HOST_FILTER,
-                    OPERATION,
-                    PERMISSION_TYPE))));
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_ACLS_REQUEST_V0, DELETE_ACLS_REQUEST_V1};
-    }
-
     public static class Builder extends AbstractRequest.Builder<DeleteAclsRequest> {
-        private final List<AclBindingFilter> filters;
+        private final DeleteAclsRequestData data;
 
-        public Builder(List<AclBindingFilter> filters) {
+        public Builder(DeleteAclsRequestData data) {
             super(DELETE_ACLS);
-            this.filters = filters;
+            this.data = data;
         }
 
         @Override
         public DeleteAclsRequest build(short version) {
-            return new DeleteAclsRequest(version, filters);
+            return new DeleteAclsRequest(version, data);
         }
 
         @Override
         public String toString() {
-            return "(type=DeleteAclsRequest, filters=" + Utils.join(filters, ", ") + ")";
+            return data.toString();
         }
+
     }
 
-    private final List<AclBindingFilter> filters;
+    private final DeleteAclsRequestData data;
 
-    DeleteAclsRequest(short version, List<AclBindingFilter> filters) {
+    private DeleteAclsRequest(short version, DeleteAclsRequestData data) {
         super(ApiKeys.DELETE_ACLS, version);
-        this.filters = filters;
+        this.data = data;
+        normalizeAndValidate();
+    }
+
+    private void normalizeAndValidate() {
+        if (version() == 0) {
+            for (DeleteAclsRequestData.DeleteAclsFilter filter : data.filters()) {
+                PatternType patternType = PatternType.fromCode(filter.patternTypeFilter());
+
+                // On older brokers, no pattern types existed except LITERAL (effectively). So even though ANY is not
+                // directly supported on those brokers, we can get the same effect as ANY by setting the pattern type
+                // to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
+                if (patternType == PatternType.ANY)
+                    filter.setPatternTypeFilter(PatternType.LITERAL.code());
+                else if (patternType != PatternType.LITERAL)
+                    throw new UnsupportedVersionException("Version 0 does not support pattern type " +
+                            patternType + " (only LITERAL and ANY are supported)");
+            }
+        }
+
+        final boolean unknown = data.filters().stream().anyMatch(filter ->
+                filter.patternTypeFilter() == PatternType.UNKNOWN.code()
+                        || filter.resourceTypeFilter() == ResourceType.UNKNOWN.code()
+                        || filter.operation() == AclOperation.UNKNOWN.code()
+                        || filter.permissionType() == AclPermissionType.UNKNOWN.code()
+        );
 
-        validate(version, filters);
+        if (unknown) {
+            throw new IllegalArgumentException("Filters contain UNKNOWN elements, filters: " + data.filters());
+        }
     }
 
     public DeleteAclsRequest(Struct struct, short version) {
         super(ApiKeys.DELETE_ACLS, version);
-        this.filters = new ArrayList<>();
-        for (Object filterStructObj : struct.getArray(FILTERS)) {
-            Struct filterStruct = (Struct) filterStructObj;
-            ResourcePatternFilter resourceFilter = RequestUtils.resourcePatternFilterFromStructFields(filterStruct);
-            AccessControlEntryFilter aceFilter = RequestUtils.aceFilterFromStructFields(filterStruct);
-            this.filters.add(new AclBindingFilter(resourceFilter, aceFilter));
-        }
+        this.data = new DeleteAclsRequestData(struct, version);
     }
 
     public List<AclBindingFilter> filters() {
-        return filters;
+        return data.filters().stream().map(DeleteAclsRequest::aclBindingFilter).collect(Collectors.toList());
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(DELETE_ACLS.requestSchema(version()));
-        List<Struct> filterStructs = new ArrayList<>();
-        for (AclBindingFilter filter : filters) {
-            Struct filterStruct = struct.instance(FILTERS);
-            RequestUtils.resourcePatternFilterSetStructFields(filter.patternFilter(), filterStruct);
-            RequestUtils.aceFilterSetStructFields(filter.entryFilter(), filterStruct);
-            filterStructs.add(filterStruct);
-        }
-        struct.set(FILTERS, filterStructs.toArray());
-        return struct;
+        return data.toStruct(version());
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
-        List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
-        for (int i = 0; i < filters.size(); i++) {
-            responses.add(new DeleteAclsResponse.AclFilterResponse(
-                ApiError.fromThrowable(throwable), Collections.emptySet()));
-        }
-        return new DeleteAclsResponse(throttleTimeMs, responses);
+        ApiError apiError = ApiError.fromThrowable(throwable);
+        List<DeleteAclsFilterResult> filterResults = Collections.nCopies(data.filters().size(),
+            new DeleteAclsResponseData.DeleteAclsFilterResult()
+                .setErrorCode(apiError.error().code())
+                .setErrorMessage(apiError.message()));
+        return new DeleteAclsResponse(new DeleteAclsResponseData()
+            .setThrottleTimeMs(throttleTimeMs)
+            .setFilterResults(filterResults));
     }
 
     public static DeleteAclsRequest parse(ByteBuffer buffer, short version) {
         return new DeleteAclsRequest(DELETE_ACLS.parseRequest(version, buffer), version);
     }
 
-    private void validate(short version, List<AclBindingFilter> filters) {
-        if (version == 0) {
-            final boolean unsupported = filters.stream()
-                .map(AclBindingFilter::patternFilter)
-                .map(ResourcePatternFilter::patternType)
-                .anyMatch(patternType -> patternType != PatternType.LITERAL && patternType != PatternType.ANY);
-            if (unsupported) {
-                throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
-            }
-        }
+    public static DeleteAclsFilter deleteAclsFilter(AclBindingFilter filter) {
+        return new DeleteAclsFilter()
+            .setResourceNameFilter(filter.patternFilter().name())
+            .setResourceTypeFilter(filter.patternFilter().resourceType().code())
+            .setPatternTypeFilter(filter.patternFilter().patternType().code())
+            .setHostFilter(filter.entryFilter().host())
+            .setOperation(filter.entryFilter().operation().code())
+            .setPermissionType(filter.entryFilter().permissionType().code())
+            .setPrincipalFilter(filter.entryFilter().principal());
+    }
 
-        final boolean unknown = filters.stream().anyMatch(AclBindingFilter::isUnknown);
-        if (unknown) {
-            throw new IllegalArgumentException("Filters contain UNKNOWN elements");
-        }
+    private static AclBindingFilter aclBindingFilter(DeleteAclsFilter filter) {
+        ResourcePatternFilter patternFilter = new ResourcePatternFilter(
+            ResourceType.fromCode(filter.resourceTypeFilter()),
+            filter.resourceNameFilter(),
+            PatternType.fromCode(filter.patternTypeFilter()));
+        AccessControlEntryFilter entryFilter = new AccessControlEntryFilter(
+            filter.principalFilter(),
+            filter.hostFilter(),
+            AclOperation.fromCode(filter.operation()),
+            AclPermissionType.fromCode(filter.permissionType()));
+        return new AclBindingFilter(patternFilter, entryFilter);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index a3b81cc..cb0ca07 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -18,223 +18,66 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
+import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
+import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.PatternType;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
-import static org.apache.kafka.common.protocol.CommonFields.HOST;
-import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
-import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import java.util.stream.Collectors;
 
 public class DeleteAclsResponse extends AbstractResponse {
     public static final Logger log = LoggerFactory.getLogger(DeleteAclsResponse.class);
-    private final static String FILTER_RESPONSES_KEY_NAME = "filter_responses";
-    private final static String MATCHING_ACLS_KEY_NAME = "matching_acls";
-
-    private static final Schema MATCHING_ACL_V0 = new Schema(
-            ERROR_CODE,
-            ERROR_MESSAGE,
-            RESOURCE_TYPE,
-            RESOURCE_NAME,
-            PRINCIPAL,
-            HOST,
-            OPERATION,
-            PERMISSION_TYPE);
-
-    /**
-     * V1 sees a new `RESOURCE_PATTERN_TYPE` that defines the type of the resource pattern.
-     *
-     * For more info, see {@link PatternType}.
-     */
-    private static final Schema MATCHING_ACL_V1 = new Schema(
-            ERROR_CODE,
-            ERROR_MESSAGE,
-            RESOURCE_TYPE,
-            RESOURCE_NAME,
-            RESOURCE_PATTERN_TYPE,
-            PRINCIPAL,
-            HOST,
-            OPERATION,
-            PERMISSION_TYPE);
-
-    private static final Schema DELETE_ACLS_RESPONSE_V0 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(FILTER_RESPONSES_KEY_NAME,
-                    new ArrayOf(new Schema(
-                            ERROR_CODE,
-                            ERROR_MESSAGE,
-                            new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL_V0), "The matching ACLs")))));
 
-    /**
-     * V1 sees a new `RESOURCE_PATTERN_TYPE` field added to MATCHING_ACL_V1, that describes how the resource pattern is interpreted
-     * and version was bumped to indicate that, on quota violation, brokers send out responses before throttling.
-     *
-     * For more info, see {@link PatternType}.
-     */
-    private static final Schema DELETE_ACLS_RESPONSE_V1 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(FILTER_RESPONSES_KEY_NAME,
-                    new ArrayOf(new Schema(
-                            ERROR_CODE,
-                            ERROR_MESSAGE,
-                            new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL_V1), "The matching ACLs")))));
+    private final DeleteAclsResponseData data;
 
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_ACLS_RESPONSE_V0, DELETE_ACLS_RESPONSE_V1};
+    public DeleteAclsResponse(DeleteAclsResponseData data) {
+        this.data = data;
     }
 
-    public static class AclDeletionResult {
-        private final ApiError error;
-        private final AclBinding acl;
-
-        public AclDeletionResult(ApiError error, AclBinding acl) {
-            this.error = error;
-            this.acl = acl;
-        }
-
-        public AclDeletionResult(AclBinding acl) {
-            this(ApiError.NONE, acl);
-        }
-
-        public ApiError error() {
-            return error;
-        }
-
-        public AclBinding acl() {
-            return acl;
-        }
-
-        @Override
-        public String toString() {
-            return "(error=" + error + ", acl=" + acl + ")";
-        }
-    }
-
-    public static class AclFilterResponse {
-        private final ApiError error;
-        private final Collection<AclDeletionResult> deletions;
-
-        public AclFilterResponse(ApiError error, Collection<AclDeletionResult> deletions) {
-            this.error = error;
-            this.deletions = deletions;
-        }
-
-        public AclFilterResponse(Collection<AclDeletionResult> deletions) {
-            this(ApiError.NONE, deletions);
-        }
-
-        public ApiError error() {
-            return error;
-        }
-
-        public Collection<AclDeletionResult> deletions() {
-            return deletions;
-        }
-
-        @Override
-        public String toString() {
-            return "(error=" + error + ", deletions=" + Utils.join(deletions, ",") + ")";
-        }
-    }
-
-    private final int throttleTimeMs;
-
-    private final List<AclFilterResponse> responses;
-
-    public DeleteAclsResponse(int throttleTimeMs, List<AclFilterResponse> responses) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.responses = responses;
-    }
-
-    public DeleteAclsResponse(Struct struct) {
-        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        this.responses = new ArrayList<>();
-        for (Object responseStructObj : struct.getArray(FILTER_RESPONSES_KEY_NAME)) {
-            Struct responseStruct = (Struct) responseStructObj;
-            ApiError error = new ApiError(responseStruct);
-            List<AclDeletionResult> deletions = new ArrayList<>();
-            for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS_KEY_NAME)) {
-                Struct matchingAclStruct = (Struct) matchingAclStructObj;
-                ApiError matchError = new ApiError(matchingAclStruct);
-                AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct);
-                ResourcePattern resource = RequestUtils.resourcePatternromStructFields(matchingAclStruct);
-                deletions.add(new AclDeletionResult(matchError, new AclBinding(resource, entry)));
-            }
-            this.responses.add(new AclFilterResponse(error, deletions));
-        }
+    public DeleteAclsResponse(Struct struct, short version) {
+        data = new DeleteAclsResponseData(struct, version);
     }
 
     @Override
     protected Struct toStruct(short version) {
         validate(version);
-
-        Struct struct = new Struct(ApiKeys.DELETE_ACLS.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
-        List<Struct> responseStructs = new ArrayList<>();
-        for (AclFilterResponse response : responses) {
-            Struct responseStruct = struct.instance(FILTER_RESPONSES_KEY_NAME);
-            response.error.write(responseStruct);
-            List<Struct> deletionStructs = new ArrayList<>();
-            for (AclDeletionResult deletion : response.deletions()) {
-                Struct deletionStruct = responseStruct.instance(MATCHING_ACLS_KEY_NAME);
-                deletion.error.write(deletionStruct);
-                RequestUtils.resourcePatternSetStructFields(deletion.acl().pattern(), deletionStruct);
-                RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct);
-                deletionStructs.add(deletionStruct);
-            }
-            responseStruct.set(MATCHING_ACLS_KEY_NAME, deletionStructs.toArray(new Struct[0]));
-            responseStructs.add(responseStruct);
-        }
-        struct.set(FILTER_RESPONSES_KEY_NAME, responseStructs.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
-    public List<AclFilterResponse> responses() {
-        return responses;
+    public List<DeleteAclsResponseData.DeleteAclsFilterResult> filterResults() {
+        return data.filterResults();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        Map<Errors, Integer> errorCounts = new HashMap<>();
-        for (AclFilterResponse response : responses)
-            updateErrorCounts(errorCounts, response.error.error());
-        return errorCounts;
+        return errorCounts(filterResults().stream().map(r -> Errors.forCode(r.errorCode())).collect(Collectors.toList()));
     }
 
     public static DeleteAclsResponse parse(ByteBuffer buffer, short version) {
-        return new DeleteAclsResponse(ApiKeys.DELETE_ACLS.responseSchema(version).read(buffer));
+        return new DeleteAclsResponse(ApiKeys.DELETE_ACLS.parseResponse(version, buffer), version);
     }
 
     public String toString() {
-        return "(responses=" + Utils.join(responses, ",") + ")";
+        return data.toString();
     }
 
     @Override
@@ -244,23 +87,60 @@ public class DeleteAclsResponse extends AbstractResponse {
 
     private void validate(short version) {
         if (version == 0) {
-            final boolean unsupported = responses.stream()
-                .flatMap(r -> r.deletions.stream())
-                .map(AclDeletionResult::acl)
-                .map(AclBinding::pattern)
-                .map(ResourcePattern::patternType)
-                .anyMatch(patternType -> patternType != PatternType.LITERAL);
-            if (unsupported) {
+            final boolean unsupported = filterResults().stream()
+                .flatMap(r -> r.matchingAcls().stream())
+                .anyMatch(matchingAcl -> matchingAcl.patternType() != PatternType.LITERAL.code());
+            if (unsupported)
                 throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
-            }
         }
 
-        final boolean unknown = responses.stream()
-            .flatMap(r -> r.deletions.stream())
-            .map(AclDeletionResult::acl)
-            .anyMatch(AclBinding::isUnknown);
-        if (unknown) {
-            throw new IllegalArgumentException("Response contains UNKNOWN elements");
-        }
+        final boolean unknown = filterResults().stream()
+                .flatMap(r -> r.matchingAcls().stream())
+                .anyMatch(matchingAcl -> matchingAcl.patternType() == PatternType.UNKNOWN.code()
+                    || matchingAcl.resourceType() == ResourceType.UNKNOWN.code()
+                    || matchingAcl.permissionType() == AclPermissionType.UNKNOWN.code()
+                    || matchingAcl.operation() == AclOperation.UNKNOWN.code());
+        if (unknown)
+            throw new IllegalArgumentException("DeleteAclsMatchingAcls contain UNKNOWN elements");
+    }
+
+    public static DeleteAclsFilterResult filterResult(AclDeleteResult result) {
+        ApiError error = result.exception().map(e -> ApiError.fromThrowable(e)).orElse(ApiError.NONE);
+        List<DeleteAclsMatchingAcl> matchingAcls = result.aclBindingDeleteResults().stream()
+            .map(DeleteAclsResponse::matchingAcl)
+            .collect(Collectors.toList());
+        return new DeleteAclsFilterResult()
+            .setErrorCode(error.error().code())
+            .setErrorMessage(error.message())
+            .setMatchingAcls(matchingAcls);
+    }
+
+    private static DeleteAclsMatchingAcl matchingAcl(AclDeleteResult.AclBindingDeleteResult result) {
+        ApiError error = result.exception().map(e -> ApiError.fromThrowable(e)).orElse(ApiError.NONE);
+        AclBinding acl = result.aclBinding();
+        return matchingAcl(acl, error);
     }
+
+    // Visible for testing
+    public static DeleteAclsMatchingAcl matchingAcl(AclBinding acl, ApiError error) {
+        return new DeleteAclsMatchingAcl()
+            .setErrorCode(error.error().code())
+            .setErrorMessage(error.message())
+            .setResourceName(acl.pattern().name())
+            .setResourceType(acl.pattern().resourceType().code())
+            .setPatternType(acl.pattern().patternType().code())
+            .setHost(acl.entry().host())
+            .setOperation(acl.entry().operation().code())
+            .setPermissionType(acl.entry().permissionType().code())
+            .setPrincipal(acl.entry().principal());
+    }
+
+    public static AclBinding aclBinding(DeleteAclsMatchingAcl matchingAcl) {
+        ResourcePattern resourcePattern = new ResourcePattern(ResourceType.fromCode(matchingAcl.resourceType()),
+            matchingAcl.resourceName(), PatternType.fromCode(matchingAcl.patternType()));
+        AccessControlEntry accessControlEntry = new AccessControlEntry(matchingAcl.principal(), matchingAcl.host(),
+            AclOperation.fromCode(matchingAcl.operation()), AclPermissionType.fromCode(matchingAcl.permissionType()));
+        return new AclBinding(resourcePattern, accessControlEntry);
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index f5c58f4..153001d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.resource.ResourceType;
 
 import java.nio.ByteBuffer;
 
-
 public class DescribeAclsRequest extends AbstractRequest {
 
     public static class Builder extends AbstractRequest.Builder<DescribeAclsRequest> {
@@ -64,10 +63,30 @@ public class DescribeAclsRequest extends AbstractRequest {
 
     private final DescribeAclsRequestData data;
 
-    public DescribeAclsRequest(DescribeAclsRequestData data, short version) {
+    private DescribeAclsRequest(DescribeAclsRequestData data, short version) {
         super(ApiKeys.DESCRIBE_ACLS, version);
         this.data = data;
-        validate(version);
+        normalizeAndValidate(version);
+    }
+
+    private void normalizeAndValidate(short version) {
+        if (version == 0) {
+            PatternType patternType = PatternType.fromCode(data.resourcePatternType());
+            // On older brokers, no pattern types existed except LITERAL (effectively). So even though ANY is not
+            // directly supported on those brokers, we can get the same effect as ANY by setting the pattern type
+            // to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
+            if (patternType == PatternType.ANY)
+                data.setResourcePatternType(PatternType.LITERAL.code());
+            else if (patternType != PatternType.LITERAL)
+                throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
+        }
+
+        if (data.resourcePatternType() == PatternType.UNKNOWN.code()
+                || data.resourceType() == ResourceType.UNKNOWN.code()
+                || data.permissionType() == AclPermissionType.UNKNOWN.code()
+                || data.operation() == AclOperation.UNKNOWN.code()) {
+            throw new IllegalArgumentException("DescribeAclsRequest contains UNKNOWN elements: " + data);
+        }
     }
 
     public DescribeAclsRequest(Struct struct, short version) {
@@ -111,22 +130,4 @@ public class DescribeAclsRequest extends AbstractRequest {
         return new AclBindingFilter(rpf, acef);
     }
 
-    private void validate(short version) {
-        if (version == 0) {
-            if (data.resourcePatternType() == PatternType.ANY.code()) {
-                data.setResourcePatternType(PatternType.LITERAL.code());
-            }
-            if (data.resourcePatternType() != PatternType.LITERAL.code()) {
-                throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
-            }
-        }
-
-        if (data.resourcePatternType() == PatternType.UNKNOWN.code()
-            || data.resourceType() == ResourceType.UNKNOWN.code()
-            || data.permissionType() == AclPermissionType.UNKNOWN.code()
-            || data.operation() == AclOperation.UNKNOWN.code()) {
-            throw new IllegalArgumentException("Filter contain UNKNOWN elements");
-        }
-    }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index bbf90b2..cb1bc43 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
@@ -105,9 +107,8 @@ public class DescribeAclsResponse extends AbstractResponse {
         }
     }
 
-    private static List<AclBinding> aclBindings(DescribeAclsResource resource) {
-        List<AclBinding> acls = new ArrayList<>();
-        for (AclDescription acl : resource.acls()) {
+    private static Stream<AclBinding> aclBindings(DescribeAclsResource resource) {
+        return resource.acls().stream().map(acl -> {
             ResourcePattern pattern = new ResourcePattern(
                     ResourceType.fromCode(resource.type()),
                     resource.name(),
@@ -117,26 +118,21 @@ public class DescribeAclsResponse extends AbstractResponse {
                     acl.host(),
                     AclOperation.fromCode(acl.operation()),
                     AclPermissionType.fromCode(acl.permissionType()));
-            acls.add(new AclBinding(pattern, entry));
-        }
-        return acls;
+            return new AclBinding(pattern, entry);
+        });
     }
 
     public static List<AclBinding> aclBindings(List<DescribeAclsResource> resources) {
-        List<AclBinding> acls = new ArrayList<>();
-        for (DescribeAclsResource resource : resources) {
-            acls.addAll(aclBindings(resource));
-        }
-        return acls;
+        return resources.stream().flatMap(DescribeAclsResponse::aclBindings).collect(Collectors.toList());
     }
 
-    public static DescribeAclsResponse prepareResponse(int throttleTimeMs, ApiError error, Collection<AclBinding> acls) {
-        Map<ResourcePattern, List<AccessControlEntry>> map = new HashMap<>();
+    public static List<DescribeAclsResource> aclsResources(Collection<AclBinding> acls) {
+        Map<ResourcePattern, List<AccessControlEntry>> patternToEntries = new HashMap<>();
         for (AclBinding acl : acls) {
-            map.computeIfAbsent(acl.pattern(), v -> new ArrayList<>()).add(acl.entry());
+            patternToEntries.computeIfAbsent(acl.pattern(), v -> new ArrayList<>()).add(acl.entry());
         }
-        List<DescribeAclsResource> resources = new ArrayList<>();
-        for (Entry<ResourcePattern, List<AccessControlEntry>> entry : map.entrySet()) {
+        List<DescribeAclsResource> resources = new ArrayList<>(patternToEntries.size());
+        for (Entry<ResourcePattern, List<AccessControlEntry>> entry : patternToEntries.entrySet()) {
             ResourcePattern key = entry.getKey();
             List<AclDescription> aclDescriptions = new ArrayList<>();
             for (AccessControlEntry ace : entry.getValue()) {
@@ -148,17 +144,12 @@ public class DescribeAclsResponse extends AbstractResponse {
                 aclDescriptions.add(ad);
             }
             DescribeAclsResource dar = new DescribeAclsResource()
-                    .setName(key.name())
-                    .setPatternType(key.patternType().code())
-                    .setType(key.resourceType().code())
-                    .setAcls(aclDescriptions);
+                .setName(key.name())
+                .setPatternType(key.patternType().code())
+                .setType(key.resourceType().code())
+                .setAcls(aclDescriptions);
             resources.add(dar);
         }
-        DescribeAclsResponseData data = new DescribeAclsResponseData()
-                .setThrottleTimeMs(throttleTimeMs)
-                .setErrorCode(error.error().code())
-                .setErrorMessage(error.message())
-                .setResources(resources);
-        return new DescribeAclsResponse(data);
+        return resources;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index c3dfaa1..2b1f04c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -16,97 +16,17 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.acl.AccessControlEntry;
-import org.apache.kafka.common.acl.AccessControlEntryFilter;
-import org.apache.kafka.common.acl.AclOperation;
-import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.resource.PatternType;
-import org.apache.kafka.common.resource.ResourcePattern;
-import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceType;
 
 import java.nio.ByteBuffer;
 import java.util.Optional;
 
-import static org.apache.kafka.common.protocol.CommonFields.HOST;
-import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
-import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
-
 public final class RequestUtils {
 
     private RequestUtils() {}
 
-    static ResourcePattern resourcePatternromStructFields(Struct struct) {
-        byte resourceType = struct.get(RESOURCE_TYPE);
-        String name = struct.get(RESOURCE_NAME);
-        PatternType patternType = PatternType.fromCode(
-            struct.getOrElse(RESOURCE_PATTERN_TYPE, PatternType.LITERAL.code()));
-        return new ResourcePattern(ResourceType.fromCode(resourceType), name, patternType);
-    }
-
-    static void resourcePatternSetStructFields(ResourcePattern pattern, Struct struct) {
-        struct.set(RESOURCE_TYPE, pattern.resourceType().code());
-        struct.set(RESOURCE_NAME, pattern.name());
-        struct.setIfExists(RESOURCE_PATTERN_TYPE, pattern.patternType().code());
-    }
-
-    static ResourcePatternFilter resourcePatternFilterFromStructFields(Struct struct) {
-        byte resourceType = struct.get(RESOURCE_TYPE);
-        String name = struct.get(RESOURCE_NAME_FILTER);
-        PatternType patternType = PatternType.fromCode(
-            struct.getOrElse(RESOURCE_PATTERN_TYPE_FILTER, PatternType.LITERAL.code()));
-        return new ResourcePatternFilter(ResourceType.fromCode(resourceType), name, patternType);
-    }
-
-    static void resourcePatternFilterSetStructFields(ResourcePatternFilter patternFilter, Struct struct) {
-        struct.set(RESOURCE_TYPE, patternFilter.resourceType().code());
-        struct.set(RESOURCE_NAME_FILTER, patternFilter.name());
-        struct.setIfExists(RESOURCE_PATTERN_TYPE_FILTER, patternFilter.patternType().code());
-    }
-
-    static AccessControlEntry aceFromStructFields(Struct struct) {
-        String principal = struct.get(PRINCIPAL);
-        String host = struct.get(HOST);
-        byte operation = struct.get(OPERATION);
-        byte permissionType = struct.get(PERMISSION_TYPE);
-        return new AccessControlEntry(principal, host, AclOperation.fromCode(operation),
-            AclPermissionType.fromCode(permissionType));
-    }
-
-    static void aceSetStructFields(AccessControlEntry data, Struct struct) {
-        struct.set(PRINCIPAL, data.principal());
-        struct.set(HOST, data.host());
-        struct.set(OPERATION, data.operation().code());
-        struct.set(PERMISSION_TYPE, data.permissionType().code());
-    }
-
-    static AccessControlEntryFilter aceFilterFromStructFields(Struct struct) {
-        String principal = struct.get(PRINCIPAL_FILTER);
-        String host = struct.get(HOST_FILTER);
-        byte operation = struct.get(OPERATION);
-        byte permissionType = struct.get(PERMISSION_TYPE);
-        return new AccessControlEntryFilter(principal, host, AclOperation.fromCode(operation),
-            AclPermissionType.fromCode(permissionType));
-    }
-
-    static void aceFilterSetStructFields(AccessControlEntryFilter filter, Struct struct) {
-        struct.set(PRINCIPAL_FILTER, filter.principal());
-        struct.set(HOST_FILTER, filter.host());
-        struct.set(OPERATION, filter.operation().code());
-        struct.set(PERMISSION_TYPE, filter.permissionType().code());
-    }
-
     static void setLeaderEpochIfExists(Struct struct, Field.Int32 leaderEpochField, Optional<Integer> leaderEpoch) {
         struct.setIfExists(leaderEpochField, leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH));
     }
diff --git a/clients/src/main/resources/common/message/CreateAclsRequest.json b/clients/src/main/resources/common/message/CreateAclsRequest.json
index 72d4d11..a9bd9c5 100644
--- a/clients/src/main/resources/common/message/CreateAclsRequest.json
+++ b/clients/src/main/resources/common/message/CreateAclsRequest.json
@@ -18,10 +18,11 @@
   "type": "request",
   "name": "CreateAclsRequest",
   // Version 1 adds resource pattern type.
-  "validVersions": "0-1",
-  "flexibleVersions": "none",
+  // Version 2 enables flexible versions.
+  "validVersions": "0-2",
+  "flexibleVersions": "2+",
   "fields": [
-    { "name": "Creations", "type": "[]CreatableAcl", "versions": "0+", 
+    { "name": "Creations", "type": "[]AclCreation", "versions": "0+",
       "about": "The ACLs that we want to create.", "fields": [
       { "name": "ResourceType", "type": "int8", "versions": "0+",
         "about": "The type of the resource." },
diff --git a/clients/src/main/resources/common/message/CreateAclsResponse.json b/clients/src/main/resources/common/message/CreateAclsResponse.json
index d84b723..7b0de7e 100644
--- a/clients/src/main/resources/common/message/CreateAclsResponse.json
+++ b/clients/src/main/resources/common/message/CreateAclsResponse.json
@@ -18,12 +18,13 @@
   "type": "response",
   "name": "CreateAclsResponse",
   // Starting in version 1, on quota violation, brokers send out responses before throttling.
-  "validVersions": "0-1",
-  "flexibleVersions": "none",
+  // Version 2 enables flexible versions.
+  "validVersions": "0-2",
+  "flexibleVersions": "2+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
-    { "name": "Results", "type": "[]CreatableAclResult", "versions": "0+",
+    { "name": "Results", "type": "[]AclCreationResult", "versions": "0+",
       "about": "The results for each ACL creation.", "fields": [
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The result error, or zero if there was no error." },
diff --git a/clients/src/main/resources/common/message/DeleteAclsRequest.json b/clients/src/main/resources/common/message/DeleteAclsRequest.json
index c9d6d4a..664737e 100644
--- a/clients/src/main/resources/common/message/DeleteAclsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteAclsRequest.json
@@ -18,8 +18,9 @@
   "type": "request",
   "name": "DeleteAclsRequest",
   // Version 1 adds the pattern type.
-  "validVersions": "0-1",
-  "flexibleVersions": "none",
+  // Version 2 enables flexible versions.
+  "validVersions": "0-2",
+  "flexibleVersions": "2+",
   "fields": [
     { "name": "Filters", "type": "[]DeleteAclsFilter", "versions": "0+",
       "about": "The filters to use when deleting ACLs.", "fields": [
diff --git a/clients/src/main/resources/common/message/DeleteAclsResponse.json b/clients/src/main/resources/common/message/DeleteAclsResponse.json
index 303fa2b..08f5702 100644
--- a/clients/src/main/resources/common/message/DeleteAclsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteAclsResponse.json
@@ -19,8 +19,9 @@
   "name": "DeleteAclsResponse",
   // Version 1 adds the resource pattern type.
   // Starting in version 1, on quota violation, brokers send out responses before throttling.
-  "validVersions": "0-1",
-  "flexibleVersions": "none",
+  // Version 2 enables flexible versions.
+  "validVersions": "0-2",
+  "flexibleVersions": "2+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index f82fddd..c869aaa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -61,13 +61,16 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.CreatePartitionsResponseData;
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateAclsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
 import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
+import org.apache.kafka.common.message.DescribeAclsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
@@ -88,13 +91,10 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
@@ -713,18 +713,18 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Test a call where we get back ACL1 and ACL2.
-            env.kafkaClient().prepareResponse(DescribeAclsResponse.prepareResponse(0, ApiError.NONE,
-                    asList(ACL1, ACL2)));
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(new DescribeAclsResponseData()
+                .setResources(DescribeAclsResponse.aclsResources(asList(ACL1, ACL2)))));
             assertCollectionIs(env.adminClient().describeAcls(FILTER1).values().get(), ACL1, ACL2);
 
             // Test a call where we get back no results.
-            env.kafkaClient().prepareResponse(DescribeAclsResponse.prepareResponse(0, ApiError.NONE,
-                Collections.<AclBinding>emptySet()));
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(new DescribeAclsResponseData()));
             assertTrue(env.adminClient().describeAcls(FILTER2).values().get().isEmpty());
 
             // Test a call where we get back an error.
-            env.kafkaClient().prepareResponse(DescribeAclsResponse.prepareResponse(0,
-                new ApiError(Errors.SECURITY_DISABLED, "Security is disabled"), Collections.<AclBinding>emptySet()));
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(new DescribeAclsResponseData()
+                .setErrorCode(Errors.SECURITY_DISABLED.code())
+                .setErrorMessage("Security is disabled")));
             TestUtils.assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
 
             // Test a call where we supply an invalid filter.
@@ -739,8 +739,9 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Test a call where we successfully create two ACLs.
-            env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
-                asList(new AclCreationResponse(ApiError.NONE), new AclCreationResponse(ApiError.NONE))));
+            env.kafkaClient().prepareResponse(new CreateAclsResponse(new CreateAclsResponseData().setResults(asList(
+                new CreateAclsResponseData.AclCreationResult(),
+                new CreateAclsResponseData.AclCreationResult()))));
             CreateAclsResult results = env.adminClient().createAcls(asList(ACL1, ACL2));
             assertCollectionIs(results.values().keySet(), ACL1, ACL2);
             for (KafkaFuture<Void> future : results.values().values())
@@ -748,10 +749,11 @@ public class KafkaAdminClientTest {
             results.all().get();
 
             // Test a call where we fail to create one ACL.
-            env.kafkaClient().prepareResponse(new CreateAclsResponse(0, asList(
-                new AclCreationResponse(new ApiError(Errors.SECURITY_DISABLED, "Security is disabled")),
-                new AclCreationResponse(ApiError.NONE))
-            ));
+            env.kafkaClient().prepareResponse(new CreateAclsResponse(new CreateAclsResponseData().setResults(asList(
+                new CreateAclsResponseData.AclCreationResult()
+                    .setErrorCode(Errors.SECURITY_DISABLED.code())
+                    .setErrorMessage("Security is disabled"),
+                new CreateAclsResponseData.AclCreationResult()))));
             results = env.adminClient().createAcls(asList(ACL1, ACL2));
             assertCollectionIs(results.values().keySet(), ACL1, ACL2);
             TestUtils.assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
@@ -766,10 +768,16 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Test a call where one filter has an error.
-            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
-                    new AclFilterResponse(asList(new AclDeletionResult(ACL1), new AclDeletionResult(ACL2))),
-                    new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"),
-                            Collections.<AclDeletionResult>emptySet()))));
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData()
+                .setThrottleTimeMs(0)
+                .setFilterResults(asList(
+                    new DeleteAclsResponseData.DeleteAclsFilterResult()
+                        .setMatchingAcls(asList(
+                            DeleteAclsResponse.matchingAcl(ACL1, ApiError.NONE),
+                            DeleteAclsResponse.matchingAcl(ACL2, ApiError.NONE))),
+                    new DeleteAclsResponseData.DeleteAclsFilterResult()
+                        .setErrorCode(Errors.SECURITY_DISABLED.code())
+                        .setErrorMessage("No security")))));
             DeleteAclsResult results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.values();
             FilterResults filter1Results = filterResults.get(FILTER1).get();
@@ -781,18 +789,28 @@ public class KafkaAdminClientTest {
             TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where one deletion result has an error.
-            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
-                    new AclFilterResponse(asList(new AclDeletionResult(ACL1),
-                            new AclDeletionResult(new ApiError(Errors.SECURITY_DISABLED, "No security"), ACL2))),
-                    new AclFilterResponse(Collections.<AclDeletionResult>emptySet()))));
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData()
+                .setThrottleTimeMs(0)
+                .setFilterResults(asList(
+                    new DeleteAclsResponseData.DeleteAclsFilterResult()
+                        .setMatchingAcls(asList(
+                            DeleteAclsResponse.matchingAcl(ACL1, ApiError.NONE),
+                            new DeleteAclsResponseData.DeleteAclsMatchingAcl()
+                                .setErrorCode(Errors.SECURITY_DISABLED.code())
+                                .setErrorMessage("No security"))),
+                    new DeleteAclsResponseData.DeleteAclsFilterResult()))));
             results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             assertTrue(results.values().get(FILTER2).get().values().isEmpty());
             TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where there are no errors.
-            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
-                    new AclFilterResponse(asList(new AclDeletionResult(ACL1))),
-                    new AclFilterResponse(asList(new AclDeletionResult(ACL2))))));
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData()
+                .setThrottleTimeMs(0)
+                .setFilterResults(asList(
+                    new DeleteAclsResponseData.DeleteAclsFilterResult()
+                        .setMatchingAcls(asList(DeleteAclsResponse.matchingAcl(ACL1, ApiError.NONE))),
+                    new DeleteAclsResponseData.DeleteAclsFilterResult()
+                        .setMatchingAcls(asList(DeleteAclsResponse.matchingAcl(ACL2, ApiError.NONE)))))));
             results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             Collection<AclBinding> deleted = results.all().get();
             assertCollectionIs(deleted, ACL1, ACL2);
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/MessageTestUtil.java b/clients/src/test/java/org/apache/kafka/common/protocol/MessageTestUtil.java
index 942ff7f..945246d 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/MessageTestUtil.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/MessageTestUtil.java
@@ -32,19 +32,4 @@ public final class MessageTestUtil {
         bytes.flip();
         return bytes;
     }
-
-    public static void messageFromByteBuffer(ByteBuffer bytes, Message message, short version) {
-        message.read(new ByteBufferAccessor(bytes.duplicate()), version);
-    }
-
-    public static String byteBufferToString(ByteBuffer buf) {
-        ByteBuffer buf2 = buf.duplicate();
-        StringBuilder bld = new StringBuilder();
-        String prefix = "";
-        while (buf2.hasRemaining()) {
-            bld.append(String.format("%s%02x", prefix, (int) buf2.get()));
-            prefix = " ";
-        }
-        return bld.toString();
-    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
index 5642677..06b4b9f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.CreateAclsRequestData;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
@@ -53,17 +53,17 @@ public class CreateAclsRequestTest {
 
     @Test(expected = UnsupportedVersionException.class)
     public void shouldThrowOnV0IfNotLiteral() {
-        new CreateAclsRequest(V0, aclCreations(PREFIXED_ACL1));
+        new CreateAclsRequest(V0, data(PREFIXED_ACL1));
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowOnIfUnknown() {
-        new CreateAclsRequest(V0, aclCreations(UNKNOWN_ACL1));
+        new CreateAclsRequest(V0, data(UNKNOWN_ACL1));
     }
 
     @Test
     public void shouldRoundTripV0() {
-        final CreateAclsRequest original = new CreateAclsRequest(V0, aclCreations(LITERAL_ACL1, LITERAL_ACL2));
+        final CreateAclsRequest original = new CreateAclsRequest(V0, data(LITERAL_ACL1, LITERAL_ACL2));
         final Struct struct = original.toStruct();
 
         final CreateAclsRequest result = new CreateAclsRequest(struct, V0);
@@ -73,7 +73,7 @@ public class CreateAclsRequestTest {
 
     @Test
     public void shouldRoundTripV1() {
-        final CreateAclsRequest original = new CreateAclsRequest(V1, aclCreations(LITERAL_ACL1, PREFIXED_ACL1));
+        final CreateAclsRequest original = new CreateAclsRequest(V1, data(LITERAL_ACL1, PREFIXED_ACL1));
         final Struct struct = original.toStruct();
 
         final CreateAclsRequest result = new CreateAclsRequest(struct, V1);
@@ -85,15 +85,16 @@ public class CreateAclsRequestTest {
         assertEquals("Number of Acls wrong", original.aclCreations().size(), actual.aclCreations().size());
 
         for (int idx = 0; idx != original.aclCreations().size(); ++idx) {
-            final AclBinding originalBinding = original.aclCreations().get(idx).acl();
-            final AclBinding actualBinding = actual.aclCreations().get(idx).acl();
+            final AclBinding originalBinding = CreateAclsRequest.aclBinding(original.aclCreations().get(idx));
+            final AclBinding actualBinding = CreateAclsRequest.aclBinding(actual.aclCreations().get(idx));
             assertEquals(originalBinding, actualBinding);
         }
     }
 
-    private static List<AclCreation> aclCreations(final AclBinding... acls) {
-        return Arrays.stream(acls)
-            .map(AclCreation::new)
+    private static CreateAclsRequestData data(final AclBinding... acls) {
+        List<CreateAclsRequestData.AclCreation> aclCreations = Arrays.stream(acls)
+            .map(CreateAclsRequest::aclCreation)
             .collect(Collectors.toList());
+        return new CreateAclsRequestData().setCreations(aclCreations);
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
index 9be8d59..19b65ed 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
@@ -22,16 +22,18 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.List;
+import java.util.stream.Collectors;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 public class DeleteAclsRequestTest {
     private static final short V0 = 0;
@@ -49,19 +51,19 @@ public class DeleteAclsRequestTest {
     private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "prefix", PatternType.PREFIXED),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    @Test(expected = UnsupportedVersionException.class)
+    @Test
     public void shouldThrowOnV0IfPrefixed() {
-        new DeleteAclsRequest(V0, aclFilters(PREFIXED_FILTER));
+        assertThrows(UnsupportedVersionException.class, () -> new DeleteAclsRequest.Builder(requestData(PREFIXED_FILTER)).build(V0));
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void shouldThrowOnUnknownElements() {
-        new DeleteAclsRequest(V1, aclFilters(UNKNOWN_FILTER));
+        assertThrows(IllegalArgumentException.class, () -> new DeleteAclsRequest.Builder(requestData(UNKNOWN_FILTER)).build(V1));
     }
 
     @Test
     public void shouldRoundTripLiteralV0() {
-        final DeleteAclsRequest original = new DeleteAclsRequest(V0, aclFilters(LITERAL_FILTER));
+        final DeleteAclsRequest original = new DeleteAclsRequest.Builder(requestData(LITERAL_FILTER)).build(V0);
         final Struct struct = original.toStruct();
 
         final DeleteAclsRequest result = new DeleteAclsRequest(struct, V0);
@@ -71,13 +73,14 @@ public class DeleteAclsRequestTest {
 
     @Test
     public void shouldRoundTripAnyV0AsLiteral() {
-        final DeleteAclsRequest original = new DeleteAclsRequest(V0, aclFilters(ANY_FILTER));
-        final DeleteAclsRequest expected = new DeleteAclsRequest(V0, aclFilters(
+        final DeleteAclsRequest original = new DeleteAclsRequest.Builder(requestData(ANY_FILTER)).build(V0);
+        final DeleteAclsRequest expected = new DeleteAclsRequest.Builder(requestData(
             new AclBindingFilter(new ResourcePatternFilter(
                 ANY_FILTER.patternFilter().resourceType(),
                 ANY_FILTER.patternFilter().name(),
                 PatternType.LITERAL),
-                ANY_FILTER.entryFilter())));
+                ANY_FILTER.entryFilter()))
+        ).build(V0);
 
         final DeleteAclsRequest result = new DeleteAclsRequest(original.toStruct(), V0);
 
@@ -86,7 +89,9 @@ public class DeleteAclsRequestTest {
 
     @Test
     public void shouldRoundTripV1() {
-        final DeleteAclsRequest original = new DeleteAclsRequest(V1, aclFilters(LITERAL_FILTER, PREFIXED_FILTER, ANY_FILTER));
+        final DeleteAclsRequest original = new DeleteAclsRequest.Builder(
+                requestData(LITERAL_FILTER, PREFIXED_FILTER, ANY_FILTER)
+        ).build(V1);
         final Struct struct = original.toStruct();
 
         final DeleteAclsRequest result = new DeleteAclsRequest(struct, V1);
@@ -104,7 +109,9 @@ public class DeleteAclsRequestTest {
         }
     }
 
-    private static List<AclBindingFilter> aclFilters(final AclBindingFilter... acls) {
-        return Arrays.asList(acls);
+    private static DeleteAclsRequestData requestData(AclBindingFilter... acls) {
+        return new DeleteAclsRequestData().setFilters(asList(acls).stream()
+            .map(DeleteAclsRequest::deleteAclsFilter)
+            .collect(Collectors.toList()));
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
index f8bec15..aaebc0c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
@@ -17,101 +17,106 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.acl.AccessControlEntry;
-import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.resource.PatternType;
-import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
+import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
+import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 
 public class DeleteAclsResponseTest {
     private static final short V0 = 0;
     private static final short V1 = 1;
 
-    private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL),
-        new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
-
-    private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", PatternType.LITERAL),
-        new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
-
-    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
-        new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
-
-    private static final AclBinding UNKNOWN_ACL = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "group", PatternType.LITERAL),
-        new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
-
-    private static final AclFilterResponse LITERAL_RESPONSE = new AclFilterResponse(aclDeletions(LITERAL_ACL1, LITERAL_ACL2));
-
-    private static final AclFilterResponse PREFIXED_RESPONSE = new AclFilterResponse(aclDeletions(LITERAL_ACL1, PREFIXED_ACL1));
-
-    private static final AclFilterResponse UNKNOWN_RESPONSE = new AclFilterResponse(aclDeletions(UNKNOWN_ACL));
+    private static final DeleteAclsMatchingAcl LITERAL_ACL1 = new DeleteAclsMatchingAcl()
+        .setResourceType(ResourceType.TOPIC.code())
+        .setResourceName("foo")
+        .setPatternType(PatternType.LITERAL.code())
+        .setPrincipal("User:ANONYMOUS")
+        .setHost("127.0.0.1")
+        .setOperation(AclOperation.READ.code())
+        .setPermissionType(AclPermissionType.DENY.code());
+
+    private static final DeleteAclsMatchingAcl LITERAL_ACL2 = new DeleteAclsMatchingAcl()
+            .setResourceType(ResourceType.GROUP.code())
+            .setResourceName("group")
+            .setPatternType(PatternType.LITERAL.code())
+            .setPrincipal("User:*")
+            .setHost("127.0.0.1")
+            .setOperation(AclOperation.WRITE.code())
+            .setPermissionType(AclPermissionType.ALLOW.code());
+
+    private static final DeleteAclsMatchingAcl PREFIXED_ACL1 = new DeleteAclsMatchingAcl()
+            .setResourceType(ResourceType.GROUP.code())
+            .setResourceName("prefix")
+            .setPatternType(PatternType.PREFIXED.code())
+            .setPrincipal("User:*")
+            .setHost("127.0.0.1")
+            .setOperation(AclOperation.CREATE.code())
+            .setPermissionType(AclPermissionType.ALLOW.code());
+
+    private static final DeleteAclsMatchingAcl UNKNOWN_ACL = new DeleteAclsMatchingAcl()
+            .setResourceType(ResourceType.UNKNOWN.code())
+            .setResourceName("group")
+            .setPatternType(PatternType.LITERAL.code())
+            .setPrincipal("User:*")
+            .setHost("127.0.0.1")
+            .setOperation(AclOperation.WRITE.code())
+            .setPermissionType(AclPermissionType.ALLOW.code());
+
+    private static final DeleteAclsFilterResult LITERAL_RESPONSE = new DeleteAclsFilterResult().setMatchingAcls(asList(
+        LITERAL_ACL1, LITERAL_ACL2));
+
+    private static final DeleteAclsFilterResult PREFIXED_RESPONSE = new DeleteAclsFilterResult().setMatchingAcls(asList(
+        LITERAL_ACL1, PREFIXED_ACL1));
+
+    private static final DeleteAclsFilterResult UNKNOWN_RESPONSE = new DeleteAclsFilterResult().setMatchingAcls(asList(
+            UNKNOWN_ACL));
 
     @Test(expected = UnsupportedVersionException.class)
     public void shouldThrowOnV0IfNotLiteral() {
-        new DeleteAclsResponse(10, aclResponses(PREFIXED_RESPONSE)).toStruct(V0);
+        new DeleteAclsResponse(new DeleteAclsResponseData()
+            .setThrottleTimeMs(10)
+            .setFilterResults(singletonList(PREFIXED_RESPONSE))
+        ).toStruct(V0);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowOnIfUnknown() {
-        new DeleteAclsResponse(10, aclResponses(UNKNOWN_RESPONSE)).toStruct(V1);
+        new DeleteAclsResponse(new DeleteAclsResponseData()
+            .setThrottleTimeMs(10)
+            .setFilterResults(singletonList(UNKNOWN_RESPONSE))
+        ).toStruct(V1);
     }
 
     @Test
     public void shouldRoundTripV0() {
-        final DeleteAclsResponse original = new DeleteAclsResponse(10, aclResponses(LITERAL_RESPONSE));
+        final DeleteAclsResponse original = new DeleteAclsResponse(new DeleteAclsResponseData()
+            .setThrottleTimeMs(10)
+            .setFilterResults(singletonList(LITERAL_RESPONSE)));
         final Struct struct = original.toStruct(V0);
 
-        final DeleteAclsResponse result = new DeleteAclsResponse(struct);
-
-        assertResponseEquals(original, result);
+        final DeleteAclsResponse result = new DeleteAclsResponse(struct, V0);
+        assertEquals(original.filterResults(), result.filterResults());
     }
 
     @Test
     public void shouldRoundTripV1() {
-        final DeleteAclsResponse original = new DeleteAclsResponse(100, aclResponses(LITERAL_RESPONSE, PREFIXED_RESPONSE));
+        final DeleteAclsResponse original = new DeleteAclsResponse(new DeleteAclsResponseData()
+            .setThrottleTimeMs(10)
+            .setFilterResults(asList(LITERAL_RESPONSE, PREFIXED_RESPONSE)));
         final Struct struct = original.toStruct(V1);
 
-        final DeleteAclsResponse result = new DeleteAclsResponse(struct);
-
-        assertResponseEquals(original, result);
+        final DeleteAclsResponse result = new DeleteAclsResponse(struct, V1);
+        assertEquals(original.filterResults(), result.filterResults());
     }
 
-    private static void assertResponseEquals(final DeleteAclsResponse original, final DeleteAclsResponse actual) {
-        assertEquals("Number of responses wrong", original.responses().size(), actual.responses().size());
-
-        for (int idx = 0; idx != original.responses().size(); ++idx) {
-            final List<AclBinding> originalBindings = original.responses().get(idx).deletions().stream()
-                .map(AclDeletionResult::acl)
-                .collect(Collectors.toList());
-
-            final List<AclBinding> actualBindings = actual.responses().get(idx).deletions().stream()
-                .map(AclDeletionResult::acl)
-                .collect(Collectors.toList());
-
-            assertEquals(originalBindings, actualBindings);
-        }
-    }
-
-    private static List<AclFilterResponse> aclResponses(final AclFilterResponse... responses) {
-        return Arrays.asList(responses);
-    }
-
-    private static Collection<AclDeletionResult> aclDeletions(final AclBinding... acls) {
-        return Arrays.stream(acls)
-            .map(AclDeletionResult::new)
-            .collect(Collectors.toList());
-    }
-}
\ No newline at end of file
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
index 8be1658..e9203ae 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 public class DescribeAclsRequestTest {
     private static final short V0 = 0;
@@ -46,14 +47,14 @@ public class DescribeAclsRequestTest {
     private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "foo", PatternType.LITERAL),
         new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
-    @Test(expected = UnsupportedVersionException.class)
+    @Test
     public void shouldThrowOnV0IfPrefixed() {
-        new DescribeAclsRequest.Builder(PREFIXED_FILTER).build(V0);
+        assertThrows(UnsupportedVersionException.class, () -> new DescribeAclsRequest.Builder(PREFIXED_FILTER).build(V0));
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void shouldThrowIfUnknown() {
-        new DescribeAclsRequest.Builder(UNKNOWN_FILTER).build(V0);
+        assertThrows(IllegalArgumentException.class, () -> new DescribeAclsRequest.Builder(UNKNOWN_FILTER).build(V0));
     }
 
     @Test
@@ -117,4 +118,4 @@ public class DescribeAclsRequestTest {
         final AclBindingFilter acttualFilter = actual.filter();
         assertEquals(originalFilter, acttualFilter);
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
index ec07bc3..77ecd18 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
@@ -99,7 +99,8 @@ public class DescribeAclsResponseTest {
         final DescribeAclsResponse result = new DescribeAclsResponse(struct, V0);
         assertResponseEquals(original, result);
 
-        final DescribeAclsResponse result2 = DescribeAclsResponse.prepareResponse(10, ApiError.NONE, DescribeAclsResponse.aclBindings(resources));
+        final DescribeAclsResponse result2 = buildResponse(10, Errors.NONE, DescribeAclsResponse.aclsResources(
+            DescribeAclsResponse.aclBindings(resources)));
         assertResponseEquals(original, result2);
     }
 
@@ -112,7 +113,8 @@ public class DescribeAclsResponseTest {
         final DescribeAclsResponse result = new DescribeAclsResponse(struct, V1);
         assertResponseEquals(original, result);
 
-        final DescribeAclsResponse result2 = DescribeAclsResponse.prepareResponse(100, ApiError.NONE, DescribeAclsResponse.aclBindings(resources));
+        final DescribeAclsResponse result2 = buildResponse(100, Errors.NONE, DescribeAclsResponse.aclsResources(
+            DescribeAclsResponse.aclBindings(resources)));
         assertResponseEquals(original, result2);
     }
 
@@ -156,4 +158,4 @@ public class DescribeAclsResponseTest {
             .setOperation(operation.code())
             .setPermissionType(permission.code());
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7943101..2304fcf 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -43,21 +43,25 @@ import org.apache.kafka.common.message.ControlledShutdownRequestData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
+import org.apache.kafka.common.message.CreateAclsRequestData;
+import org.apache.kafka.common.message.CreateAclsResponseData;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
 import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreatePartitionsRequestData;
-import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
+import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
 import org.apache.kafka.common.message.CreatePartitionsResponseData;
 import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsRequestData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
@@ -133,11 +137,7 @@ import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.SimpleRecord;
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest.Builder;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
@@ -1780,44 +1780,72 @@ public class RequestResponseTest {
     }
 
     private CreateAclsRequest createCreateAclsRequest() {
-        List<AclCreation> creations = new ArrayList<>();
-        creations.add(new AclCreation(new AclBinding(
+        List<CreateAclsRequestData.AclCreation> creations = new ArrayList<>();
+        creations.add(CreateAclsRequest.aclCreation(new AclBinding(
             new ResourcePattern(ResourceType.TOPIC, "mytopic", PatternType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.ALLOW))));
-        creations.add(new AclCreation(new AclBinding(
+        creations.add(CreateAclsRequest.aclCreation(new AclBinding(
             new ResourcePattern(ResourceType.GROUP, "mygroup", PatternType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))));
-        return new CreateAclsRequest.Builder(creations).build();
+        CreateAclsRequestData data = new CreateAclsRequestData().setCreations(creations);
+        return new CreateAclsRequest.Builder(data).build();
     }
 
     private CreateAclsResponse createCreateAclsResponse() {
-        return new CreateAclsResponse(0, Arrays.asList(new AclCreationResponse(ApiError.NONE),
-            new AclCreationResponse(new ApiError(Errors.INVALID_REQUEST, "Foo bar"))));
+        return new CreateAclsResponse(new CreateAclsResponseData().setResults(asList(
+            new CreateAclsResponseData.AclCreationResult(),
+            new CreateAclsResponseData.AclCreationResult()
+                .setErrorCode(Errors.INVALID_REQUEST.code())
+                .setErrorMessage("Foo bar"))));
     }
 
     private DeleteAclsRequest createDeleteAclsRequest() {
-        List<AclBindingFilter> filters = new ArrayList<>();
-        filters.add(new AclBindingFilter(
-            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.LITERAL),
-            new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)));
-        filters.add(new AclBindingFilter(
-            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.LITERAL),
-            new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY)));
-        return new DeleteAclsRequest.Builder(filters).build();
+        DeleteAclsRequestData data = new DeleteAclsRequestData().setFilters(asList(
+            new DeleteAclsRequestData.DeleteAclsFilter()
+                .setResourceTypeFilter(ResourceType.ANY.code())
+                .setResourceNameFilter(null)
+                .setPatternTypeFilter(PatternType.LITERAL.code())
+                .setPrincipalFilter("User:ANONYMOUS")
+                .setHostFilter(null)
+                .setOperation(AclOperation.ANY.code())
+                .setPermissionType(AclPermissionType.ANY.code()),
+            new DeleteAclsRequestData.DeleteAclsFilter()
+                .setResourceTypeFilter(ResourceType.ANY.code())
+                .setResourceNameFilter(null)
+                .setPatternTypeFilter(PatternType.LITERAL.code())
+                .setPrincipalFilter("User:bob")
+                .setHostFilter(null)
+                .setOperation(AclOperation.ANY.code())
+                .setPermissionType(AclPermissionType.ANY.code())
+        ));
+        return new DeleteAclsRequest.Builder(data).build();
     }
 
     private DeleteAclsResponse createDeleteAclsResponse() {
-        List<AclFilterResponse> responses = new ArrayList<>();
-        responses.add(new AclFilterResponse(Utils.mkSet(
-                new AclDeletionResult(new AclBinding(
-                        new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
-                        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))),
-                new AclDeletionResult(new AclBinding(
-                        new ResourcePattern(ResourceType.TOPIC, "mytopic4", PatternType.LITERAL),
-                        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY))))));
-        responses.add(new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"),
-            Collections.<AclDeletionResult>emptySet()));
-        return new DeleteAclsResponse(0, responses);
+        List<DeleteAclsResponseData.DeleteAclsFilterResult> filterResults = new ArrayList<>();
+        filterResults.add(new DeleteAclsResponseData.DeleteAclsFilterResult().setMatchingAcls(asList(
+                new DeleteAclsResponseData.DeleteAclsMatchingAcl()
+                    .setResourceType(ResourceType.TOPIC.code())
+                    .setResourceName("mytopic3")
+                    .setPatternType(PatternType.LITERAL.code())
+                    .setPrincipal("User:ANONYMOUS")
+                    .setHost("*")
+                    .setOperation(AclOperation.DESCRIBE.code())
+                    .setPermissionType(AclPermissionType.ALLOW.code()),
+                new DeleteAclsResponseData.DeleteAclsMatchingAcl()
+                    .setResourceType(ResourceType.TOPIC.code())
+                    .setResourceName("mytopic4")
+                    .setPatternType(PatternType.LITERAL.code())
+                    .setPrincipal("User:ANONYMOUS")
+                    .setHost("*")
+                    .setOperation(AclOperation.DESCRIBE.code())
+                    .setPermissionType(AclPermissionType.DENY.code()))));
+        filterResults.add(new DeleteAclsResponseData.DeleteAclsFilterResult()
+            .setErrorCode(Errors.SECURITY_DISABLED.code())
+            .setErrorMessage("No security"));
+        return new DeleteAclsResponse(new DeleteAclsResponseData()
+            .setThrottleTimeMs(0)
+            .setFilterResults(filterResults));
     }
 
     private DescribeConfigsRequest createDescribeConfigsRequest(int version) {
diff --git a/core/src/main/scala/kafka/server/DelayedFuture.scala b/core/src/main/scala/kafka/server/DelayedFuture.scala
index 823a851..cf522ab 100644
--- a/core/src/main/scala/kafka/server/DelayedFuture.scala
+++ b/core/src/main/scala/kafka/server/DelayedFuture.scala
@@ -30,7 +30,7 @@ import scala.collection.Seq
   * in a DelayedFuturePurgatory purgatory. This is used for ACL updates using async Authorizers.
   */
 class DelayedFuture[T](timeoutMs: Long,
-                       futures: List[CompletableFuture[T]],
+                       futures: Seq[CompletableFuture[T]],
                        responseCallback: () => Unit)
   extends DelayedOperation(timeoutMs) {
 
@@ -79,7 +79,7 @@ class DelayedFuturePurgatory(purgatoryName: String, brokerId: Int) {
   val purgatoryKey = new Object
 
   def tryCompleteElseWatch[T](timeoutMs: Long,
-                              futures: List[CompletableFuture[T]],
+                              futures: Seq[CompletableFuture[T]],
                               responseCallback: () => Unit): DelayedFuture[T] = {
     val delayedFuture = new DelayedFuture[T](timeoutMs, futures, responseCallback)
     val done = purgatory.tryCompleteElseWatch(delayedFuture, Seq(purgatoryKey))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e817ea..e3adb04 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,11 +49,12 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteGroupsResponseData, DeleteTopicsResponseData, DescribeGroupsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, Offs [...]
+import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
+import org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseD [...]
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
 import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
-import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
+import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult
@@ -64,8 +65,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
-import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -81,6 +80,7 @@ import org.apache.kafka.server.authorizer._
 
 import scala.compat.java8.OptionConverters._
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.util.{Failure, Success, Try}
 
@@ -2210,14 +2210,18 @@ class KafkaApis(val requestChannel: RequestChannel,
     authorizer match {
       case None =>
         sendResponseMaybeThrottle(request, requestThrottleMs =>
-          DescribeAclsResponse.prepareResponse(requestThrottleMs,
-            new ApiError(Errors.SECURITY_DISABLED, "No Authorizer is configured on the broker"), util.Collections.emptySet()))
+          new DescribeAclsResponse(new DescribeAclsResponseData()
+            .setErrorCode(Errors.SECURITY_DISABLED.code)
+            .setErrorMessage("No Authorizer is configured on the broker")
+            .setThrottleTimeMs(requestThrottleMs)))
       case Some(auth) =>
         val filter = describeAclsRequest.filter
         val returnedAcls = new util.HashSet[AclBinding]()
         auth.acls(filter).forEach(returnedAcls.add)
         sendResponseMaybeThrottle(request, requestThrottleMs =>
-          DescribeAclsResponse.prepareResponse(requestThrottleMs, ApiError.NONE, returnedAcls))
+          new DescribeAclsResponse(new DescribeAclsResponseData()
+            .setThrottleTimeMs(requestThrottleMs)
+            .setResources(DescribeAclsResponse.aclsResources(returnedAcls))))
     }
   }
 
@@ -2226,40 +2230,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     val createAclsRequest = request.body[CreateAclsRequest]
 
     authorizer match {
-      case None =>
-        sendResponseMaybeThrottle(request, requestThrottleMs =>
-          createAclsRequest.getErrorResponse(requestThrottleMs,
-            new SecurityDisabledException("No Authorizer is configured on the broker.")))
+      case None => sendResponseMaybeThrottle(request, requestThrottleMs =>
+        createAclsRequest.getErrorResponse(requestThrottleMs,
+          new SecurityDisabledException("No Authorizer is configured on the broker.")))
       case Some(auth) =>
-
+        val allBindings = createAclsRequest.aclCreations.asScala.map(CreateAclsRequest.aclBinding)
         val errorResults = mutable.Map[AclBinding, AclCreateResult]()
-        val aclBindings = createAclsRequest.aclCreations.asScala.map(_.acl)
-        val validBindings = aclBindings
-          .filter { acl =>
-            val resource = acl.pattern
-            val throwable = if (resource.resourceType == ResourceType.CLUSTER && !AuthorizerUtils.isClusterResource(resource.name))
-                new InvalidRequestException("The only valid name for the CLUSTER resource is " + CLUSTER_NAME)
-            else if (resource.name.isEmpty)
-              new InvalidRequestException("Invalid empty resource name")
-            else
-              null
-            if (throwable != null) {
-              debug(s"Failed to add acl $acl to $resource", throwable)
-              errorResults(acl) = new AclCreateResult(throwable)
-              false
-            } else
-              true
-          }
+        val validBindings = new ArrayBuffer[AclBinding]
+        allBindings.foreach { acl =>
+          val resource = acl.pattern
+          val throwable = if (resource.resourceType == ResourceType.CLUSTER && !AuthorizerUtils.isClusterResource(resource.name))
+              new InvalidRequestException("The only valid name for the CLUSTER resource is " + CLUSTER_NAME)
+          else if (resource.name.isEmpty)
+            new InvalidRequestException("Invalid empty resource name")
+          else
+            null
+          if (throwable != null) {
+            debug(s"Failed to add acl $acl to $resource", throwable)
+            errorResults(acl) = new AclCreateResult(throwable)
+          } else
+            validBindings += acl
+        }
+
+        val createResults = auth.createAcls(request.context, validBindings.asJava).asScala.map(_.toCompletableFuture)
 
-        val createResults = auth.createAcls(request.context, validBindings.asJava)
-          .asScala.map(_.toCompletableFuture).toList
         def sendResponseCallback(): Unit = {
-          val aclCreationResults = aclBindings.map { acl =>
+          val aclCreationResults = allBindings.map { acl =>
             val result = errorResults.getOrElse(acl, createResults(validBindings.indexOf(acl)).get)
-            new AclCreationResponse(result.exception.asScala.map(ApiError.fromThrowable).getOrElse(ApiError.NONE))
+            val creationResult = new AclCreationResult()
+            result.exception.asScala.foreach { throwable =>
+              val apiError = ApiError.fromThrowable(throwable)
+              creationResult
+                .setErrorCode(apiError.error.code)
+                .setErrorMessage(apiError.message)
+            }
+            creationResult
           }
           sendResponseMaybeThrottle(request, requestThrottleMs =>
-            new CreateAclsResponse(requestThrottleMs, aclCreationResults.asJava))
+            new CreateAclsResponse(new CreateAclsResponseData()
+              .setThrottleTimeMs(requestThrottleMs)
+              .setResults(aclCreationResults.asJava)))
         }
 
         alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, createResults, sendResponseCallback)
@@ -2279,19 +2289,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         val deleteResults = auth.deleteAcls(request.context, deleteAclsRequest.filters)
           .asScala.map(_.toCompletableFuture).toList
 
-        def toErrorCode(exception: Optional[ApiException]): ApiError = {
-          exception.asScala.map(ApiError.fromThrowable).getOrElse(ApiError.NONE)
-        }
-
         def sendResponseCallback(): Unit = {
-          val filterResponses = deleteResults.map(_.get).map { result =>
-            val deletions = result.aclBindingDeleteResults().asScala.toList.map { deletionResult =>
-              new AclDeletionResult(toErrorCode(deletionResult.exception), deletionResult.aclBinding)
-            }.asJava
-            new AclFilterResponse(toErrorCode(result.exception), deletions)
-          }.asJava
+          val filterResults = deleteResults.map(_.get).map(DeleteAclsResponse.filterResult).asJava
           sendResponseMaybeThrottle(request, requestThrottleMs =>
-            new DeleteAclsResponse(requestThrottleMs, filterResponses))
+            new DeleteAclsResponse(new DeleteAclsResponseData()
+              .setThrottleTimeMs(requestThrottleMs)
+              .setFilterResults(filterResults)))
         }
         alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, deleteResults, sendResponseCallback)
     }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a6d0887..cf570ca 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -29,8 +29,8 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation}
+import org.apache.kafka.common.acl.AclPermissionType.ALLOW
+import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
@@ -41,15 +41,14 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
-import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, ControlledShutdownRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteGroupsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData}
+import org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, OffsetCommitRequestData, SyncGroupRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, Records, SimpleRecord}
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.resource.ResourceType._
-import org.apache.kafka.common.resource.{Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException, Node, TopicPartition, requests}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
@@ -166,9 +165,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error),
     ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
     ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)),
-    ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => resp.aclCreationResponses.asScala.head.error.error),
+    ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => Errors.forCode(resp.results.asScala.head.errorCode)),
     ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error),
-    ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error),
+    ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => Errors.forCode(resp.filterResults.asScala.head.errorCode)),
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => resp.responses.get(tp)),
     ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
       if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
@@ -470,15 +469,29 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build()
 
-  private def createAclsRequest = new CreateAclsRequest.Builder(
-    Collections.singletonList(new AclCreation(new AclBinding(
-      new ResourcePattern(ResourceType.TOPIC, "mytopic", LITERAL),
-      new AccessControlEntry(userPrincipalStr, "*", AclOperation.WRITE, DENY))))).build()
+  private def createAclsRequest: CreateAclsRequest = new CreateAclsRequest.Builder(
+    new CreateAclsRequestData().setCreations(Collections.singletonList(
+      new CreateAclsRequestData.AclCreation()
+        .setResourceType(ResourceType.TOPIC.code)
+        .setResourceName("mytopic")
+        .setResourcePatternType(PatternType.LITERAL.code)
+        .setPrincipal(userPrincipalStr)
+        .setHost("*")
+        .setOperation(AclOperation.WRITE.code)
+        .setPermissionType(AclPermissionType.DENY.code)))
+  ).build()
 
-  private def deleteAclsRequest = new DeleteAclsRequest.Builder(
-    Collections.singletonList(new AclBindingFilter(
-      new ResourcePatternFilter(ResourceType.TOPIC, null, LITERAL),
-      new AccessControlEntryFilter(userPrincipalStr, "*", AclOperation.ANY, DENY)))).build()
+  private def deleteAclsRequest: DeleteAclsRequest = new DeleteAclsRequest.Builder(
+    new DeleteAclsRequestData().setFilters(Collections.singletonList(
+      new DeleteAclsRequestData.DeleteAclsFilter()
+        .setResourceTypeFilter(ResourceType.TOPIC.code)
+        .setResourceNameFilter(null)
+        .setPatternTypeFilter(PatternType.LITERAL.code)
+        .setPrincipalFilter(userPrincipalStr)
+        .setHostFilter("*")
+        .setOperation(AclOperation.ANY.code)
+        .setPermissionType(AclPermissionType.DENY.code)))
+  ).build()
 
   private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 8bae11e..8bc13ec 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -35,9 +35,8 @@ import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResourceType}
 import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
 import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
 import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
@@ -414,15 +413,25 @@ class RequestQuotaTest extends BaseRequestTest {
           new DescribeAclsRequest.Builder(AclBindingFilter.ANY)
 
         case ApiKeys.CREATE_ACLS =>
-          new CreateAclsRequest.Builder(Collections.singletonList(new AclCreation(new AclBinding(
-            new ResourcePattern(AdminResourceType.TOPIC, "mytopic", PatternType.LITERAL),
-            new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY)))))
-
+          new CreateAclsRequest.Builder(new CreateAclsRequestData().setCreations(Collections.singletonList(
+            new CreateAclsRequestData.AclCreation()
+              .setResourceType(AdminResourceType.TOPIC.code)
+              .setResourceName("mytopic")
+              .setResourcePatternType(PatternType.LITERAL.code)
+              .setPrincipal("User:ANONYMOUS")
+              .setHost("*")
+              .setOperation(AclOperation.WRITE.code)
+              .setPermissionType(AclPermissionType.DENY.code))))
         case ApiKeys.DELETE_ACLS =>
-          new DeleteAclsRequest.Builder(Collections.singletonList(new AclBindingFilter(
-            new ResourcePatternFilter(AdminResourceType.TOPIC, null, PatternType.LITERAL),
-            new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY))))
-
+          new DeleteAclsRequest.Builder(new DeleteAclsRequestData().setFilters(Collections.singletonList(
+            new DeleteAclsRequestData.DeleteAclsFilter()
+              .setResourceTypeFilter(AdminResourceType.TOPIC.code)
+              .setResourceNameFilter(null)
+              .setPatternTypeFilter(PatternType.LITERAL.code)
+              .setPrincipalFilter("User:ANONYMOUS")
+              .setHostFilter("*")
+              .setOperation(AclOperation.ANY.code)
+              .setPermissionType(AclPermissionType.DENY.code))))
         case ApiKeys.DESCRIBE_CONFIGS =>
           new DescribeConfigsRequest.Builder(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)))
 


Mime
View raw message