kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KIP-546: Implement describeClientQuotas and alterClientQuotas. (#8083)
Date Sun, 15 Mar 2020 06:03:52 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 227a732  KIP-546: Implement describeClientQuotas and alterClientQuotas. (#8083)
227a732 is described below

commit 227a7322b77840e08924b9486e4bda2f3dfc1f1a
Author: Brian Byrne <bbyrne@confluent.io>
AuthorDate: Sat Mar 14 23:03:13 2020 -0700

    KIP-546: Implement describeClientQuotas and alterClientQuotas. (#8083)
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>
---
 build.gradle                                       |   2 +-
 checkstyle/import-control.xml                      |   7 +-
 checkstyle/suppressions.xml                        |   2 +-
 .../java/org/apache/kafka/clients/admin/Admin.java |  82 +++-
 .../clients/admin/AlterClientQuotasOptions.java    |  46 +++
 .../clients/admin/AlterClientQuotasResult.java     |  58 +++
 .../clients/admin/DescribeClientQuotasOptions.java |  29 ++
 .../clients/admin/DescribeClientQuotasResult.java  |  52 +++
 .../kafka/clients/admin/KafkaAdminClient.java      |  68 +++-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   8 +-
 .../kafka/common/quota/ClientQuotaAlteration.java  | 106 +++++
 .../kafka/common/quota/ClientQuotaEntity.java      |  70 ++++
 .../kafka/common/quota/ClientQuotaFilter.java      | 101 +++++
 .../common/quota/ClientQuotaFilterComponent.java   | 109 +++++
 .../kafka/common/requests/AbstractRequest.java     |   4 +
 .../kafka/common/requests/AbstractResponse.java    |   4 +
 .../common/requests/AlterClientQuotasRequest.java  | 133 +++++++
 .../common/requests/AlterClientQuotasResponse.java | 124 ++++++
 .../requests/DescribeClientQuotasRequest.java      | 121 ++++++
 .../requests/DescribeClientQuotasResponse.java     | 119 ++++++
 .../common/message/AlterClientQuotasRequest.json   |  45 +++
 .../common/message/AlterClientQuotasResponse.json  |  40 ++
 .../message/DescribeClientQuotasRequest.json       |  35 ++
 .../message/DescribeClientQuotasResponse.json      |  47 +++
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  95 +++++
 .../kafka/clients/admin/MockAdminClient.java       |  12 +
 .../src/main/scala/kafka/admin/ConfigCommand.scala | 131 ++++--
 .../src/main/scala/kafka/server/AdminManager.scala | 199 ++++++++-
 .../main/scala/kafka/server/DynamicConfig.scala    |   4 +
 core/src/main/scala/kafka/server/KafkaApis.scala   |  30 ++
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 144 ++++++-
 .../kafka/server/ClientQuotasRequestTest.scala     | 443 +++++++++++++++++++++
 32 files changed, 2416 insertions(+), 54 deletions(-)

diff --git a/build.gradle b/build.gradle
index 975d56b..05fc20f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -457,7 +457,7 @@ subprojects {
     // See https://www.lightbend.com/blog/scala-inliner-optimizer for more information about the optimizer.
     scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
     scalaCompileOptions.additionalParameters += inlineFrom
-    
+
     // these options are valid for Scala versions < 2.13 only
     // Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
     if (versions.baseScala == '2.12') {
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e3975a3..c4a7662 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -145,6 +145,7 @@
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.message" />
       <allow pkg="org.apache.kafka.common.network" />
+      <allow pkg="org.apache.kafka.common.quota" />
       <allow pkg="org.apache.kafka.common.requests" />
       <allow pkg="org.apache.kafka.common.resource" />
       <allow pkg="org.apache.kafka.common.record" />
@@ -162,6 +163,10 @@
     <subpackage name="utils">
       <allow pkg="org.apache.kafka.common" />
     </subpackage>
+
+    <subpackage name="quotas">
+      <allow pkg="org.apache.kafka.common" />
+    </subpackage>
   </subpackage>
 
   <subpackage name="clients">
@@ -241,7 +246,7 @@
     <subpackage name="perf">
       <allow pkg="com.fasterxml.jackson.databind" />
     </subpackage>
-    
+
     <subpackage name="integration">
       <allow pkg="kafka.admin" />
       <allow pkg="kafka.api" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f533179..aa3a086 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -62,7 +62,7 @@
               files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager).java"/>
 
     <suppress checks="JavaNCSS"
-              files="(AbstractRequest|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>
+              files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>
 
     <suppress checks="NPathComplexity"
               files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index fd70e97..b99fd5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 
 /**
@@ -408,7 +410,6 @@ public interface Admin extends AutoCloseable {
         return incrementalAlterConfigs(configs, new AlterConfigsOptions());
     }
 
-
     /**
      * Incrementally update the configuration for the specified resources.
      * <p>
@@ -1133,6 +1134,85 @@ public interface Admin extends AutoCloseable {
     ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
 
     /**
+     * Describes all entities matching the provided filter that have at least one client quota configuration
+     * value defined.
+     * <p>
+     * This is a convenience method for {@link #describeClientQuotas(ClientQuotaFilter, DescribeClientQuotasOptions)}
+     * with default options. See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 2.6.0 or higher.
+     *
+     * @param filter the filter to apply to match entities
+     * @return the DescribeClientQuotasResult containing the result
+     */
+    default DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter) {
+        return describeClientQuotas(filter, new DescribeClientQuotasOptions());
+    }
+
+    /**
+     * Describes all entities matching the provided filter that have at least one client quota configuration
+     * value defined.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeClientQuotasResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have describe access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., an invalid entity type was specified.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe could finish.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.6.0 or higher.
+     *
+     * @param filter the filter to apply to match entities
+     * @param options the options to use
+     * @return the DescribeClientQuotasResult containing the result
+     */
+    DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options);
+
+    /**
+     * Alters client quota configurations with the specified alterations.
+     * <p>
+     * This is a convenience method for {@link #alterClientQuotas(Collection, AlterClientQuotasOptions)}
+     * with default options. See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 2.6.0 or higher.
+     *
+     * @param entries the alterations to perform
+     * @return the AlterClientQuotasResult containing the result
+     */
+    default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries) {
+        return alterClientQuotas(entries, new AlterClientQuotasOptions());
+    }
+
+    /**
+     * Alters client quota configurations with the specified alterations.
+     * <p>
+     * Alterations for a single entity are atomic, but across entities is not guaranteed. The resulting
+     * per-entity error code should be evaluated to resolve the success or failure of all updates.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@link AlterClientQuotasResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a configuration key was specified more than once for an entity.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the alterations could finish. It cannot be guaranteed whether the update
+     *   succeed or not.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.6.0 or higher.
+     *
+     * @param entries the alterations to perform
+     * @return the AlterClientQuotasResult containing the result
+     */
+    AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
+
+    /**
      * Get the metrics kept by the adminClient
      */
     Map<MetricName, ? extends Metric> metrics();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasOptions.java
new file mode 100644
index 0000000..3cdaa97
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)}.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> {
+
+    private boolean validateOnly = false;
+
+    /**
+     * Returns whether the request should be validated without altering the configs.
+     */
+    public boolean validateOnly() {
+        return this.validateOnly;
+    }
+
+    /**
+     * Sets whether the request should be validated without altering the configs.
+     */
+    public AlterClientQuotasOptions validateOnly(boolean validateOnly) {
+        this.validateOnly = validateOnly;
+        return this;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasResult.java
new file mode 100644
index 0000000..63c6b3e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasResult.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.Map;
+
+/**
+ * The result of the {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterClientQuotasResult {
+
+    private final Map<ClientQuotaEntity, KafkaFuture<Void>> futures;
+
+    /**
+     * Maps an entity to its alteration result.
+     *
+     * @param futures maps entity to its alteration result
+     */
+    public AlterClientQuotasResult(Map<ClientQuotaEntity, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Returns a map from quota entity to a future which can be used to check the status of the operation.
+     */
+    public Map<ClientQuotaEntity, KafkaFuture<Void>> values() {
+        return futures;
+    }
+
+    /**
+     * Returns a future which succeeds only if all quota alterations succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasOptions.java
new file mode 100644
index 0000000..14e7e45
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link Admin#describeClientQuotas(ClientQuotaFilter, DescribeClientQuotasOptions)}.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> {
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasResult.java
new file mode 100644
index 0000000..b485590
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasResult.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.Map;
+
+/**
+ * The result of the {@link Admin#describeClientQuotas(ClientQuotaFilter, DescribeClientQuotasOptions)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeClientQuotasResult {
+
+    private final KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities;
+
+    /**
+     * Maps an entity to its configured quota value(s). Note if no value is defined for a quota
+     * type for that entity's config, then it is not included in the resulting value map.
+     *
+     * @param entities future for the collection of entities that matched the filter
+     */
+    public DescribeClientQuotasResult(KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities) {
+        this.entities = entities;
+    }
+
+    /**
+     * Returns a map from quota entity to a future which can be used to check the status of the operation.
+     */
+    public KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities() {
+        return entities;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index dbe3d1a..e654e73 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -129,8 +129,13 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AlterClientQuotasRequest;
+import org.apache.kafka.common.requests.AlterClientQuotasResponse;
 import org.apache.kafka.common.requests.AlterConfigsRequest;
 import org.apache.kafka.common.requests.AlterConfigsResponse;
 import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
@@ -156,6 +161,8 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest;
 import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsRequest;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.DescribeClientQuotasRequest;
+import org.apache.kafka.common.requests.DescribeClientQuotasResponse;
 import org.apache.kafka.common.requests.DescribeConfigsRequest;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
@@ -3748,7 +3755,7 @@ public class KafkaAdminClient extends AdminClient {
         MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> context =
                 new MetadataOperationContext<>(topics, options, deadline, futures);
 
-        Call metadataCall = getMetadataCall(context, 
+        Call metadataCall = getMetadataCall(context,
             () -> KafkaAdminClient.this.getListOffsetsCalls(context, topicPartitionOffsets, futures));
         runnable.call(metadataCall, nowMetadata);
 
@@ -3845,6 +3852,65 @@ public class KafkaAdminClient extends AdminClient {
         return calls;
     }
 
+    @Override
+    public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) {
+        KafkaFutureImpl<Map<ClientQuotaEntity, Map<String, Double>>> future = new KafkaFutureImpl<>();
+
+        final long now = time.milliseconds();
+        runnable.call(new Call("describeClientQuotas", calcDeadlineMs(now, options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+                @Override
+                DescribeClientQuotasRequest.Builder createRequest(int timeoutMs) {
+                    return new DescribeClientQuotasRequest.Builder(filter);
+                }
+
+                @Override
+                void handleResponse(AbstractResponse abstractResponse) {
+                    DescribeClientQuotasResponse response = (DescribeClientQuotasResponse) abstractResponse;
+                    response.complete(future);
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    future.completeExceptionally(throwable);
+                }
+            }, now);
+
+        return new DescribeClientQuotasResult(future);
+    }
+
+    @Override
+    public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options) {
+        Map<ClientQuotaEntity, KafkaFutureImpl<Void>> futures = new HashMap<>(entries.size());
+        for (ClientQuotaAlteration entry : entries) {
+            futures.put(entry.entity(), new KafkaFutureImpl<>());
+        }
+
+        final long now = time.milliseconds();
+        runnable.call(new Call("alterClientQuotas", calcDeadlineMs(now, options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+                @Override
+                AlterClientQuotasRequest.Builder createRequest(int timeoutMs) {
+                    return new AlterClientQuotasRequest.Builder(entries, options.validateOnly());
+                }
+
+                @Override
+                void handleResponse(AbstractResponse abstractResponse) {
+                    AlterClientQuotasResponse response = (AlterClientQuotasResponse) abstractResponse;
+                    response.complete(futures);
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    completeAllExceptionally(futures.values(), throwable);
+                }
+            }, now);
+
+        return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
+    }
+
     /**
      * Get a sub level error when the request is in batch. If given key was not found,
      * return an {@link IllegalArgumentException}.
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 4fc8287..6549d8f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.protocol;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.message.AlterClientQuotasRequestData;
+import org.apache.kafka.common.message.AlterClientQuotasResponseData;
 import org.apache.kafka.common.message.ControlledShutdownRequestData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.CreateAclsRequestData;
@@ -39,6 +41,8 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DescribeAclsRequestData;
 import org.apache.kafka.common.message.DescribeAclsResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
 import org.apache.kafka.common.message.DescribeDelegationTokenRequestData;
 import org.apache.kafka.common.message.DescribeDelegationTokenResponseData;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
@@ -205,7 +209,9 @@ public enum ApiKeys {
                                   AlterPartitionReassignmentsResponseData.SCHEMAS),
     LIST_PARTITION_REASSIGNMENTS(46, "ListPartitionReassignments", ListPartitionReassignmentsRequestData.SCHEMAS,
                                  ListPartitionReassignmentsResponseData.SCHEMAS),
-    OFFSET_DELETE(47, "OffsetDelete", OffsetDeleteRequestData.SCHEMAS, OffsetDeleteResponseData.SCHEMAS);
+    OFFSET_DELETE(47, "OffsetDelete", OffsetDeleteRequestData.SCHEMAS, OffsetDeleteResponseData.SCHEMAS),
+    DESCRIBE_CLIENT_QUOTAS(48, "DescribeClientQuotas", DescribeClientQuotasRequestData.SCHEMAS, DescribeClientQuotasResponseData.SCHEMAS),
+    ALTER_CLIENT_QUOTAS(49, "AlterClientQuotas", AlterClientQuotasRequestData.SCHEMAS, AlterClientQuotasResponseData.SCHEMAS);
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaAlteration.java b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaAlteration.java
new file mode 100644
index 0000000..88670ce
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaAlteration.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.quota;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Describes a configuration alteration to be made to a client quota entity.
+ */
+public class ClientQuotaAlteration {
+
+    public static class Op {
+        private final String key;
+        private final Double value;
+
+        /**
+         * @param key the quota type to alter
+         * @param value if set then the existing value is updated,
+         *              otherwise if null, the existing value is cleared
+         */
+        public Op(String key, Double value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        /**
+         * @return the quota type to alter
+         */
+        public String key() {
+            return this.key;
+        }
+
+        /**
+         * @return if set then the existing value is updated,
+         *         otherwise if null, the existing value is cleared
+         */
+        public Double value() {
+            return this.value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Op that = (Op) o;
+            return Objects.equals(key, that.key) && Objects.equals(value, that.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value);
+        }
+
+        @Override
+        public String toString() {
+            return "ClientQuotaAlteration.Op(key=" + key + ", value=" + value + ")";
+        }
+    }
+
+    private final ClientQuotaEntity entity;
+    private final Collection<Op> ops;
+
+    /**
+     * @param entity the entity whose config will be modified
+     * @param ops the alteration to perform
+     */
+    public ClientQuotaAlteration(ClientQuotaEntity entity, Collection<Op> ops) {
+        this.entity = entity;
+        this.ops = ops;
+    }
+
+    /**
+     * @return the entity whose config will be modified
+     */
+    public ClientQuotaEntity entity() {
+        return this.entity;
+    }
+
+    /**
+     * @return the alteration to perform
+     */
+    public Collection<Op> ops() {
+        return this.ops;
+    }
+
+    @Override
+    public String toString() {
+        return "ClientQuotaAlteration(entity=" + entity + ", ops=" + ops + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java
new file mode 100644
index 0000000..0fee3d3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.quota;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Describes a client quota entity, which is a mapping of entity types to their names.
+ */
+public class ClientQuotaEntity {
+
+    private final Map<String, String> entries;
+
+    /**
+     * The type of an entity entry.
+     */
+    public static final String USER = "user";
+    public static final String CLIENT_ID = "client-id";
+
+    /**
+     * Constructs a quota entity for the given types and names. If a name is null,
+     * then it is mapped to the built-in default entity name.
+     *
+     * @param entries maps entity type to its name
+     */
+    public ClientQuotaEntity(Map<String, String> entries) {
+        this.entries = entries;
+    }
+
+    /**
+     * @return map of entity type to its name
+     */
+    public Map<String, String> entries() {
+        return this.entries;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClientQuotaEntity that = (ClientQuotaEntity) o;
+        return Objects.equals(entries, that.entries);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entries);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientQuotaEntity(entries=" + entries + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
new file mode 100644
index 0000000..e8a6a72
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.quota;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * Describes a client quota entity filter.
+ */
+public class ClientQuotaFilter {
+
+    private final Collection<ClientQuotaFilterComponent> components;
+    private final boolean strict;
+
+    /**
+     * A filter to be applied to matching client quotas.
+     *
+     * @param components the components to filter on
+     * @param strict whether the filter only includes specified components
+     */
+    private ClientQuotaFilter(Collection<ClientQuotaFilterComponent> components, boolean strict) {
+        this.components = components;
+        this.strict = strict;
+    }
+
+    /**
+     * Constructs and returns a quota filter that matches all provided components. Matching entities
+     * with entity types that are not specified by a component will also be included in the result.
+     *
+     * @param components the components for the filter
+     */
+    public static ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components) {
+        return new ClientQuotaFilter(components, false);
+    }
+
+    /**
+     * Constructs and returns a quota filter that matches all provided components. Matching entities
+     * with entity types that are not specified by a component will *not* be included in the result.
+     *
+     * @param components the components for the filter
+     */
+    public static ClientQuotaFilter containsOnly(Collection<ClientQuotaFilterComponent> components) {
+        return new ClientQuotaFilter(components, true);
+    }
+
+    /**
+     * Constructs and returns a quota filter that matches all configured entities.
+     */
+    public static ClientQuotaFilter all() {
+        return new ClientQuotaFilter(Collections.emptyList(), false);
+    }
+
+    /**
+     * @return the filter's components
+     */
+    public Collection<ClientQuotaFilterComponent> components() {
+        return this.components;
+    }
+
+    /**
+     * @return whether the filter is strict, i.e. only includes specified components
+     */
+    public boolean strict() {
+        return this.strict;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClientQuotaFilter that = (ClientQuotaFilter) o;
+        return Objects.equals(components, that.components) && Objects.equals(strict, that.strict);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(components, strict);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientQuotaFilter(components=" + components + ", strict=" + strict + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilterComponent.java b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilterComponent.java
new file mode 100644
index 0000000..b981ead
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilterComponent.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.quota;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Describes a component for applying a client quota filter.
+ */
+public class ClientQuotaFilterComponent {
+
+    private final String entityType;
+    private final Optional<String> match;
+
+    /**
+     * A filter to be applied.
+     *
+     * @param entityType the entity type the filter component applies to
+     * @param match if present, the name that's matched exactly
+     *              if empty, matches the default name
+     *              if null, matches any specified name
+     */
+    private ClientQuotaFilterComponent(String entityType, Optional<String> match) {
+        this.entityType = Objects.requireNonNull(entityType);
+        this.match = match;
+    }
+
+    /**
+     * Constructs and returns a filter component that exactly matches the provided entity
+     * name for the entity type.
+     *
+     * @param entityType the entity type the filter component applies to
+     * @param entityName the entity name that's matched exactly
+     */
+    public static ClientQuotaFilterComponent ofEntity(String entityType, String entityName) {
+        return new ClientQuotaFilterComponent(entityType, Optional.of(Objects.requireNonNull(entityName)));
+    }
+
+    /**
+     * Constructs and returns a filter component that matches the built-in default entity name
+     * for the entity type.
+     *
+     * @param entityType the entity type the filter component applies to
+     */
+    public static ClientQuotaFilterComponent ofDefaultEntity(String entityType) {
+        return new ClientQuotaFilterComponent(entityType, Optional.empty());
+    }
+
+    /**
+     * Constructs and returns a filter component that matches any specified name for the
+     * entity type.
+     *
+     * @param entityType the entity type the filter component applies to
+     */
+    public static ClientQuotaFilterComponent ofEntityType(String entityType) {
+        return new ClientQuotaFilterComponent(entityType, null);
+    }
+
+    /**
+     * @return the component's entity type
+     */
+    public String entityType() {
+        return this.entityType;
+    }
+
+    /**
+     * @return the optional match string, where:
+     *         if present, the name that's matched exactly
+     *         if empty, matches the default name
+     *         if null, matches any specified name
+     */
+    public Optional<String> match() {
+        return this.match;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClientQuotaFilterComponent that = (ClientQuotaFilterComponent) o;
+        return Objects.equals(entityType, match);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entityType, match);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientQuotaFilterComponent(entityType=" + entityType + ", match=" + match + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 97bc728..f4085b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -239,6 +239,10 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
                 return new ListPartitionReassignmentsRequest(struct, apiVersion);
             case OFFSET_DELETE:
                 return new OffsetDeleteRequest(struct, apiVersion);
+            case DESCRIBE_CLIENT_QUOTAS:
+                return new DescribeClientQuotasRequest(struct, apiVersion);
+            case ALTER_CLIENT_QUOTAS:
+                return new AlterClientQuotasRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 37f60dc..9d3f39d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -176,6 +176,10 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
                 return new ListPartitionReassignmentsResponse(struct, version);
             case OFFSET_DELETE:
                 return new OffsetDeleteResponse(struct, version);
+            case DESCRIBE_CLIENT_QUOTAS:
+                return new DescribeClientQuotasResponse(struct, version);
+            case ALTER_CLIENT_QUOTAS:
+                return new AlterClientQuotasResponse(struct, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
new file mode 100644
index 0000000..6d4c2a1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterClientQuotasRequestData;
+import org.apache.kafka.common.message.AlterClientQuotasRequestData.EntityData;
+import org.apache.kafka.common.message.AlterClientQuotasRequestData.EntryData;
+import org.apache.kafka.common.message.AlterClientQuotasRequestData.OpData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterClientQuotasRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<AlterClientQuotasRequest> {
+
+        private final AlterClientQuotasRequestData data;
+
+        public Builder(Collection<ClientQuotaAlteration> entries, boolean validateOnly) {
+            super(ApiKeys.ALTER_CLIENT_QUOTAS);
+
+            List<EntryData> entryData = new ArrayList<>(entries.size());
+            for (ClientQuotaAlteration entry : entries) {
+                List<EntityData> entityData = new ArrayList<>(entry.entity().entries().size());
+                for (Map.Entry<String, String> entityEntries : entry.entity().entries().entrySet()) {
+                    entityData.add(new EntityData()
+                            .setEntityType(entityEntries.getKey())
+                            .setEntityName(entityEntries.getValue()));
+                }
+
+                List<OpData> opData = new ArrayList<>(entry.ops().size());
+                for (ClientQuotaAlteration.Op op : entry.ops()) {
+                    opData.add(new OpData()
+                            .setKey(op.key())
+                            .setValue(op.value() == null ? 0.0 : op.value())
+                            .setRemove(op.value() == null));
+                }
+
+                entryData.add(new EntryData()
+                        .setEntity(entityData)
+                        .setOps(opData));
+            }
+
+            this.data = new AlterClientQuotasRequestData()
+                    .setEntries(entryData)
+                    .setValidateOnly(validateOnly);
+        }
+
+        @Override
+        public AlterClientQuotasRequest build(short version) {
+            return new AlterClientQuotasRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final AlterClientQuotasRequestData data;
+
+    public AlterClientQuotasRequest(AlterClientQuotasRequestData data, short version) {
+        super(ApiKeys.ALTER_CLIENT_QUOTAS, version);
+        this.data = data;
+    }
+
+    public AlterClientQuotasRequest(Struct struct, short version) {
+        super(ApiKeys.ALTER_CLIENT_QUOTAS, version);
+        this.data = new AlterClientQuotasRequestData(struct, version);
+    }
+
+    public Collection<ClientQuotaAlteration> entries() {
+        List<ClientQuotaAlteration> entries = new ArrayList<>(data.entries().size());
+        for (EntryData entryData : data.entries()) {
+            Map<String, String> entity = new HashMap<>(entryData.entity().size());
+            for (EntityData entityData : entryData.entity()) {
+                entity.put(entityData.entityType(), entityData.entityName());
+            }
+
+            List<ClientQuotaAlteration.Op> ops = new ArrayList<>(entryData.ops().size());
+            for (OpData opData : entryData.ops()) {
+                Double value = opData.remove() ? null : opData.value();
+                ops.add(new ClientQuotaAlteration.Op(opData.key(), value));
+            }
+
+            entries.add(new ClientQuotaAlteration(new ClientQuotaEntity(entity), ops));
+        }
+        return entries;
+    }
+
+    public boolean validateOnly() {
+        return data.validateOnly();
+    }
+
+    @Override
+    public AlterClientQuotasResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ArrayList<ClientQuotaEntity> entities = new ArrayList<>(data.entries().size());
+        for (EntryData entryData : data.entries()) {
+            Map<String, String> entity = new HashMap<>(entryData.entity().size());
+            for (EntityData entityData : entryData.entity()) {
+                entity.put(entityData.entityType(), entityData.entityName());
+            }
+            entities.add(new ClientQuotaEntity(entity));
+        }
+        return new AlterClientQuotasResponse(entities, throttleTimeMs, e);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        return data.toStruct(version());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
new file mode 100644
index 0000000..7e4e891
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.AlterClientQuotasResponseData;
+import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterClientQuotasResponse extends AbstractResponse {
+
+    private final AlterClientQuotasResponseData data;
+
+    public AlterClientQuotasResponse(Map<ClientQuotaEntity, ApiError> result, int throttleTimeMs) {
+        List<EntryData> entries = new ArrayList<>(result.size());
+        for (Map.Entry<ClientQuotaEntity, ApiError> entry : result.entrySet()) {
+            ApiError e = entry.getValue();
+            entries.add(new EntryData()
+                    .setErrorCode(e.error().code())
+                    .setErrorMessage(e.message())
+                    .setEntity(toEntityData(entry.getKey())));
+        }
+
+        this.data = new AlterClientQuotasResponseData()
+            .setThrottleTimeMs(throttleTimeMs)
+            .setEntries(entries);
+    }
+
+    public AlterClientQuotasResponse(Collection<ClientQuotaEntity> entities, int throttleTimeMs, Throwable e) {
+        short errorCode = Errors.forException(e).code();
+        String errorMessage = e.getMessage();
+
+        List<EntryData> entries = new ArrayList<>(entities.size());
+        for (ClientQuotaEntity entity : entities) {
+            entries.add(new EntryData()
+                    .setErrorCode(errorCode)
+                    .setErrorMessage(errorMessage)
+                    .setEntity(toEntityData(entity)));
+        }
+
+        this.data = new AlterClientQuotasResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setEntries(entries);
+    }
+
+    public AlterClientQuotasResponse(Struct struct, short version) {
+        this.data = new AlterClientQuotasResponseData(struct, version);
+    }
+
+    public void complete(Map<ClientQuotaEntity, KafkaFutureImpl<Void>> futures) {
+        for (EntryData entryData : data.entries()) {
+            Map<String, String> entityEntries = new HashMap<>(entryData.entity().size());
+            for (EntityData entityData : entryData.entity()) {
+                entityEntries.put(entityData.entityType(), entityData.entityName());
+            }
+            ClientQuotaEntity entity = new ClientQuotaEntity(entityEntries);
+
+            KafkaFutureImpl<Void> future = futures.get(entity);
+            if (future == null) {
+                throw new IllegalArgumentException("Future map must contain entity " + entity);
+            }
+
+            Errors error = Errors.forCode(entryData.errorCode());
+            if (error == Errors.NONE) {
+                future.complete(null);
+            } else {
+                future.completeExceptionally(error.exception(entryData.errorMessage()));
+            }
+        }
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> counts = new HashMap<>();
+        for (EntryData entry : data.entries()) {
+            Errors error = Errors.forCode(entry.errorCode());
+            counts.put(error, counts.getOrDefault(error, 0) + 1);
+        }
+        return counts;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
+
+    private static List<EntityData> toEntityData(ClientQuotaEntity entity) {
+        List<AlterClientQuotasResponseData.EntityData> entityData = new ArrayList<>(entity.entries().size());
+        for (Map.Entry<String, String> entry : entity.entries().entrySet()) {
+            entityData.add(new AlterClientQuotasResponseData.EntityData()
+                    .setEntityType(entry.getKey())
+                    .setEntityName(entry.getValue()));
+        }
+        return entityData;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasRequest.java
new file mode 100644
index 0000000..a5496ef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasRequest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData.ComponentData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collection;
+
+public class DescribeClientQuotasRequest extends AbstractRequest {
+    // These values must not change.
+    private static final byte MATCH_TYPE_EXACT = 0;
+    private static final byte MATCH_TYPE_DEFAULT = 1;
+    private static final byte MATCH_TYPE_SPECIFIED = 2;
+
+    public static class Builder extends AbstractRequest.Builder<DescribeClientQuotasRequest> {
+
+        private final DescribeClientQuotasRequestData data;
+
+        public Builder(ClientQuotaFilter filter) {
+            super(ApiKeys.DESCRIBE_CLIENT_QUOTAS);
+
+            List<ComponentData> componentData = new ArrayList<>(filter.components().size());
+            for (ClientQuotaFilterComponent component : filter.components()) {
+                ComponentData fd = new ComponentData().setEntityType(component.entityType());
+                if (component.match() == null) {
+                    fd.setMatchType(MATCH_TYPE_SPECIFIED);
+                    fd.setMatch(null);
+                } else if (component.match().isPresent()) {
+                    fd.setMatchType(MATCH_TYPE_EXACT);
+                    fd.setMatch(component.match().get());
+                } else {
+                    fd.setMatchType(MATCH_TYPE_DEFAULT);
+                    fd.setMatch(null);
+                }
+                componentData.add(fd);
+            }
+            this.data = new DescribeClientQuotasRequestData()
+                .setComponents(componentData)
+                .setStrict(filter.strict());
+        }
+
+        @Override
+        public DescribeClientQuotasRequest build(short version) {
+            return new DescribeClientQuotasRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final DescribeClientQuotasRequestData data;
+
+    public DescribeClientQuotasRequest(DescribeClientQuotasRequestData data, short version) {
+        super(ApiKeys.DESCRIBE_CLIENT_QUOTAS, version);
+        this.data = data;
+    }
+
+    public DescribeClientQuotasRequest(Struct struct, short version) {
+        super(ApiKeys.DESCRIBE_CLIENT_QUOTAS, version);
+        this.data = new DescribeClientQuotasRequestData(struct, version);
+    }
+
+    public ClientQuotaFilter filter() {
+        List<ClientQuotaFilterComponent> components = new ArrayList<>(data.components().size());
+        for (ComponentData componentData : data.components()) {
+            ClientQuotaFilterComponent component;
+            switch (componentData.matchType()) {
+                case MATCH_TYPE_EXACT:
+                    component = ClientQuotaFilterComponent.ofEntity(componentData.entityType(), componentData.match());
+                    break;
+                case MATCH_TYPE_DEFAULT:
+                    component = ClientQuotaFilterComponent.ofDefaultEntity(componentData.entityType());
+                    break;
+                case MATCH_TYPE_SPECIFIED:
+                    component = ClientQuotaFilterComponent.ofEntityType(componentData.entityType());
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unexpected match type: " + componentData.matchType());
+            }
+            components.add(component);
+        }
+        if (data.strict()) {
+            return ClientQuotaFilter.containsOnly(components);
+        } else {
+            return ClientQuotaFilter.contains(components);
+        }
+    }
+
+    @Override
+    public DescribeClientQuotasResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new DescribeClientQuotasResponse(throttleTimeMs, e);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        return data.toStruct(version());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
new file mode 100644
index 0000000..6ed5b1b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeClientQuotasResponse extends AbstractResponse {
+
+    private final DescribeClientQuotasResponseData data;
+
+    public DescribeClientQuotasResponse(Map<ClientQuotaEntity, Map<String, Double>> entities, int throttleTimeMs) {
+        List<EntryData> entries = new ArrayList<>(entities.size());
+        for (Map.Entry<ClientQuotaEntity, Map<String, Double>> entry : entities.entrySet()) {
+            ClientQuotaEntity quotaEntity = entry.getKey();
+            List<EntityData> entityData = new ArrayList<>(quotaEntity.entries().size());
+            for (Map.Entry<String, String> entityEntry : quotaEntity.entries().entrySet()) {
+                entityData.add(new EntityData()
+                        .setEntityType(entityEntry.getKey())
+                        .setEntityName(entityEntry.getValue()));
+            }
+
+            Map<String, Double> quotaValues = entry.getValue();
+            List<ValueData> valueData = new ArrayList<>(quotaValues.size());
+            for (Map.Entry<String, Double> valuesEntry : entry.getValue().entrySet()) {
+                valueData.add(new ValueData()
+                        .setKey(valuesEntry.getKey())
+                        .setValue(valuesEntry.getValue()));
+            }
+
+            entries.add(new EntryData()
+                    .setEntity(entityData)
+                    .setValues(valueData));
+        }
+
+        this.data = new DescribeClientQuotasResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode((short) 0)
+                .setErrorMessage(null)
+                .setEntries(entries);
+    }
+
+    public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) {
+        this.data = new DescribeClientQuotasResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(Errors.forException(e).code())
+                .setErrorMessage(e.getMessage())
+                .setEntries(null);
+    }
+
+    public DescribeClientQuotasResponse(Struct struct, short version) {
+        this.data = new DescribeClientQuotasResponseData(struct, version);
+    }
+
+    public void complete(KafkaFutureImpl<Map<ClientQuotaEntity, Map<String, Double>>> future) {
+        Errors error = Errors.forCode(data.errorCode());
+        if (error != Errors.NONE) {
+            future.completeExceptionally(error.exception(data.errorMessage()));
+            return;
+        }
+
+        Map<ClientQuotaEntity, Map<String, Double>> result = new HashMap<>(data.entries().size());
+        for (EntryData entries : data.entries()) {
+            Map<String, String> entity = new HashMap<>(entries.entity().size());
+            for (EntityData entityData : entries.entity()) {
+                entity.put(entityData.entityType(), entityData.entityName());
+            }
+
+            Map<String, Double> values = new HashMap<>(entries.values().size());
+            for (ValueData valueData : entries.values()) {
+                values.put(valueData.key(), valueData.value());
+            }
+
+            result.put(new ClientQuotaEntity(entity), values);
+        }
+        future.complete(result);
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
+}
diff --git a/clients/src/main/resources/common/message/AlterClientQuotasRequest.json b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
new file mode 100644
index 0000000..7e74d44
--- /dev/null
+++ b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 49,
+  "type": "request",
+  "name": "AlterClientQuotasRequest",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    { "name": "Entries", "type": "[]EntryData", "versions": "0+",
+      "about": "The quota configuration entries to alter.", "fields": [
+      { "name": "Entity", "type": "[]EntityData", "versions": "0+",
+        "about": "The quota entity to alter.", "fields": [
+        { "name": "EntityType", "type": "string", "versions": "0+",
+          "about": "The entity type." },
+        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
+          "about": "The name of the entity, or null if the default." }
+      ]},
+      { "name": "Ops", "type": "[]OpData", "versions": "0+",
+        "about": "An individual quota configuration entry to alter.", "fields": [
+        { "name": "Key", "type": "string", "versions": "0+",
+          "about": "The quota configuration key." },
+        { "name": "Value", "type": "float64", "versions": "0+",
+          "about": "The value to set, otherwise ignored if the value is to be removed." },
+        { "name": "Remove", "type": "bool", "versions": "0+",
+          "about": "Whether the quota configuration value should be removed, otherwise set." }
+      ]}
+    ]},
+    { "name": "ValidateOnly", "type": "bool", "versions": "0+",
+      "about": "Whether the alteration should be validated, but not performed." }
+  ]
+}
diff --git a/clients/src/main/resources/common/message/AlterClientQuotasResponse.json b/clients/src/main/resources/common/message/AlterClientQuotasResponse.json
new file mode 100644
index 0000000..2f8fb88
--- /dev/null
+++ b/clients/src/main/resources/common/message/AlterClientQuotasResponse.json
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 49,
+  "type": "response",
+  "name": "AlterClientQuotasResponse",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "Entries", "type": "[]EntryData", "versions": "0+",
+      "about": "The quota configuration entries to alter.", "fields": [
+      { "name": "ErrorCode", "type": "int16", "versions": "0+",
+        "about": "The error code, or `0` if the quota alteration succeeded." },
+      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
+        "about": "The error message, or `null` if the quota alteration succeeded." },
+      { "name": "Entity", "type": "[]EntityData", "versions": "0+",
+        "about": "The quota entity to alter.", "fields": [
+        { "name": "EntityType", "type": "string", "versions": "0+",
+          "about": "The entity type." },
+        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
+          "about": "The name of the entity, or null if the default." }
+      ]}
+    ]}
+  ]
+}
diff --git a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
new file mode 100644
index 0000000..7abfd3c
--- /dev/null
+++ b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 48,
+  "type": "request",
+  "name": "DescribeClientQuotasRequest",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    { "name": "Components", "type": "[]ComponentData", "versions": "0+",
+      "about": "Filter components to apply to quota entities.", "fields": [
+      { "name": "EntityType", "type": "string", "versions": "0+",
+        "about": "The entity type that the filter component applies to." },
+      { "name": "MatchType", "type": "int8", "versions": "0+",
+        "about": "How to match the entity {0 = exact name, 1 = default name, 2 = any specified name}." },
+      { "name": "Match", "type": "string", "versions": "0+", "nullableVersions": "0+",
+        "about": "The string to match against, or null if unused for the match type." }
+    ]},
+    { "name": "Strict", "type": "bool", "versions": "0+",
+      "about": "Whether the match is strict, i.e. should exclude entities with unspecified entity types." }
+  ]
+}
diff --git a/clients/src/main/resources/common/message/DescribeClientQuotasResponse.json b/clients/src/main/resources/common/message/DescribeClientQuotasResponse.json
new file mode 100644
index 0000000..5f5c784
--- /dev/null
+++ b/clients/src/main/resources/common/message/DescribeClientQuotasResponse.json
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 48,
+  "type": "response",
+  "name": "DescribeClientQuotasResponse",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The error code, or `0` if the quota description succeeded." },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
+      "about": "The error message, or `null` if the quota description succeeded." },
+    { "name": "Entries", "type": "[]EntryData", "versions": "0+", "nullableVersions": "0+",
+      "about": "A result entry.", "fields": [
+      { "name": "Entity", "type": "[]EntityData", "versions": "0+",
+        "about": "The quota entity description.", "fields": [
+        { "name": "EntityType", "type": "string", "versions": "0+",
+          "about": "The entity type." },
+        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
+          "about": "The entity name, or null if the default." }
+      ]},
+      { "name": "Values", "type": "[]ValueData", "versions": "0+",
+	"about": "The quota values for the entity.", "fields": [
+        { "name": "Key", "type": "string", "versions": "0+",
+          "about": "The quota configuration key." },
+        { "name": "Value", "type": "float64", "versions": "0+",
+          "about": "The quota configuration value." }
+      ]}
+    ]}
+  ]
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 72c5fa9..b6f8417 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -89,6 +89,11 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
 import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
+import org.apache.kafka.common.requests.AlterClientQuotasResponse;
 import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -101,6 +106,7 @@ import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
 import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.DescribeClientQuotasResponse;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.ElectLeadersResponse;
@@ -638,6 +644,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().createPendingAuthenticationError(cluster.nodes().get(0),
                     TimeUnit.DAYS.toMillis(1));
             callAdminClientApisAndExpectAnAuthenticationError(env);
+            callClientQuotasApisAndExpectAnAuthenticationError(env);
         }
     }
 
@@ -696,6 +703,26 @@ public class KafkaAdminClientTest {
         }
     }
 
+    private void callClientQuotasApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
+        try {
+            env.adminClient().describeClientQuotas(ClientQuotaFilter.all()).entities().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
+        }
+
+        try {
+            ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
+            ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, asList(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)));
+            env.adminClient().alterClientQuotas(asList(alteration)).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+                e.getCause() instanceof AuthenticationException);
+        }
+    }
+
     private static final AclBinding ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
     private static final AclBinding ACL2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic4", PatternType.LITERAL),
@@ -2810,6 +2837,74 @@ public class KafkaAdminClientTest {
         }
     }
 
+    private ClientQuotaEntity newClientQuotaEntity(String... args) {
+        assertTrue(args.length % 2 == 0);
+
+        Map<String, String> entityMap = new HashMap<>(args.length / 2);
+        for (int index = 0; index < args.length; index += 2) {
+            entityMap.put(args[index], args[index + 1]);
+        }
+        return new ClientQuotaEntity(entityMap);
+    }
+
+    @Test
+    public void testDescribeClientQuotas() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            final String value = "value";
+
+            Map<ClientQuotaEntity, Map<String, Double>> responseData = new HashMap<>();
+            ClientQuotaEntity entity1 = newClientQuotaEntity(ClientQuotaEntity.USER, "user-1", ClientQuotaEntity.CLIENT_ID, value);
+            ClientQuotaEntity entity2 = newClientQuotaEntity(ClientQuotaEntity.USER, "user-2", ClientQuotaEntity.CLIENT_ID, value);
+            responseData.put(entity1, Collections.singletonMap("consumer_byte_rate", 10000.0));
+            responseData.put(entity2, Collections.singletonMap("producer_byte_rate", 20000.0));
+
+            env.kafkaClient().prepareResponse(new DescribeClientQuotasResponse(responseData, 0));
+
+            ClientQuotaFilter filter = ClientQuotaFilter.contains(asList(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, value)));
+
+            DescribeClientQuotasResult result = env.adminClient().describeClientQuotas(filter);
+            Map<ClientQuotaEntity, Map<String, Double>> resultData = result.entities().get();
+            assertEquals(resultData.size(), 2);
+            assertTrue(resultData.containsKey(entity1));
+            Map<String, Double> config1 = resultData.get(entity1);
+            assertEquals(config1.size(), 1);
+            assertEquals(config1.get("consumer_byte_rate"), 10000.0, 1e-6);
+            assertTrue(resultData.containsKey(entity2));
+            Map<String, Double> config2 = resultData.get(entity2);
+            assertEquals(config2.size(), 1);
+            assertEquals(config2.get("producer_byte_rate"), 20000.0, 1e-6);
+        }
+    }
+
+    public void testAlterClientQuotas() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            ClientQuotaEntity goodEntity = newClientQuotaEntity(ClientQuotaEntity.USER, "user-1");
+            ClientQuotaEntity unauthorizedEntity = newClientQuotaEntity(ClientQuotaEntity.USER, "user-0");
+            ClientQuotaEntity invalidEntity = newClientQuotaEntity("", "user-0");
+
+            Map<ClientQuotaEntity, ApiError> responseData = new HashMap<>(2);
+            responseData.put(goodEntity, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Authorization failed"));
+            responseData.put(unauthorizedEntity, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Authorization failed"));
+            responseData.put(invalidEntity, new ApiError(Errors.INVALID_REQUEST, "Invalid quota entity"));
+
+            env.kafkaClient().prepareResponse(new AlterClientQuotasResponse(responseData, 0));
+
+            List<ClientQuotaAlteration> entries = new ArrayList<>(3);
+            entries.add(new ClientQuotaAlteration(goodEntity, Collections.singleton(new ClientQuotaAlteration.Op("consumer_byte_rate", 10000.0))));
+            entries.add(new ClientQuotaAlteration(unauthorizedEntity, Collections.singleton(new ClientQuotaAlteration.Op("producer_byte_rate", 10000.0))));
+            entries.add(new ClientQuotaAlteration(invalidEntity, Collections.singleton(new ClientQuotaAlteration.Op("producer_byte_rate", 100.0))));
+
+            AlterClientQuotasResult result = env.adminClient().alterClientQuotas(entries);
+            result.values().get(goodEntity);
+            TestUtils.assertFutureError(result.values().get(unauthorizedEntity), ClusterAuthorizationException.class);
+            TestUtils.assertFutureError(result.values().get(invalidEntity), InvalidRequestException.class);
+        }
+    }
+
     private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
                                                                  MemberAssignment assignment) {
         return new MemberDescription(member.memberId(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 7cfd051..c6d9d14 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -459,6 +461,16 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
+    public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
     public void close(Duration timeout) {}
 
 
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 562a91a..de2b593 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -28,11 +28,12 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, Config => JConfig}
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, Config => JConfig}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
@@ -61,7 +62,8 @@ import scala.collection._
 object ConfigCommand extends Config {
 
   val BrokerLoggerConfigType = "broker-loggers"
-  val BrokerSupportedConfigTypes = Seq(ConfigType.Topic, ConfigType.Broker, BrokerLoggerConfigType)
+  val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType
+  val ZkSupportedConfigTypes = ConfigType.all
   val DefaultScramIterations = 4096
   // Dynamic broker configs can only be updated using the new AdminClient once brokers have started
   // so that configs may be fully validated. Prior to starting brokers, updates may be performed using
@@ -283,14 +285,8 @@ object ConfigCommand extends Config {
     props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
     val adminClient = Admin.create(props)
 
-    if (opts.entityTypes.size != 1)
-      throw new IllegalArgumentException(s"Exactly one entity type (out of ${BrokerSupportedConfigTypes.mkString(",")}) must be specified with --bootstrap-server")
-
-    val entityNames = opts.entityNames
-    if (entityNames.size > 1)
-      throw new IllegalArgumentException(s"At most one entity name must be specified with --bootstrap-server")
-    else if (opts.options.has(opts.alterOpt) && entityNames.size != 1)
-      throw new IllegalArgumentException(s"Exactly one entity name must be specified with --bootstrap-server for --alter")
+    if (opts.options.has(opts.alterOpt) && opts.entityTypes.size != opts.entityNames.size)
+      throw new IllegalArgumentException(s"An entity name must be specified for every entity type")
 
     try {
       if (opts.options.has(opts.alterOpt))
@@ -303,14 +299,17 @@ object ConfigCommand extends Config {
   }
 
   private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
-    val entityType = opts.entityTypes.head
-    val entityName = opts.entityNames.head
-    val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) }
+    val entityTypes = opts.entityTypes
+    val entityNames = opts.entityNames
+    val entityTypeHead = entityTypes.head
+    val entityNameHead = entityNames.head
+    val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala
+    val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) }
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
-    entityType match {
+    entityTypeHead match {
       case ConfigType.Topic =>
-        val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false)
+        val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
           .map { entry => (entry.name, entry) }.toMap
 
         // fail the command if any of the configs to be deleted does not exist
@@ -318,7 +317,7 @@ object ConfigCommand extends Config {
         if (invalidConfigs.nonEmpty)
           throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
 
-        val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityName)
+        val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityNameHead)
         val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
         val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
           ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
@@ -326,7 +325,7 @@ object ConfigCommand extends Config {
         adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
 
       case ConfigType.Broker =>
-        val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false)
+        val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
           .map { entry => (entry.name, entry) }.toMap
 
         // fail the command if any of the configs to be deleted does not exist
@@ -340,38 +339,75 @@ object ConfigCommand extends Config {
           throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
         val newConfig = new JConfig(newEntries.asJava.values)
 
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
+        val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead)
         val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
         adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
 
       case BrokerLoggerConfigType =>
-        val validLoggers = getConfig(adminClient, entityType, entityName, includeSynonyms = true, describeAll = false).map(_.name)
+        val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
         // fail the command if any of the configured broker loggers do not exist
         val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains)
         if (invalidBrokerLoggers.nonEmpty)
           throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}")
 
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName)
+        val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
         val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
         val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
           ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
         ).asJavaCollection
         adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
 
-      case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityType")
+      case ConfigType.User =>
+      case ConfigType.Client =>
+        val oldConfig: Map[String, java.lang.Double] = getClientQuotasConfig(adminClient, entityTypes, entityNames)
+
+        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+        if (invalidConfigs.nonEmpty)
+          throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+
+        val entity = new ClientQuotaEntity(opts.entityTypes.map { entType =>
+          entType match {
+            case ConfigType.User => ClientQuotaEntity.USER
+            case ConfigType.Client => ClientQuotaEntity.CLIENT_ID
+            case _ => throw new IllegalArgumentException(s"Unexpected entity type: ${entType}")
+          }
+        }.zip(opts.entityNames).toMap.asJava)
+
+        val alterOptions = new AlterClientQuotasOptions().validateOnly(false)
+        val alterOps = (configsToBeAddedMap.map { case (key, value) =>
+          val doubleValue = try value.toDouble catch {
+            case _: NumberFormatException =>
+              throw new IllegalArgumentException(s"Cannot parse quota configuration value for ${key}: ${value}")
+          }
+          new ClientQuotaAlteration.Op(key, doubleValue)
+        } ++ configsToBeDeleted.map(key => new ClientQuotaAlteration.Op(key, null))).asJavaCollection
+
+        adminClient.alterClientQuotas(Collections.singleton(new ClientQuotaAlteration(entity, alterOps)), alterOptions)
+          .all().get(60, TimeUnit.SECONDS)
+
+      case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
     }
 
-    if (entityName.nonEmpty)
-      println(s"Completed updating config for ${entityType.dropRight(1)} $entityName.")
+    if (entityNameHead.nonEmpty)
+      println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
     else
-      println(s"Completed updating default config for $entityType in the cluster.")
+      println(s"Completed updating default config for $entityTypeHead in the cluster.")
   }
 
   private[admin] def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
-    val entityType = opts.entityTypes.head
-    val entityName = opts.entityNames.headOption
+    val entityTypes = opts.entityTypes
+    val entityNames = opts.entityNames
     val describeAll = opts.options.has(opts.allOpt)
 
+    entityTypes.head match {
+      case ConfigType.Topic | ConfigType.Broker | BrokerLoggerConfigType =>
+        describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
+      case ConfigType.User | ConfigType.Client =>
+        describeClientQuotasConfig(adminClient, entityTypes, entityNames)
+    }
+  }
+
+  private def describeResourceConfig(adminClient: Admin, entityType: String, entityName: Option[String], describeAll: Boolean): Unit = {
     val entities = entityName
       .map(name => List(name))
       .getOrElse(entityType match {
@@ -389,14 +425,14 @@ object ConfigCommand extends Config {
           val configSourceStr = if (describeAll) "All" else "Dynamic"
           println(s"$configSourceStr configs for ${entityType.dropRight(1)} $entity are:")
       }
-      getConfig(adminClient, entityType, entity, includeSynonyms = true, describeAll).foreach { entry =>
+      getResourceConfig(adminClient, entityType, entity, includeSynonyms = true, describeAll).foreach { entry =>
         val synonyms = entry.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ")
         println(s"  ${entry.name}=${entry.value} sensitive=${entry.isSensitive} synonyms={$synonyms}")
       }
     }
   }
 
-  private def getConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = {
+  private def getResourceConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = {
     def validateBrokerId(): Unit = try entityName.toInt catch {
       case _: NumberFormatException =>
         throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName")
@@ -436,6 +472,36 @@ object ConfigCommand extends Config {
       }).toSeq
   }
 
+  private def describeClientQuotasConfig(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
+    getAllClientQuotasConfigs(adminClient, entityTypes, entityNames).foreach { case (entity, entries) =>
+      val entityEntries = entity.entries.asScala
+      val entityStr = (entityEntries.get(ClientQuotaEntity.USER).map(u => s"user-principal '${u}'") ++
+        entityEntries.get(ClientQuotaEntity.CLIENT_ID).map(c => s"client-id '${c}'")).mkString(", ")
+      val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ")
+      println(s"Configs for ${entityStr} are ${entriesStr}")
+    }
+  }
+
+  private def getClientQuotasConfig(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Map[String, java.lang.Double] = {
+    if (entityTypes.size != entityNames.size)
+      throw new IllegalArgumentException("Exactly one entity name must be specified for every entity type")
+    getAllClientQuotasConfigs(adminClient, entityTypes, entityNames).headOption.map(_._2.asScala).getOrElse(Map.empty)
+  }
+
+  private def getAllClientQuotasConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
+    val components = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None).map { case (entityTypeOpt, entityNameOpt) =>
+      val entityType = entityTypeOpt match {
+        case Some(ConfigType.User) => ClientQuotaEntity.USER
+        case Some(ConfigType.Client) => ClientQuotaEntity.CLIENT_ID
+        case Some(_) => throw new IllegalArgumentException(s"Unexpected entity type ${entityTypeOpt.get}")
+        case None => throw new IllegalArgumentException("More entity names specified than entity types")
+      }
+      entityNameOpt.map(ClientQuotaFilterComponent.ofEntity(entityType, _)).getOrElse(ClientQuotaFilterComponent.ofEntityType(entityType))
+    }
+
+    adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS).asScala
+  }
+
   case class Entity(entityType: String, sanitizedName: Option[String]) {
     val entityPath = sanitizedName match {
       case Some(n) => entityType + "/" + n
@@ -506,7 +572,7 @@ object ConfigCommand extends Config {
     val entityTypes = opts.entityTypes
     val entityNames = opts.entityNames
     if (entityTypes.head == ConfigType.User || entityTypes.head == ConfigType.Client)
-      parseQuotaEntity(opts, entityTypes, entityNames)
+      parseClientQuotaEntity(opts, entityTypes, entityNames)
     else {
       // Exactly one entity type and at-most one entity name expected for other entities
       val name = entityNames.headOption match {
@@ -517,7 +583,7 @@ object ConfigCommand extends Config {
     }
   }
 
-  private def parseQuotaEntity(opts: ConfigCommandOptions, types: List[String], names: List[String]): ConfigEntity = {
+  private def parseClientQuotaEntity(opts: ConfigCommandOptions, types: List[String], names: List[String]): ConfigEntity = {
     if (opts.options.has(opts.alterOpt) && names.size != types.size)
       throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter")
 
@@ -651,7 +717,7 @@ object ConfigCommand extends Config {
       val (allowedEntityTypes, connectOptString) = if (options.has(bootstrapServerOpt))
         (BrokerSupportedConfigTypes, "--bootstrap-server")
       else
-        (ConfigType.all, "--zookeeper")
+        (ZkSupportedConfigTypes, "--zookeeper")
 
       entityTypeVals.foreach(entityTypeVal =>
         if (!allowedEntityTypes.contains(entityTypeVal))
@@ -687,9 +753,6 @@ object ConfigCommand extends Config {
         }
       }
 
-      if (entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.User))
-        CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
-
       if (options.has(describeOpt) && entityTypeVals.contains(BrokerLoggerConfigType) && !hasEntityName)
         throw new IllegalArgumentException(s"an entity name must be specified with --describe of ${entityTypeVals.mkString(",")}")
 
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 01c6d41..ea92bfd 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -29,19 +29,21 @@ import org.apache.kafka.clients.admin.AlterConfigOp
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.ConfigDef.ConfigKey
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig}
-import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicConfigs, CreatableTopicResult}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
+import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.requests.CreateTopicsRequest._
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse}
-import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
-import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
+import org.apache.kafka.common.utils.Sanitizer
 
 import scala.collection.{Map, mutable, _}
 import scala.collection.JavaConverters._
@@ -711,4 +713,195 @@ class AdminManager(val config: KafkaConfig,
     val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name)
     new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava)
   }
+
+  private def sanitizeEntityName(entityName: String): String = {
+    if (entityName == ConfigEntityName.Default)
+      throw new InvalidRequestException(s"Entity name '${ConfigEntityName.Default}' is reserved")
+    Sanitizer.sanitize(Option(entityName).getOrElse(ConfigEntityName.Default))
+  }
+
+  private def desanitizeEntityName(sanitizedEntityName: String): String =
+    Sanitizer.desanitize(sanitizedEntityName) match {
+      case ConfigEntityName.Default => null
+      case name => name
+    }
+
+  private def entityToSanitizedUserClientId(entity: ClientQuotaEntity): (Option[String], Option[String]) = {
+    if (entity.entries.isEmpty)
+      throw new InvalidRequestException("Invalid empty client quota entity")
+
+    var user: Option[String] = None
+    var clientId: Option[String] = None
+    entity.entries.asScala.foreach { case (entityType, entityName) =>
+      val sanitizedEntityName = Some(sanitizeEntityName(entityName))
+      entityType match {
+        case ClientQuotaEntity.USER => user = sanitizedEntityName
+        case ClientQuotaEntity.CLIENT_ID => clientId = sanitizedEntityName
+        case _ => throw new InvalidRequestException(s"Unhandled client quota entity type: ${entityType}")
+      }
+      if (entityName != null && entityName.isEmpty)
+        throw new InvalidRequestException(s"Empty ${entityType} not supported")
+    }
+    (user, clientId)
+  }
+
+  private def userClientIdToEntity(user: Option[String], clientId: Option[String]): ClientQuotaEntity = {
+    new ClientQuotaEntity((user.map(u => ClientQuotaEntity.USER -> u) ++ clientId.map(c => ClientQuotaEntity.CLIENT_ID -> c)).toMap.asJava)
+  }
+
+  def describeClientQuotas(filter: ClientQuotaFilter): Map[ClientQuotaEntity, Map[String, Double]] = {
+    var userComponent: Option[ClientQuotaFilterComponent] = None
+    var clientIdComponent: Option[ClientQuotaFilterComponent] = None
+    filter.components.asScala.foreach { component =>
+      component.entityType match {
+        case ClientQuotaEntity.USER =>
+          if (userComponent.isDefined)
+            throw new InvalidRequestException(s"Duplicate user filter component entity type");
+          userComponent = Some(component)
+        case ClientQuotaEntity.CLIENT_ID =>
+          if (clientIdComponent.isDefined)
+            throw new InvalidRequestException(s"Duplicate client filter component entity type");
+          clientIdComponent = Some(component)
+        case "" =>
+          throw new InvalidRequestException(s"Unexpected empty filter component entity type")
+        case et =>
+          // Supplying other entity types is not yet supported.
+          throw new UnsupportedVersionException(s"Custom entity type '${et}' not supported")
+      }
+    }
+    handleDescribeClientQuotas(userComponent, clientIdComponent, filter.strict)
+  }
+
+  def handleDescribeClientQuotas(userComponent: Option[ClientQuotaFilterComponent],
+    clientIdComponent: Option[ClientQuotaFilterComponent], strict: Boolean) = {
+
+    def toOption(opt: java.util.Optional[String]): Option[String] =
+      if (opt == null)
+        None
+      else if (opt.isPresent)
+        Some(opt.get)
+      else
+        Some(null)
+
+    val user = userComponent.flatMap(c => toOption(c.`match`))
+    val clientId = clientIdComponent.flatMap(c => toOption(c.`match`))
+
+    def sanitized(name: Option[String]): String = name.map(n => sanitizeEntityName(n)).getOrElse("")
+    val sanitizedUser = sanitized(user)
+    val sanitizedClientId = sanitized(clientId)
+
+    def wantExact(component: Option[ClientQuotaFilterComponent]): Boolean = component.exists(_.`match` != null)
+    val exactUser = wantExact(userComponent)
+    val exactClientId = wantExact(clientIdComponent)
+
+    def wantExcluded(component: Option[ClientQuotaFilterComponent]): Boolean = strict && !component.isDefined
+    val excludeUser = wantExcluded(userComponent)
+    val excludeClientId = wantExcluded(clientIdComponent)
+
+    val userEntries = if (exactUser && excludeClientId)
+      Map(((Some(user.get), None) -> adminZkClient.fetchEntityConfig(ConfigType.User, sanitizedUser)))
+    else if (!excludeUser && !exactClientId)
+      adminZkClient.fetchAllEntityConfigs(ConfigType.User).map { case (name, props) =>
+        ((Some(desanitizeEntityName(name)), None) -> props)
+      }
+    else
+      Map.empty
+
+    val clientIdEntries = if (excludeUser && exactClientId)
+      Map(((None, Some(clientId.get)) -> adminZkClient.fetchEntityConfig(ConfigType.Client, sanitizedClientId)))
+    else if (!exactUser && !excludeClientId)
+      adminZkClient.fetchAllEntityConfigs(ConfigType.Client).map { case (name, props) =>
+        ((None, Some(desanitizeEntityName(name))) -> props)
+      }
+    else
+      Map.empty
+
+    val bothEntries = if (exactUser && exactClientId)
+      Map(((Some(user.get), Some(clientId.get)) ->
+        adminZkClient.fetchEntityConfig(ConfigType.User, s"${sanitizedUser}/clients/${sanitizedClientId}")))
+    else if (!excludeUser && !excludeClientId)
+      adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).map { case (name, props) =>
+        val components = name.split("/")
+        if (components.size != 3 || components(1) != "clients")
+          throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+        ((Some(desanitizeEntityName(components(0))), Some(desanitizeEntityName(components(2)))) -> props)
+      }
+    else
+      Map.empty
+
+    def matches(nameComponent: Option[ClientQuotaFilterComponent], name: Option[String]): Boolean = nameComponent match {
+      case Some(component) =>
+        toOption(component.`match`) match {
+          case Some(n) => name.exists(_ == n)
+          case None => name.isDefined
+        }
+      case None =>
+        !name.isDefined || !strict
+    }
+
+    def fromProps(props: Properties): Map[String, Double] = {
+      props.asScala.map { case (key, value) =>
+        val doubleValue = try value.toDouble catch {
+          case _: NumberFormatException =>
+            throw new IllegalStateException(s"Unexpected client quota configuration value: ${key} -> ${value}")
+        }
+        (key -> doubleValue)
+      }
+    }
+
+    (userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) =>
+      if (!p.isEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
+        Some((userClientIdToEntity(u, c) -> fromProps(p)))
+      else
+        None
+    }.flatten.toMap
+  }
+
+  def alterClientQuotas(entries: Seq[ClientQuotaAlteration], validateOnly: Boolean): Map[ClientQuotaEntity, ApiError] = {
+    def alterEntityQuotas(entity: ClientQuotaEntity, ops: Iterable[ClientQuotaAlteration.Op]): Unit = {
+      val (path, configType, configKeys) = entityToSanitizedUserClientId(entity) match {
+        case (Some(user), Some(clientId)) => (user + "/clients/" + clientId, ConfigType.User, DynamicConfig.User.configKeys)
+        case (Some(user), None) => (user, ConfigType.User, DynamicConfig.User.configKeys)
+        case (None, Some(clientId)) => (clientId, ConfigType.Client, DynamicConfig.Client.configKeys)
+        case _ => throw new InvalidRequestException("Invalid empty client quota entity")
+      }
+
+      val props = adminZkClient.fetchEntityConfig(configType, path)
+      ops.foreach { op =>
+        op.value match {
+          case null =>
+            props.remove(op.key)
+          case value => configKeys.get(op.key) match {
+            case null =>
+              throw new InvalidRequestException(s"Invalid configuration key ${op.key}")
+            case key => key.`type` match {
+              case ConfigDef.Type.DOUBLE =>
+                props.setProperty(op.key, value.toString)
+              case ConfigDef.Type.LONG =>
+                val epsilon = 1e-6
+                val longValue = (value + epsilon).toLong
+                if ((longValue.toDouble - value).abs > epsilon)
+                  throw new InvalidRequestException(s"Configuration ${op.key} must be a Long value")
+                props.setProperty(op.key, longValue.toString)
+              case _ =>
+                throw new IllegalStateException(s"Unexpected config type ${key.`type`}")
+            }
+          }
+        }
+      }
+      if (!validateOnly)
+        adminZkClient.changeConfigs(configType, path, props)
+    }
+    entries.map { entry =>
+      val apiError = try {
+        alterEntityQuotas(entry.entity, entry.ops.asScala)
+        ApiError.NONE
+      } catch {
+        case e: Throwable =>
+          info(s"Error encountered while updating client quotas", e)
+          ApiError.fromThrowable(e)
+      }
+      (entry.entity -> apiError)
+    }.toMap
+  }
 }
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index e5974d3..abea5de 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -89,6 +89,8 @@ object DynamicConfig {
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
 
+    def configKeys = clientConfigs.configKeys
+
     def names = clientConfigs.names
 
     def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false)
@@ -102,6 +104,8 @@ object DynamicConfig {
       .define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
       .define(Client.RequestPercentageOverrideProp, DOUBLE, Client.DefaultRequestOverride, MEDIUM, Client.RequestOverrideDoc)
 
+    def configKeys = userConfigs.configKeys
+
     def names = userConfigs.names
 
     def validate(props: Properties) = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d092cdd..4f12473 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -174,6 +174,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
         case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
         case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
+        case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
+        case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -2813,6 +2815,34 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleDescribeClientQuotasRequest(request: RequestChannel.Request): Unit = {
+    val describeClientQuotasRequest = request.body[DescribeClientQuotasRequest]
+
+    if (authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+      val result = adminManager.describeClientQuotas(
+        describeClientQuotasRequest.filter).mapValues(_.mapValues(Double.box).asJava).asJava
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new DescribeClientQuotasResponse(result, requestThrottleMs))
+    } else {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        describeClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+    }
+  }
+
+  def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
+    val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
+
+    if (authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+      val result = adminManager.alterClientQuotas(alterClientQuotasRequest.entries().asScala.toSeq,
+        alterClientQuotasRequest.validateOnly()).asJava
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new AlterClientQuotasResponse(result, requestThrottleMs))
+    } else {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        alterClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+    }
+  }
+
   private def authorize(request: RequestChannel.Request,
                         operation: AclOperation,
                         resourceType: ResourceType,
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index e938a6d..5a76f75 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
@@ -92,8 +93,18 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def shouldParseArgumentsForClientsEntityType(): Unit = {
+    testArgumentParse("clients", zkConfig = false)
+  }
+
+  @Test
   def shouldParseArgumentsForUsersEntityTypeUsingZookeeper(): Unit = {
-    testArgumentParse("clients", zkConfig = true)
+    testArgumentParse("users", zkConfig = true)
+  }
+
+  @Test
+  def shouldParseArgumentsForUsersEntityType(): Unit = {
+    testArgumentParse("users", zkConfig = false)
   }
 
   @Test
@@ -215,10 +226,14 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     assertTrue(addedProps2.getProperty("f").isEmpty)
   }
 
-  @Test
-  def testOptionEntityTypeNames(): Unit = {
+  def doTestOptionEntityTypeNames(zkConfig: Boolean): Unit = {
+    val connectOpts = if (zkConfig)
+      ("--zookeeper", zkConnect)
+    else
+      ("--bootstrap-server", "localhost:9092")
+
     def testExpectedEntityTypeNames(expectedTypes: List[String], expectedNames: List[String], args: String*): Unit = {
-      val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, "--describe") ++ args)
+      val createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, "--describe") ++ args)
       createOpts.checkArgs()
       assertEquals(createOpts.entityTypes, expectedTypes)
       assertEquals(createOpts.entityNames, expectedNames)
@@ -243,6 +258,16 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     testExpectedEntityTypeNames(List(ConfigType.Broker), List.empty, "--entity-type", "brokers")
   }
 
+  @Test
+  def testOptionEntityTypeNamesUsingZookeeper(): Unit = {
+    doTestOptionEntityTypeNames(zkConfig = true)
+  }
+
+  @Test
+  def testOptionEntityTypeNames(): Unit = {
+    doTestOptionEntityTypeNames(zkConfig = false)
+  }
+
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailIfUnrecognisedEntityTypeUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
@@ -292,6 +317,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     createOpts.checkArgs()
   }
 
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldFailIfMixedEntityTypeFlags(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+      "--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"))
+    createOpts.checkArgs()
+  }
+
   @Test
   def shouldAddClientConfigUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
@@ -312,6 +344,66 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def shouldAddClientConfig(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+      "--entity-name", "my-client-id",
+      "--entity-type", "clients",
+      "--alter",
+      "--add-config", "consumer_byte_rate=20000,producer_byte_rate=10000",
+      "--delete-config", "request_percentage"))
+
+    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "my-client-id")).asJava)
+
+    var describedConfigs = false
+    val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, util.Map[String, java.lang.Double]]]
+    describeFuture.complete(Map((entity -> Map(("request_percentage" -> Double.box(50.0))).asJava)).asJava)
+    val describeResult: DescribeClientQuotasResult = EasyMock.createNiceMock(classOf[DescribeClientQuotasResult])
+    EasyMock.expect(describeResult.entities()).andReturn(describeFuture)
+
+    var alteredConfigs = false
+    val alterFuture = new KafkaFutureImpl[Void]
+    alterFuture.complete(null)
+    val alterResult: AlterClientQuotasResult = EasyMock.createNiceMock(classOf[AlterClientQuotasResult])
+    EasyMock.expect(alterResult.all()).andReturn(alterFuture)
+
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeClientQuotas(filter: ClientQuotaFilter, options: DescribeClientQuotasOptions): DescribeClientQuotasResult = {
+        assertEquals(1, filter.components.size)
+        assertTrue(filter.strict)
+        val component = filter.components.asScala.head
+        assertEquals(ClientQuotaEntity.CLIENT_ID, component.entityType)
+        assertTrue(component.`match`.isPresent)
+        assertEquals("my-client-id", component.`match`.get)
+        describedConfigs = true
+        describeResult
+      }
+
+      override def alterClientQuotas(entries: util.Collection[ClientQuotaAlteration], options: AlterClientQuotasOptions): AlterClientQuotasResult = {
+        assertFalse(options.validateOnly)
+        assertEquals(1, entries.size)
+        val alteration = entries.asScala.head
+        assertEquals(entity, alteration.entity)
+        val ops = alteration.ops.asScala
+        assertEquals(3, ops.size)
+        val expectedOps = Set(
+          new ClientQuotaAlteration.Op("consumer_byte_rate", Double.box(20000)),
+          new ClientQuotaAlteration.Op("producer_byte_rate", Double.box(10000)),
+          new ClientQuotaAlteration.Op("request_percentage", null)
+        )
+        assertEquals(expectedOps, ops.toSet)
+        alteredConfigs = true
+        alterResult
+      }
+    }
+    EasyMock.replay(alterResult, describeResult)
+    ConfigCommand.alterConfig(mockAdminClient, createOpts)
+    assertTrue(describedConfigs)
+    assertTrue(alteredConfigs)
+    EasyMock.reset(alterResult, describeResult)
+  }
+
+  @Test
   def shouldAddTopicConfigUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "my-topic",
@@ -935,12 +1027,14 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     ConfigCommand.alterConfigWithZk(null, del512, CredentialChange("userB", Set(), 4096))
   }
 
-  @Test
-  def testQuotaConfigEntity(): Unit = {
+  def doTestQuotaConfigEntity(zkConfig: Boolean): Unit = {
+    val connectOpts = if (zkConfig)
+      ("--zookeeper", zkConnect)
+    else
+      ("--bootstrap-server", "localhost:9092")
 
     def createOpts(entityType: String, entityName: Option[String], otherArgs: Array[String]) : ConfigCommandOptions = {
-      val optArray = Array("--zookeeper", zkConnect,
-                           "--entity-type", entityType)
+      val optArray = Array(connectOpts._1, connectOpts._2, "--entity-type", entityType)
       val nameArray = entityName match {
         case Some(name) => Array("--entity-name", name)
         case None => Array[String]()
@@ -1008,9 +1102,23 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def testUserClientQuotaOpts(): Unit = {
+  def testQuotaConfigEntityUsingZookeeper(): Unit = {
+    doTestQuotaConfigEntity(zkConfig = true)
+  }
+
+  @Test
+  def testQuotaConfigEntity(): Unit = {
+    doTestQuotaConfigEntity(zkConfig = false)
+  }
+
+  def doTestUserClientQuotaOpts(zkConfig: Boolean): Unit = {
+    val connectOpts = if (zkConfig)
+      ("--zookeeper", zkConnect)
+    else
+      ("--bootstrap-server", "localhost:9092")
+
     def checkEntity(expectedEntityType: String, expectedEntityName: String, args: String*): Unit = {
-      val opts = new ConfigCommandOptions(Array("--zookeeper", zkConnect) ++ args)
+      val opts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2) ++ args)
       opts.checkArgs()
       val entity = ConfigCommand.parseEntity(opts)
       assertEquals(expectedEntityType, entity.root.entityType)
@@ -1025,7 +1133,6 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         "--entity-type", "clients", "--entity-name", "<default>",
         "--alter", "--add-config", "a=b,c=d")
 
-
     checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1",
         "--entity-type", "users", "--entity-name", "CN=user1", "--entity-type", "clients", "--entity-name", "client1",
         "--alter", "--add-config", "a=b,c=d")
@@ -1050,6 +1157,16 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def testUserClientQuotaOptsUsingZookeeper(): Unit = {
+    doTestUserClientQuotaOpts(zkConfig = true)
+  }
+
+  @Test
+  def testUserClientQuotaOpts(): Unit = {
+    doTestUserClientQuotaOpts(zkConfig = false)
+  }
+
+  @Test
   def testQuotaDescribeEntities(): Unit = {
     val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
 
@@ -1134,5 +1251,10 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       options: AlterConfigsOptions): AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
     override def alterConfigs(configs: util.Map[ConfigResource, Config], options: AlterConfigsOptions): AlterConfigsResult =
       EasyMock.createNiceMock(classOf[AlterConfigsResult])
+    override def describeClientQuotas(filter: ClientQuotaFilter, options: DescribeClientQuotasOptions): DescribeClientQuotasResult =
+      EasyMock.createNiceMock(classOf[DescribeClientQuotasResult])
+    override def alterClientQuotas(entries: util.Collection[ClientQuotaAlteration],
+      options: AlterClientQuotasOptions): AlterClientQuotasResult =
+      EasyMock.createNiceMock(classOf[AlterClientQuotasResult])
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
new file mode 100644
index 0000000..047ff71
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -0,0 +1,443 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
+import org.apache.kafka.common.internals.KafkaFutureImpl
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
+import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+class ClientQuotasRequestTest extends BaseRequestTest {
+  private val ConsumerByteRateProp = DynamicConfig.Client.ConsumerByteRateOverrideProp
+  private val ProducerByteRateProp = DynamicConfig.Client.ProducerByteRateOverrideProp
+  private val RequestPercentageProp = DynamicConfig.Client.RequestPercentageOverrideProp
+
+  override val brokerCount = 1
+
+  @Test
+  def testAlterClientQuotasRequest(): Unit = {
+    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user"), (ClientQuotaEntity.CLIENT_ID -> "client-id")).asJava)
+
+    // Expect an empty configuration.
+    verifyDescribeEntityQuotas(entity, Map.empty)
+
+    // Add two configuration entries.
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> Some(10000.0)),
+      (ConsumerByteRateProp -> Some(20000.0))
+    ), validateOnly = false)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 10000.0),
+      (ConsumerByteRateProp -> 20000.0)
+    ))
+
+    // Update an existing entry.
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> Some(15000.0))
+    ), validateOnly = false)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 15000.0),
+      (ConsumerByteRateProp -> 20000.0)
+    ))
+
+    // Remove an existing configuration entry.
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> None)
+    ), validateOnly = false)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ConsumerByteRateProp -> 20000.0)
+    ))
+
+    // Remove a non-existent configuration entry.  This should make no changes.
+    alterEntityQuotas(entity, Map(
+      (RequestPercentageProp -> None)
+    ), validateOnly = false)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ConsumerByteRateProp -> 20000.0)
+    ))
+
+    // Add back a deleted configuration entry.
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> Some(5000.0))
+    ), validateOnly = false)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 5000.0),
+      (ConsumerByteRateProp -> 20000.0)
+    ))
+
+    // Perform a mixed update.
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> Some(20000.0)),
+      (ConsumerByteRateProp -> None),
+      (RequestPercentageProp -> Some(12.3))
+    ), validateOnly = false)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 20000.0),
+      (RequestPercentageProp -> 12.3)
+    ))
+  }
+
+  @Test
+  def testAlterClientQuotasRequestValidateOnly(): Unit = {
+    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
+
+    // Set up a configuration.
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> Some(20000.0)),
+      (RequestPercentageProp -> Some(23.45))
+    ), validateOnly = false)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 20000.0),
+      (RequestPercentageProp -> 23.45)
+    ))
+
+    // Validate-only addition.
+    alterEntityQuotas(entity, Map(
+      (ConsumerByteRateProp -> Some(50000.0))
+    ), validateOnly = true)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 20000.0),
+      (RequestPercentageProp -> 23.45)
+    ))
+
+    // Validate-only modification.
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> Some(10000.0))
+    ), validateOnly = true)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 20000.0),
+      (RequestPercentageProp -> 23.45)
+    ))
+
+    // Validate-only removal.
+    alterEntityQuotas(entity, Map(
+      (RequestPercentageProp -> None)
+    ), validateOnly = true)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 20000.0),
+      (RequestPercentageProp -> 23.45)
+    ))
+
+    // Validate-only mixed update.
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> Some(10000.0)),
+      (ConsumerByteRateProp -> Some(50000.0)),
+      (RequestPercentageProp -> None)
+    ), validateOnly = true)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 20000.0),
+      (RequestPercentageProp -> 23.45)
+    ))
+  }
+
+  @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadUser(): Unit = {
+    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)
+    alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true)
+  }
+
+  @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadClientId(): Unit = {
+    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "")).asJava)
+    alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true)
+  }
+
+  @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadEntityType(): Unit = {
+    val entity = new ClientQuotaEntity(Map(("" -> "name")).asJava)
+    alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true)
+  }
+
+  @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasEmptyEntity(): Unit = {
+    val entity = new ClientQuotaEntity(Map.empty.asJava)
+    alterEntityQuotas(entity, Map((ProducerByteRateProp -> Some(10000.5))), validateOnly = true)
+  }
+
+  @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadConfigKey(): Unit = {
+    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
+    alterEntityQuotas(entity, Map(("bad" -> Some(1.0))), validateOnly = true)
+  }
+
+  @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadConfigValue(): Unit = {
+    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
+    alterEntityQuotas(entity, Map((ProducerByteRateProp -> Some(10000.5))), validateOnly = true)
+  }
+
+  // Entities to be matched against.
+  private val matchEntities = List(
+    (Some("user-1"), Some("client-id-1"), 50.50),
+    (Some("user-2"), Some("client-id-1"), 51.51),
+    (Some("user-3"), Some("client-id-2"), 52.52),
+    (Some(null), Some("client-id-1"), 53.53),
+    (Some("user-1"), Some(null), 54.54),
+    (Some("user-3"), Some(null), 55.55),
+    (Some("user-1"), None, 56.56),
+    (Some("user-2"), None, 57.57),
+    (Some("user-3"), None, 58.58),
+    (Some(null), None, 59.59),
+    (None, Some("client-id-2"), 60.60)
+  ).map { case (u, c, v) => (toEntity(u, c), v) }
+
+  private def setupDescribeClientQuotasMatchTest() = {
+    val result = alterClientQuotas(matchEntities.map { case (e, v) =>
+      (e -> Map((RequestPercentageProp, Some(v))))
+    }.toMap, validateOnly = false)
+    matchEntities.foreach(e => result.get(e._1).get.get(10, TimeUnit.SECONDS))
+
+    // Allow time for watch callbacks to be triggered.
+    Thread.sleep(500)
+  }
+
+  @Test
+  def testDescribeClientQuotasMatchExact(): Unit = {
+    setupDescribeClientQuotasMatchTest()
+
+    def matchEntity(entity: ClientQuotaEntity) = {
+      val components = entity.entries.asScala.map { case (entityType, entityName) =>
+        entityName match {
+          case null => ClientQuotaFilterComponent.ofDefaultEntity(entityType)
+          case name => ClientQuotaFilterComponent.ofEntity(entityType, name)
+        }
+      }
+      describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava))
+    }
+
+    // Test exact matches.
+    matchEntities.foreach { case (e, v) =>
+      val result = matchEntity(e)
+      assertEquals(1, result.size)
+      assertTrue(result.get(e) != null)
+      val value = result.get(e).get(RequestPercentageProp)
+      assertTrue(value != null)
+      assertEquals(value, v, 1e-6)
+    }
+
+    // Entities not contained in `matchEntityList`.
+    val notMatchEntities = List(
+      (Some("user-1"), Some("client-id-2")),
+      (Some("user-3"), Some("client-id-1")),
+      (Some("user-2"), Some(null)),
+      (Some("user-4"), None),
+      (Some(null), Some("client-id-2")),
+      (None, Some("client-id-1")),
+      (None, Some("client-id-3")),
+    ).map { case (u, c) =>
+        new ClientQuotaEntity((u.map((ClientQuotaEntity.USER, _)) ++
+          c.map((ClientQuotaEntity.CLIENT_ID, _))).toMap.asJava)
+    }
+
+    // Verify exact matches of the non-matches returns empty.
+    notMatchEntities.foreach { e =>
+      val result = matchEntity(e)
+      assertEquals(0, result.size)
+    }
+  }
+
+  @Test
+  def testDescribeClientQuotasMatchPartial(): Unit = {
+    setupDescribeClientQuotasMatchTest()
+
+    def testMatchEntities(filter: ClientQuotaFilter, expectedMatchSize: Int, partition: ClientQuotaEntity => Boolean) {
+      val result = describeClientQuotas(filter)
+      val (expectedMatches, expectedNonMatches) = matchEntities.partition(e => partition(e._1))
+      assertEquals(expectedMatchSize, expectedMatches.size)  // for test verification
+      assertEquals(expectedMatchSize, result.size)
+      val expectedMatchesMap = expectedMatches.toMap
+      matchEntities.foreach { case (entity, expectedValue) =>
+        if (expectedMatchesMap.contains(entity)) {
+          val config = result.get(entity)
+          assertTrue(config != null)
+          val value = config.get(RequestPercentageProp)
+          assertTrue(value != null)
+          assertEquals(expectedValue, value, 1e-6)
+        } else {
+          assertTrue(result.get(entity) == null)
+        }
+      }
+    }
+
+    // Match open-ended existing user.
+    testMatchEntities(
+      ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-1")).asJava), 3,
+      entity => entity.entries.get(ClientQuotaEntity.USER) == "user-1"
+    )
+
+    // Match open-ended non-existent user.
+    testMatchEntities(
+      ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "unknown")).asJava), 0,
+      entity => false
+    )
+
+    // Match open-ended existing client ID.
+    testMatchEntities(
+      ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-2")).asJava), 2,
+      entity => entity.entries.get(ClientQuotaEntity.CLIENT_ID) == "client-id-2"
+    )
+
+    // Match open-ended default user.
+    testMatchEntities(
+      ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER)).asJava), 2,
+      entity => entity.entries.containsKey(ClientQuotaEntity.USER) && entity.entries.get(ClientQuotaEntity.USER) == null
+    )
+
+    // Match close-ended existing user.
+    testMatchEntities(
+      ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-2")).asJava), 1,
+      entity => entity.entries.get(ClientQuotaEntity.USER) == "user-2" && !entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
+    )
+
+    // Match close-ended existing client ID that has no matching entity.
+    testMatchEntities(
+      ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-1")).asJava), 0,
+      entity => false
+    )
+
+    // Match against all entities with the user type in a close-ended match.
+    testMatchEntities(
+      ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava), 4,
+      entity => entity.entries.containsKey(ClientQuotaEntity.USER) && !entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
+    )
+
+    // Match against all entities with the user type in an open-ended match.
+    testMatchEntities(
+      ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava), 10,
+      entity => entity.entries.containsKey(ClientQuotaEntity.USER)
+    )
+
+    // Match against all entities with the client ID type in a close-ended match.
+    testMatchEntities(
+      ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava), 1,
+      entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID) && !entity.entries.containsKey(ClientQuotaEntity.USER)
+    )
+
+    // Match against all entities with the client ID type in an open-ended match.
+    testMatchEntities(
+      ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava),  7,
+      entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
+    )
+
+    // Match open-ended empty filter list. This should match all entities.
+    testMatchEntities(ClientQuotaFilter.contains(List.empty.asJava), 11, entity => true)
+
+    // Match close-ended empty filter list. This should match no entities.
+    testMatchEntities(ClientQuotaFilter.containsOnly(List.empty.asJava), 0, entity => false)
+  }
+
+  @Test
+  def testClientQuotasUnsupportedEntityTypes() {
+    val entity = new ClientQuotaEntity(Map(("other" -> "name")).asJava)
+    try {
+      verifyDescribeEntityQuotas(entity, Map())
+    } catch {
+      case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[UnsupportedVersionException])
+    }
+  }
+
+  @Test
+  def testClientQuotasSanitized(): Unit = {
+    // An entity with name that must be sanitized when writing to Zookeeper.
+    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user with spaces")).asJava)
+
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> Some(20000.0)),
+    ), validateOnly = false)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 20000.0),
+    ))
+  }
+
+  private def verifyDescribeEntityQuotas(entity: ClientQuotaEntity, quotas: Map[String, Double]) = {
+    val components = entity.entries.asScala.map(e => ClientQuotaFilterComponent.ofEntity(e._1, e._2))
+    val describe = describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava))
+    if (quotas.isEmpty) {
+      assertEquals(0, describe.size)
+    } else {
+      assertEquals(1, describe.size)
+      val configs = describe.get(entity)
+      assertTrue(configs != null)
+      assertEquals(quotas.size, configs.size)
+      quotas.foreach { case (k, v) =>
+        val value = configs.get(k)
+        assertTrue(value != null)
+        assertEquals(v, value, 1e-6)
+      }
+    }
+  }
+
+  private def toEntity(user: Option[String], clientId: Option[String]) =
+    new ClientQuotaEntity((user.map((ClientQuotaEntity.USER -> _)) ++ clientId.map((ClientQuotaEntity.CLIENT_ID -> _))).toMap.asJava)
+
+  private def describeClientQuotas(filter: ClientQuotaFilter) = {
+    val result = new KafkaFutureImpl[java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]]]
+    sendDescribeClientQuotasRequest(filter).complete(result)
+    result.get
+  }
+
+  private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter): DescribeClientQuotasResponse = {
+    val request = new DescribeClientQuotasRequest.Builder(filter).build()
+    connectAndReceive[DescribeClientQuotasResponse](request, destination = controllerSocketServer)
+  }
+
+  private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String, Option[Double]], validateOnly: Boolean) =
+    try alterClientQuotas(Map(entity -> alter), validateOnly).get(entity).get.get(10, TimeUnit.SECONDS) catch {
+      case e: ExecutionException => throw e.getCause
+    }
+
+  private def alterClientQuotas(request: Map[ClientQuotaEntity, Map[String, Option[Double]]], validateOnly: Boolean) = {
+    val entries = request.map { case (entity, alter) =>
+      val ops = alter.map { case (key, value) =>
+        new ClientQuotaAlteration.Op(key, value.map(Double.box).getOrElse(null))
+      }.asJavaCollection
+      new ClientQuotaAlteration(entity, ops)
+    }
+
+    val response = request.map(e => (e._1 -> new KafkaFutureImpl[Void])).asJava
+    sendAlterClientQuotasRequest(entries, validateOnly).complete(response)
+    val result = response.asScala
+    assertEquals(request.size, result.size)
+    request.foreach(e => assertTrue(result.get(e._1).isDefined))
+    result
+  }
+
+  private def sendAlterClientQuotasRequest(entries: Iterable[ClientQuotaAlteration], validateOnly: Boolean): AlterClientQuotasResponse = {
+    val request = new AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build()
+    connectAndReceive[AlterClientQuotasResponse](request, destination = controllerSocketServer)
+  }
+
+}


Mime
View raw message