kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6576: Configurable Quota Management (KIP-257) (#4699)
Date Fri, 06 Apr 2018 21:49:40 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 77ebd32  KAFKA-6576: Configurable Quota Management (KIP-257) (#4699)
77ebd32 is described below

commit 77ebd32016d13c64ee5a3c7db63ca71bf4b79aad
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Fri Apr 6 22:49:34 2018 +0100

    KAFKA-6576: Configurable Quota Management (KIP-257) (#4699)
    
    Enable quota calculation to be customized using a configurable callback. See KIP-257 for details.
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 .../kafka/server/quota/ClientQuotaCallback.java    | 106 ++++
 .../kafka/server/quota/ClientQuotaEntity.java      |  62 +++
 .../apache/kafka/server/quota/ClientQuotaType.java |  26 +
 .../scala/kafka/server/ClientQuotaManager.scala    | 571 +++++++++++++--------
 .../kafka/server/ClientRequestQuotaManager.scala   |  21 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  53 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  11 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   8 +
 core/src/main/scala/kafka/server/KafkaServer.scala |   2 +
 .../main/scala/kafka/server/MetadataCache.scala    |  32 +-
 .../src/main/scala/kafka/server/QuotaFactory.scala |   6 +-
 .../integration/kafka/api/BaseQuotaTest.scala      | 236 +++++----
 .../integration/kafka/api/ClientIdQuotaTest.scala  |  52 +-
 .../kafka/api/CustomQuotaCallbackTest.scala        | 453 ++++++++++++++++
 .../kafka/api/UserClientIdQuotaTest.scala          |  54 +-
 .../integration/kafka/api/UserQuotaTest.scala      |  44 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |  20 +-
 .../unit/kafka/server/ClientQuotaManagerTest.scala |  79 +--
 .../kafka/server/DynamicBrokerConfigTest.scala     |  32 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   9 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  17 +
 21 files changed, 1429 insertions(+), 465 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
new file mode 100644
index 0000000..210e9f4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.util.Map;
+
+/**
+ * Quota callback interface for brokers that enables customization of client quota computation.
+ */
+public interface ClientQuotaCallback extends Configurable {
+
+    /**
+     * Quota callback invoked to determine the quota metric tags to be applied for a request.
+     * Quota limits are associated with quota metrics and all clients which use the same
+     * metric tags share the quota limit.
+     *
+     * @param quotaType Type of quota requested
+     * @param principal The user principal of the connection for which quota is requested
+     * @param clientId  The client id associated with the request
+     * @return quota metric tags that indicate which other clients share this quota
+     */
+    Map<String, String> quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId);
+
+    /**
+     * Returns the quota limit associated with the provided metric tags. These tags were returned from
+     * a previous call to {@link #quotaMetricTags(ClientQuotaType, KafkaPrincipal, String)}. This method is
+     * invoked by quota managers to obtain the current quota limit applied to a metric when the first request
+     * using these tags is processed. It is also invoked after a quota update or cluster metadata change.
+     * If the tags are no longer in use after the update, (e.g. this is a {user, client-id} quota metric
+     * and the quota now in use is a {user} quota), null is returned.
+     *
+     * @param quotaType  Type of quota requested
+     * @param metricTags Metric tags for a quota metric of type `quotaType`
+     * @return the quota limit for the provided metric tags or null if the metric tags are no longer in use
+     */
+    Double quotaLimit(ClientQuotaType quotaType, Map<String, String> metricTags);
+
+    /**
+     * Quota configuration update callback that is invoked when quota configuration for an entity is
+     * updated in ZooKeeper. This is useful to track configured quotas if built-in quota configuration
+     * tools are used for quota management.
+     *
+     * @param quotaType   Type of quota being updated
+     * @param quotaEntity The quota entity for which quota is being updated
+     * @param newValue    The new quota value
+     */
+    void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity, double newValue);
+
+    /**
+     * Quota configuration removal callback that is invoked when quota configuration for an entity is
+     * removed in ZooKeeper. This is useful to track configured quotas if built-in quota configuration
+     * tools are used for quota management.
+     *
+     * @param quotaType   Type of quota being updated
+     * @param quotaEntity The quota entity for which quota is being updated
+     */
+    void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity);
+
+    /**
+     * Returns true if any of the existing quota configs may have been updated since the last call
+     * to this method for the provided quota type. Quota updates as a result of calls to
+     * {@link #updateClusterMetadata(Cluster)}, {@link #updateQuota(ClientQuotaType, ClientQuotaEntity, double)}
+     * and {@link #removeQuota(ClientQuotaType, ClientQuotaEntity)} are automatically processed.
+     * So callbacks that rely only on built-in quota configuration tools always return false. Quota callbacks
+     * with external quota configuration or custom reconfigurable quota configs that affect quota limits must
+     * return true if existing metric configs may need to be updated. This method is invoked on every request
+     * and hence is expected to be handled by callbacks as a simple flag that is updated when quotas change.
+     *
+     * @param quotaType Type of quota
+     */
+    boolean quotaResetRequired(ClientQuotaType quotaType);
+
+    /**
+     * Metadata update callback that is invoked whenever UpdateMetadata request is received from
+     * the controller. This is useful if quota computation takes partitions into account.
+     * Topics that are being deleted will not be included in `cluster`.
+     *
+     * @param cluster Cluster metadata including partitions and their leaders if known
+     * @return true if quotas have changed and metric configs may need to be updated
+     */
+    boolean updateClusterMetadata(Cluster cluster);
+
+    /**
+     * Closes this instance.
+     */
+    void close();
+}
+
diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
new file mode 100644
index 0000000..a5ff082
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import java.util.List;
+
+/**
+ * The metadata for an entity for which quota is configured. Quotas may be defined at
+ * different levels and `configEntities` gives the list of config entities that define
+ * the level of this quota entity.
+ */
+public interface ClientQuotaEntity {
+
+    /**
+     * Entity type of a {@link ConfigEntity}
+     */
+    public enum ConfigEntityType {
+        USER,
+        CLIENT_ID,
+        DEFAULT_USER,
+        DEFAULT_CLIENT_ID
+    }
+
+    /**
+     * Interface representing a quota configuration entity. Quota may be
+     * configured at levels that include one or more configuration entities.
+     * For example, {user, client-id} quota is represented using two
+     * instances of ConfigEntity with entity types USER and CLIENT_ID.
+     */
+    public interface ConfigEntity {
+        /**
+         * Returns the name of this entity. For default quotas, an empty string is returned.
+         */
+        String name();
+
+        /**
+         * Returns the type of this entity.
+         */
+        ConfigEntityType entityType();
+    }
+
+    /**
+     * Returns the list of configuration entities that this quota entity is comprised of.
+     * For {user} or {clientId} quota, this is a single entity and for {user, clientId}
+     * quota, this is a list of two entities.
+     */
+    List<ConfigEntity> configEntities();
+}
diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java
new file mode 100644
index 0000000..4dd67d3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+/**
+ * Types of quotas that may be configured on brokers for client requests.
+ */
+public enum ClientQuotaType {
+    PRODUCE,
+    FETCH,
+    REQUEST
+}
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 8ec27a3..0f8690f 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -16,24 +16,29 @@
  */
 package kafka.server
 
+import java.{lang, util}
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import kafka.network.RequestChannel.Session
+import kafka.server.ClientQuotaManager._
 import kafka.utils.{Logging, ShutdownableThread}
-import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.{Cluster, MetricName}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.{Avg, Rate, Total}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
+import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
 
 import scala.collection.JavaConverters._
 
 /**
  * Represents the sensors aggregated per client
- * @param quotaEntity Quota entity representing <client-id>, <user> or <user, client-id>
+ * @param metricTags Quota metric tags for the client
  * @param quotaSensor @Sensor that tracks the quota
  * @param throttleTimeSensor @Sensor that tracks the throttle time
  */
-case class ClientSensors(quotaEntity: QuotaEntity, quotaSensor: Sensor, throttleTimeSensor: Sensor)
+case class ClientSensors(metricTags: Map[String, String], quotaSensor: Sensor, throttleTimeSensor: Sensor)
 
 /**
  * Configuration settings for quota management
@@ -61,9 +66,6 @@ object ClientQuotaManagerConfig {
   val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
 
   val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
-  val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
-  val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None, None)
-  val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
 }
 
 object QuotaTypes {
@@ -71,11 +73,60 @@ object QuotaTypes {
   val ClientIdQuotaEnabled = 1
   val UserQuotaEnabled = 2
   val UserClientIdQuotaEnabled = 4
+  val CustomQuotas = 8 // No metric update optimizations are used with custom quotas
 }
 
-case class QuotaId(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String])
+object ClientQuotaManager {
+  val DefaultClientIdQuotaEntity = KafkaQuotaEntity(None, Some(DefaultClientIdEntity))
+  val DefaultUserQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), None)
+  val DefaultUserClientIdQuotaEntity = KafkaQuotaEntity(Some(DefaultUserEntity), Some(DefaultClientIdEntity))
 
-case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String, sanitizedClientId: String, quota: Quota)
+  case class UserEntity(sanitizedUser: String) extends ClientQuotaEntity.ConfigEntity {
+    override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.USER
+    override def name: String = Sanitizer.desanitize(sanitizedUser)
+    override def toString: String = s"user $sanitizedUser"
+  }
+
+  case class ClientIdEntity(clientId: String) extends ClientQuotaEntity.ConfigEntity {
+    override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.CLIENT_ID
+    override def name: String = clientId
+    override def toString: String = s"client-id $clientId"
+  }
+
+  case object DefaultUserEntity extends ClientQuotaEntity.ConfigEntity {
+    override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
+    override def name: String = ConfigEntityName.Default
+    override def toString: String = "default user"
+  }
+
+  case object DefaultClientIdEntity extends ClientQuotaEntity.ConfigEntity {
+    override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID
+    override def name: String = ConfigEntityName.Default
+    override def toString: String = "default client-id"
+  }
+
+  case class KafkaQuotaEntity(userEntity: Option[ClientQuotaEntity.ConfigEntity],
+                              clientIdEntity: Option[ClientQuotaEntity.ConfigEntity]) extends ClientQuotaEntity {
+    override def configEntities: util.List[ClientQuotaEntity.ConfigEntity] =
+      (userEntity.toList ++ clientIdEntity.toList).asJava
+    def sanitizedUser: String = userEntity.map {
+      case entity: UserEntity => entity.sanitizedUser
+      case DefaultUserEntity => ConfigEntityName.Default
+    }.getOrElse("")
+    def clientId: String = clientIdEntity.map(_.name).getOrElse("")
+
+    override def toString: String = {
+      val user = userEntity.map(_.toString).getOrElse("")
+      val clientId = clientIdEntity.map(_.toString).getOrElse("")
+      s"$user $clientId".trim
+    }
+  }
+
+  object DefaultTags {
+    val User = "user"
+    val ClientId = "client-id"
+  }
+}
 
 /**
  * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics
@@ -107,21 +158,26 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val quotaType: QuotaType,
                          private val time: Time,
-                         threadNamePrefix: String) extends Logging {
-  private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
+                         threadNamePrefix: String,
+                         clientQuotaCallback: Option[ClientQuotaCallback] = None) extends Logging {
   private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
-  @volatile private var quotaTypesEnabled =
-    if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas
-    else QuotaTypes.ClientIdQuotaEnabled
+  private val clientQuotaType = quotaTypeToClientQuotaType(quotaType)
+  @volatile private var quotaTypesEnabled = clientQuotaCallback match {
+    case Some(_) => QuotaTypes.CustomQuotas
+    case None =>
+      if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas
+      else QuotaTypes.ClientIdQuotaEnabled
+  }
   private val lock = new ReentrantReadWriteLock()
   private val delayQueue = new DelayQueue[ThrottledResponse]()
   private val sensorAccessor = new SensorAccess(lock, metrics)
   private[server] val throttledRequestReaper = new ThrottledRequestReaper(delayQueue, threadNamePrefix)
+  private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback)
 
   private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
   delayQueueSensor.add(metrics.metricName("queue-size",
-                                      quotaType.toString,
-                                      "Tracks the size of the delay queue"), new Total())
+    quotaType.toString,
+    "Tracks the size of the delay queue"), new Total())
   start() // Use start method to keep findbugs happy
   private def start() {
     throttledRequestReaper.start()
@@ -132,7 +188,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * @param delayQueue DelayQueue to dequeue from
    */
   class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse], prefix: String) extends ShutdownableThread(
-    s"${prefix}ThrottledRequestReaper-${quotaType}", false) {
+    s"${prefix}ThrottledRequestReaper-$quotaType", false) {
 
     override def doWork(): Unit = {
       val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)
@@ -158,17 +214,18 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * Records that a user/clientId changed some metric being throttled (produced/consumed bytes, request processing time etc.)
    * If quota has been violated, callback is invoked after a delay, otherwise the callback is invoked immediately.
    * Throttle time calculation may be overridden by sub-classes.
-   * @param sanitizedUser user principal of client
+   *
+   * @param session  the session associated with this request
    * @param clientId clientId that produced/fetched the data
-   * @param value amount of data in bytes or request processing time as a percentage
+   * @param value    amount of data in bytes or request processing time as a percentage
    * @param callback Callback function. This will be triggered immediately if quota is not violated.
    *                 If there is a quota violation, this callback will be triggered after a delay
    * @return Number of milliseconds to delay the response in case of Quota violation.
    *         Zero otherwise
    */
-  def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
+  def maybeRecordAndThrottle(session: Session, clientId: String, value: Double, callback: Int => Unit): Int = {
     if (quotasEnabled) {
-      val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
+      val clientSensors = getOrCreateQuotaSensors(session, clientId)
       recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
     } else {
       // Don't record any metrics if quotas are not enabled at any level
@@ -187,9 +244,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     } catch {
       case _: QuotaViolationException =>
         // Compute the delay
-        val clientQuotaEntity = clientSensors.quotaEntity
-        val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
-        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).toInt
+        val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags))
+        throttleTimeMs = throttleTime(clientMetric).toInt
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
@@ -209,126 +265,27 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
-   * Determines the quota-id for the client with the specified user principal
-   * and client-id and returns the quota entity that encapsulates the quota-id
-   * and the associated quota override or default quota.
+   * Returns the quota for the client with the specified (non-encoded) user principal and client-id.
    *
+   * Note: this method is expensive, it is meant to be used by tests only
    */
-  private def quotaEntity(sanitizedUser: String, clientId: String, sanitizedClientId: String) : QuotaEntity = {
-    quotaTypesEnabled match {
-      case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
-        val quotaId = QuotaId(None, Some(clientId), Some(sanitizedClientId))
-        var quota = overriddenQuota.get(quotaId)
-        if (quota == null) {
-          quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId)
-          if (quota == null)
-            quota = staticConfigClientIdQuota
-        }
-        QuotaEntity(quotaId, "", clientId, sanitizedClientId, quota)
-      case QuotaTypes.UserQuotaEnabled =>
-        val quotaId = QuotaId(Some(sanitizedUser), None, None)
-        var quota = overriddenQuota.get(quotaId)
-        if (quota == null) {
-          quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserQuotaId)
-          if (quota == null)
-            quota = ClientQuotaManagerConfig.UnlimitedQuota
-        }
-        QuotaEntity(quotaId, sanitizedUser, "", "", quota)
-      case QuotaTypes.UserClientIdQuotaEnabled =>
-        val quotaId = QuotaId(Some(sanitizedUser), Some(clientId), Some(sanitizedClientId))
-        var quota = overriddenQuota.get(quotaId)
-        if (quota == null) {
-          quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)))
-          if (quota == null) {
-            quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId), Some(sanitizedClientId)))
-            if (quota == null) {
-              quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId)
-              if (quota == null)
-                quota = ClientQuotaManagerConfig.UnlimitedQuota
-            }
-          }
-        }
-        QuotaEntity(quotaId, sanitizedUser, clientId, sanitizedClientId, quota)
-      case _ =>
-        quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId, sanitizedClientId)
-    }
-  }
-
-  private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, clientId: String, sanitizerClientId: String) : QuotaEntity = {
-    val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId), Some(sanitizerClientId))
-
-    val userQuotaId = QuotaId(Some(sanitizedUser), None, None)
-    val clientQuotaId = QuotaId(None, Some(clientId), Some(sanitizerClientId))
-    var quotaId = userClientQuotaId
-    var quotaConfigId = userClientQuotaId
-    // 1) /config/users/<user>/clients/<client-id>
-    var quota = overriddenQuota.get(quotaConfigId)
-    if (quota == null) {
-      // 2) /config/users/<user>/clients/<default>
-      quotaId = userClientQuotaId
-      quotaConfigId = QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
-      quota = overriddenQuota.get(quotaConfigId)
-
-      if (quota == null) {
-        // 3) /config/users/<user>
-        quotaId = userQuotaId
-        quotaConfigId = quotaId
-        quota = overriddenQuota.get(quotaConfigId)
-
-        if (quota == null) {
-          // 4) /config/users/<default>/clients/<client-id>
-          quotaId = userClientQuotaId
-          quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(clientId), Some(sanitizerClientId))
-          quota = overriddenQuota.get(quotaConfigId)
-
-          if (quota == null) {
-            // 5) /config/users/<default>/clients/<default>
-            quotaId = userClientQuotaId
-            quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
-            quota = overriddenQuota.get(quotaConfigId)
-
-            if (quota == null) {
-              // 6) /config/users/<default>
-              quotaId = userQuotaId
-              quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None, None)
-              quota = overriddenQuota.get(quotaConfigId)
-
-              if (quota == null) {
-                // 7) /config/clients/<client-id>
-                quotaId = clientQuotaId
-                quotaConfigId = QuotaId(None, Some(clientId), Some(sanitizerClientId))
-                quota = overriddenQuota.get(quotaConfigId)
-
-                if (quota == null) {
-                  // 8) /config/clients/<default>
-                  quotaId = clientQuotaId
-                  quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
-                  quota = overriddenQuota.get(quotaConfigId)
-
-                  if (quota == null) {
-                    quotaId = clientQuotaId
-                    quotaConfigId = null
-                    quota = staticConfigClientIdQuota
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-    val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser
-    val quotaClientId = if (quotaId == userQuotaId) "" else clientId
-    QuotaEntity(quotaId, quotaUser, quotaClientId, sanitizerClientId, quota)
+  def quota(user: String, clientId: String): Quota = {
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
+    quota(userPrincipal, clientId)
   }
 
   /**
-   * Returns the quota for the client with the specified (non-encoded) user principal and client-id.
-   * 
+   * Returns the quota for the client with the specified user principal and client-id.
+   *
    * Note: this method is expensive, it is meant to be used by tests only
    */
-  def quota(user: String, clientId: String) = {
-    quotaEntity(Sanitizer.sanitize(user), clientId, Sanitizer.sanitize(clientId)).quota
+  def quota(userPrincipal: KafkaPrincipal, clientId: String): Quota = {
+    val metricTags = quotaCallback.quotaMetricTags(clientQuotaType, userPrincipal, clientId)
+    Quota.upperBound(quotaLimit(metricTags))
+  }
+
+  private def quotaLimit(metricTags: util.Map[String, String]): Double = {
+    Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).map(_.toDouble)getOrElse(Long.MaxValue)
   }
 
   /*
@@ -339,10 +296,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * we need to add a delay of X to W such that O * W / (W + X) = T.
    * Solving for X, we get X = (O - T)/T * W.
    */
-  protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
+  protected def throttleTime(clientMetric: KafkaMetric): Long = {
+    val config = clientMetric.config
     val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
     val quota = config.quota()
-    val difference = clientMetric.value() - quota.bound
+    val difference = clientMetric.metricValue.asInstanceOf[Double] - quota.bound
     // Use the precise window used by the rate calculation
     val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
     throttleTimeMs.round
@@ -360,56 +318,72 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * This function either returns the sensors for a given client id or creates them if they don't exist
    * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor
    */
-  def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = {
-    val sanitizedClientId = Sanitizer.sanitize(clientId)
-    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId, sanitizedClientId)
+  def getOrCreateQuotaSensors(session: Session, clientId: String): ClientSensors = {
+    // Use cached sanitized principal if using default callback
+    val metricTags = quotaCallback match {
+      case callback: DefaultQuotaCallback => callback.quotaMetricTags(session.sanitizedUser, clientId)
+      case _ => quotaCallback.quotaMetricTags(clientQuotaType, session.principal, clientId).asScala.toMap
+    }
     // Names of the sensors to access
-    ClientSensors(
-      clientQuotaEntity,
+    val sensors = ClientSensors(
+      metricTags,
       sensorAccessor.getOrCreate(
-        getQuotaSensorName(clientQuotaEntity.quotaId),
+        getQuotaSensorName(metricTags),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId),
-        Some(getQuotaMetricConfig(clientQuotaEntity.quota)),
+        clientRateMetricName(metricTags),
+        Some(getQuotaMetricConfig(metricTags)),
         new Rate
       ),
-      sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientQuotaEntity.quotaId),
+      sensorAccessor.getOrCreate(getThrottleTimeSensorName(metricTags),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        throttleMetricName(clientQuotaEntity),
+        throttleMetricName(metricTags),
         None,
         new Avg
       )
     )
+    if (quotaCallback.quotaResetRequired(clientQuotaType))
+      updateQuotaMetricConfigs()
+    sensors
   }
 
-  private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+  private def metricTagsToSensorSuffix(metricTags: Map[String, String]): String =
+    metricTags.values.mkString(":")
 
-  private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+  private def getThrottleTimeSensorName(metricTags: Map[String, String]): String =
+    s"${quotaType}ThrottleTime-${metricTagsToSensorSuffix(metricTags)}"
 
-  protected def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+  private def getQuotaSensorName(metricTags: Map[String, String]): String =
+    s"$quotaType-${metricTagsToSensorSuffix(metricTags)}"
+
+  private def getQuotaMetricConfig(metricTags: Map[String, String]): MetricConfig = {
+    getQuotaMetricConfig(quotaLimit(metricTags.asJava))
+  }
+
+  private def getQuotaMetricConfig(quotaLimit: Double): MetricConfig = {
     new MetricConfig()
-            .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
-            .samples(config.numQuotaSamples)
-            .quota(quota)
+      .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
+      .samples(config.numQuotaSamples)
+      .quota(new Quota(quotaLimit, true))
   }
 
   protected def getOrCreateSensor(sensorName: String, metricName: MetricName): Sensor = {
     sensorAccessor.getOrCreate(
-        sensorName,
-        ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        metricName,
-        None,
-        new Rate
-      )
+      sensorName,
+      ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
+      metricName,
+      None,
+      new Rate
+    )
   }
 
   /**
    * Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
    * for any of these levels.
-   * @param sanitizedUser user to override if quota applies to <user> or <user, client-id>
-   * @param clientId client to override if quota applies to <client-id> or <user, client-id>
+   *
+   * @param sanitizedUser     user to override if quota applies to <user> or <user, client-id>
+   * @param clientId          client to override if quota applies to <client-id> or <user, client-id>
    * @param sanitizedClientId sanitized client ID to override if quota applies to <client-id> or <user, client-id>
-   * @param quota custom quota to apply or None if quota override is being removed
+   * @param quota             custom quota to apply or None if quota override is being removed
    */
   def updateQuota(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]) {
     /*
@@ -421,86 +395,233 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
      */
     lock.writeLock().lock()
     try {
-      val quotaId = QuotaId(sanitizedUser, clientId, sanitizedClientId)
-      val userInfo = sanitizedUser match {
-        case Some(ConfigEntityName.Default) => "default user "
-        case Some(user) => "user " + user + " "
-        case None => ""
+      val userEntity = sanitizedUser.map {
+        case ConfigEntityName.Default => DefaultUserEntity
+        case user => UserEntity(user)
       }
-      val clientIdInfo = clientId match {
-        case Some(ConfigEntityName.Default) => "default client-id"
-        case Some(id) => "client-id " + id
-        case None => ""
+      val clientIdEntity = sanitizedClientId.map {
+        case ConfigEntityName.Default => DefaultClientIdEntity
+        case _ => ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided")))
       }
+      val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity)
+
+      if (userEntity.nonEmpty) {
+        if (quotaEntity.clientIdEntity.nonEmpty)
+          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+        else
+          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+      } else if (clientIdEntity.nonEmpty)
+        quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+
       quota match {
-        case Some(newQuota) =>
-          info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}")
-          overriddenQuota.put(quotaId, newQuota)
-          (sanitizedUser, clientId) match {
-            case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-            case (Some(_), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-            case (None, Some(_)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-            case (None, None) =>
-          }
-        case None =>
-          info(s"Removing ${quotaType} quota for ${userInfo}${clientIdInfo}")
-          overriddenQuota.remove(quotaId)
+        case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound)
+        case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
       }
+      val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientIdEntity.contains(DefaultClientIdEntity))
+        None // more than one entity may need updating, so `updateQuotaMetricConfigs` will go through all metrics
+      else
+        Some(quotaEntity)
+      updateQuotaMetricConfigs(updatedEntity)
 
-      val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), clientId.getOrElse(""))
-      val allMetrics = metrics.metrics()
+    } finally {
+      lock.writeLock().unlock()
+    }
+  }
 
-      // If multiple-levels of quotas are defined or if this is a default quota update, traverse metrics
-      // to find all affected values. Otherwise, update just the single matching one.
-      val singleUpdate = quotaTypesEnabled match {
-        case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled =>
-          !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !clientId.filter(_ == ConfigEntityName.Default).isDefined
-        case _ => false
+  /**
+   * Updates metrics configs. This is invoked when quota configs are updated in ZooKeeper
+   * or when partitions leaders change and custom callbacks that implement partition-based quotas
+   * have updated quotas.
+   * @param updatedQuotaEntity If set to one entity and quotas have only been enabled at one
+   *    level, then an optimized update is performed with a single metric update. If None is provided,
+   *    or if custom callbacks are used or if multi-level quotas have been enabled, all metric configs
+   *    are checked and updated if required.
+   */
+  def updateQuotaMetricConfigs(updatedQuotaEntity: Option[KafkaQuotaEntity] = None): Unit = {
+    val allMetrics = metrics.metrics()
+
+    // If using custom quota callbacks or if multiple-levels of quotas are defined or
+    // if this is a default quota update, traverse metrics to find all affected values.
+    // Otherwise, update just the single matching one.
+    val singleUpdate = quotaTypesEnabled match {
+      case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled =>
+        updatedQuotaEntity.nonEmpty
+      case _ => false
+    }
+    if (singleUpdate) {
+      val quotaEntity = updatedQuotaEntity.getOrElse(throw new IllegalStateException("Quota entity not specified"))
+      val user = quotaEntity.sanitizedUser
+      val clientId = quotaEntity.clientId
+      val metricTags = Map(DefaultTags.User -> user, DefaultTags.ClientId -> clientId)
+
+      val quotaMetricName = clientRateMetricName(metricTags)
+      // Change the underlying metric config if the sensor has been created
+      val metric = allMetrics.get(quotaMetricName)
+      if (metric != null) {
+        Option(quotaCallback.quotaLimit(clientQuotaType, metricTags.asJava)).foreach { newQuota =>
+          info(s"Sensor for $quotaEntity already exists. Changing quota to $newQuota in MetricConfig")
+          metric.config(getQuotaMetricConfig(newQuota))
+        }
       }
-      if (singleUpdate) {
-          // Change the underlying metric config if the sensor has been created
-          val metric = allMetrics.get(quotaMetricName)
-          if (metric != null) {
-            val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse(""), sanitizedClientId.getOrElse(""))
-            val newQuota = metricConfigEntity.quota
-            info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig")
-            metric.config(getQuotaMetricConfig(newQuota))
-          }
-      } else {
-          allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
-            case (metricName, metric) =>
-              val userTag = if (metricName.tags.containsKey("user")) metricName.tags.get("user") else ""
-              val clientIdTag = if (metricName.tags.containsKey("client-id")) metricName.tags.get("client-id") else ""
-              val metricConfigEntity = quotaEntity(userTag, clientIdTag, Sanitizer.sanitize(clientIdTag))
-              if (metricConfigEntity.quota != metric.config.quota) {
-                val newQuota = metricConfigEntity.quota
-                info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig")
-                metric.config(getQuotaMetricConfig(newQuota))
-              }
+    } else {
+      val quotaMetricName = clientRateMetricName(Map.empty)
+      allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
+        case (metricName, metric) =>
+          val metricTags = metricName.tags
+          Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).foreach { quota =>
+            val newQuota = quota.asInstanceOf[Double]
+            if (newQuota != metric.config.quota.bound) {
+              info(s"Sensor for quota-id $metricTags already exists. Setting quota to $newQuota in MetricConfig")
+              metric.config(getQuotaMetricConfig(newQuota))
+            }
           }
       }
-
-    } finally {
-      lock.writeLock().unlock()
     }
   }
 
-  protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+  protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
     metrics.metricName("byte-rate", quotaType.toString,
-                   "Tracking byte-rate per user/client-id",
-                   "user", sanitizedUser,
-                   "client-id", clientId)
+      "Tracking byte-rate per user/client-id",
+      quotaMetricTags.asJava)
   }
 
-  private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
+  private def throttleMetricName(quotaMetricTags: Map[String, String]): MetricName = {
     metrics.metricName("throttle-time",
-                       quotaType.toString,
-                       "Tracking average throttle-time per user/client-id",
-                       "user", quotaEntity.sanitizedUser,
-                       "client-id", quotaEntity.clientId)
+      quotaType.toString,
+      "Tracking average throttle-time per user/client-id",
+      quotaMetricTags.asJava)
   }
 
-  def shutdown() = {
+  private def quotaTypeToClientQuotaType(quotaType: QuotaType): ClientQuotaType = {
+    quotaType match {
+      case QuotaType.Fetch => ClientQuotaType.FETCH
+      case QuotaType.Produce => ClientQuotaType.PRODUCE
+      case QuotaType.Request => ClientQuotaType.REQUEST
+      case _ => throw new IllegalArgumentException(s"Not a client quota type: $quotaType")
+    }
+  }
+
+  def shutdown(): Unit = {
     throttledRequestReaper.shutdown()
   }
+
+  class DefaultQuotaCallback extends ClientQuotaCallback {
+    private val overriddenQuotas = new ConcurrentHashMap[ClientQuotaEntity, Quota]()
+
+    override def configure(configs: util.Map[String, _]): Unit = {}
+
+    override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = {
+      quotaMetricTags(Sanitizer.sanitize(principal.getName), clientId).asJava
+    }
+
+    override def quotaLimit(quotaType: ClientQuotaType, metricTags: util.Map[String, String]): lang.Double = {
+      val sanitizedUser = metricTags.get(DefaultTags.User)
+      val clientId = metricTags.get(DefaultTags.ClientId)
+      var quota: Quota = null
+
+      if (sanitizedUser != null && clientId != null) {
+        val userEntity = Some(UserEntity(sanitizedUser))
+        val clientIdEntity = Some(ClientIdEntity(clientId))
+        if (!sanitizedUser.isEmpty && !clientId.isEmpty) {
+          // /config/users/<user>/clients/<client-id>
+          quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, clientIdEntity))
+          if (quota == null) {
+            // /config/users/<user>/clients/<default>
+            quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, Some(DefaultClientIdEntity)))
+          }
+          if (quota == null) {
+            // /config/users/<default>/clients/<client-id>
+            quota = overriddenQuotas.get(KafkaQuotaEntity(Some(DefaultUserEntity), clientIdEntity))
+          }
+          if (quota == null) {
+            // /config/users/<default>/clients/<default>
+            quota = overriddenQuotas.get(DefaultUserClientIdQuotaEntity)
+          }
+        } else if (!sanitizedUser.isEmpty) {
+          // /config/users/<user>
+          quota = overriddenQuotas.get(KafkaQuotaEntity(userEntity, None))
+          if (quota == null) {
+            // /config/users/<default>
+            quota = overriddenQuotas.get(DefaultUserQuotaEntity)
+          }
+        } else if (!clientId.isEmpty) {
+          // /config/clients/<client-id>
+          quota = overriddenQuotas.get(KafkaQuotaEntity(None, clientIdEntity))
+          if (quota == null) {
+            // /config/clients/<default>
+            quota = overriddenQuotas.get(DefaultClientIdQuotaEntity)
+          }
+          if (quota == null)
+            quota = staticConfigClientIdQuota
+        }
+      }
+      if (quota == null) null else quota.bound
+    }
+
+    override def updateClusterMetadata(cluster: Cluster): Boolean = {
+      // Default quota callback does not use any cluster metadata
+      false
+    }
+
+    override def updateQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity, newValue: Double): Unit = {
+      val quotaEntity = entity.asInstanceOf[KafkaQuotaEntity]
+      info(s"Changing $quotaType quota for $quotaEntity to $newValue")
+      overriddenQuotas.put(quotaEntity, new Quota(newValue, true))
+    }
+
+    override def removeQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity): Unit = {
+      val quotaEntity = entity.asInstanceOf[KafkaQuotaEntity]
+      info(s"Removing $quotaType quota for $quotaEntity")
+      overriddenQuotas.remove(quotaEntity)
+    }
+
+    override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = false
+
+    def quotaMetricTags(sanitizedUser: String, clientId: String) : Map[String, String] = {
+      val (userTag, clientIdTag) = quotaTypesEnabled match {
+        case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
+          ("", clientId)
+        case QuotaTypes.UserQuotaEnabled =>
+          (sanitizedUser, "")
+        case QuotaTypes.UserClientIdQuotaEnabled =>
+          (sanitizedUser, clientId)
+        case _ =>
+          val userEntity = Some(UserEntity(sanitizedUser))
+          val clientIdEntity = Some(ClientIdEntity(clientId))
+
+          var metricTags = (sanitizedUser, clientId)
+          // 1) /config/users/<user>/clients/<client-id>
+          if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, clientIdEntity))) {
+            // 2) /config/users/<user>/clients/<default>
+            metricTags = (sanitizedUser, clientId)
+            if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, Some(DefaultClientIdEntity)))) {
+              // 3) /config/users/<user>
+              metricTags = (sanitizedUser, "")
+              if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, None))) {
+                // 4) /config/users/<default>/clients/<client-id>
+                metricTags = (sanitizedUser, clientId)
+                if (!overriddenQuotas.containsKey(KafkaQuotaEntity(Some(DefaultUserEntity), clientIdEntity))) {
+                  // 5) /config/users/<default>/clients/<default>
+                  metricTags = (sanitizedUser, clientId)
+                  if (!overriddenQuotas.containsKey(DefaultUserClientIdQuotaEntity)) {
+                    // 6) /config/users/<default>
+                    metricTags = (sanitizedUser, "")
+                    if (!overriddenQuotas.containsKey(DefaultUserQuotaEntity)) {
+                      // 7) /config/clients/<client-id>
+                      // 8) /config/clients/<default>
+                      // 9) static client-id quota
+                      metricTags = ("", clientId)
+                    }
+                  }
+                }
+              }
+            }
+          }
+          metricTags
+      }
+      Map(DefaultTags.User -> userTag, DefaultTags.ClientId -> clientIdTag)
+    }
+
+    override def close(): Unit = {}
+  }
 }
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index 59fa421..3078a62 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -22,13 +22,17 @@ import kafka.network.RequestChannel
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.quota.ClientQuotaCallback
+
+import scala.collection.JavaConverters._
 
 
 class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
                                 private val metrics: Metrics,
                                 private val time: Time,
-                                threadNamePrefix: String)
-                                extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix) {
+                                threadNamePrefix: String,
+                                quotaCallback: Option[ClientQuotaCallback])
+                                extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix, quotaCallback) {
   val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
   def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName)
 
@@ -43,7 +47,7 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
     }
 
     if (quotasEnabled) {
-      val quotaSensors = getOrCreateQuotaSensors(request.session.sanitizedUser, request.header.clientId)
+      val quotaSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
       request.recordNetworkThreadTimeCallback = Some(timeNanos => recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos)))
 
       recordAndThrottleOnQuotaViolation(
@@ -62,15 +66,14 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
     }
   }
 
-  override protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long = {
-    math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
+  override protected def throttleTime(clientMetric: KafkaMetric): Long = {
+    math.min(super.throttleTime(clientMetric), maxThrottleTimeMs)
   }
 
-  override protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
+  override protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
     metrics.metricName("request-time", QuotaType.Request.toString,
-                   "Tracking request-time per user/client-id",
-                   "user", sanitizedUser,
-                   "client-id", clientId)
+      "Tracking request-time per user/client-id",
+      quotaMetricTags.asJava)
   }
 
   private def exemptMetricName: MetricName = {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 766907a..1839768 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -75,13 +75,12 @@ object DynamicBrokerConfig {
 
   private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
 
-  val AllDynamicConfigs = mutable.Set[String]()
-  AllDynamicConfigs ++= DynamicSecurityConfigs
-  AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
-  AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs
-  AllDynamicConfigs ++= DynamicThreadPool.ReconfigurableConfigs
-  AllDynamicConfigs ++= Set(KafkaConfig.MetricReporterClassesProp)
-  AllDynamicConfigs ++= DynamicListenerConfig.ReconfigurableConfigs
+  val AllDynamicConfigs = DynamicSecurityConfigs ++
+    LogCleaner.ReconfigurableConfigs ++
+    DynamicLogConfig.ReconfigurableConfigs ++
+    DynamicThreadPool.ReconfigurableConfigs ++
+    Set(KafkaConfig.MetricReporterClassesProp) ++
+    DynamicListenerConfig.ReconfigurableConfigs
 
   private val PerBrokerConfigs = DynamicSecurityConfigs  ++
     DynamicListenerConfig.ReconfigurableConfigs
@@ -159,16 +158,17 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
     addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
+    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaConfig))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
-    require(reconfigurable.reconfigurableConfigs.asScala.forall(AllDynamicConfigs.contains))
+    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
     reconfigurables += reconfigurable
   }
 
   def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) {
-    require(reconfigurable.reconfigurableConfigs.forall(AllDynamicConfigs.contains))
+    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
     brokerReconfigurables += reconfigurable
   }
 
@@ -176,6 +176,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     reconfigurables -= reconfigurable
   }
 
+  private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) {
+    val nonDynamic = configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains)
+    require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic")
+  }
+
   // Visibility for testing
   private[server] def currentKafkaConfig: KafkaConfig = CoreUtils.inReadLock(lock) {
     currentConfig
@@ -705,6 +710,36 @@ object DynamicListenerConfig {
   )
 }
 
+class DynamicClientQuotaCallback(brokerId: Int, config: KafkaConfig) extends Reconfigurable {
+
+  override def configure(configs: util.Map[String, _]): Unit = {}
+
+  override def reconfigurableConfigs(): util.Set[String] = {
+    val configs = new util.HashSet[String]()
+    config.quotaCallback.foreach {
+      case callback: Reconfigurable => configs.addAll(callback.reconfigurableConfigs)
+      case _ =>
+    }
+    configs
+  }
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+    config.quotaCallback.foreach {
+      case callback: Reconfigurable => callback.validateReconfiguration(configs)
+      case _ =>
+    }
+  }
+
+  override def reconfigure(configs: util.Map[String, _]): Unit = {
+    config.quotaCallback.foreach {
+      case callback: Reconfigurable =>
+        config.dynamicConfig.maybeReconfigure(callback, config.dynamicConfig.currentKafkaConfig, configs)
+        true
+      case _ => false
+    }
+  }
+}
+
 class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable with Logging {
 
   override def reconfigurableConfigs: Set[String] = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e79afa..f43f8a5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -231,6 +231,13 @@ class KafkaApis(val requestChannel: RequestChannel,
           adminManager.tryCompleteDelayedTopicOperations(topic)
         }
       }
+      config.quotaCallback.foreach { callback =>
+        if (callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, request.context.listenerName))) {
+          quotas.fetch.updateQuotaMetricConfigs()
+          quotas.produce.updateQuotaMetricConfigs()
+          quotas.request.updateQuotaMetricConfigs()
+        }
+      }
       sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
     } else {
       sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
@@ -445,7 +452,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
       quotas.produce.maybeRecordAndThrottle(
-        request.session.sanitizedUser,
+        request.session,
         request.header.clientId,
         numBytesAppended,
         produceResponseCallback)
@@ -610,7 +617,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         // This may be slightly different from the actual response size. But since down conversions
         // result in data being loaded into memory, it is better to do this after throttling to avoid OOM.
         val responseStruct = unconvertedFetchResponse.toStruct(versionId)
-        quotas.fetch.maybeRecordAndThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
+        quotas.fetch.maybeRecordAndThrottle(request.session, clientId, responseStruct.sizeOf,
           fetchResponseCallback)
       }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7eda083..7927f1b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.quota.ClientQuotaCallback
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
@@ -390,6 +391,7 @@ object KafkaConfig {
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
   val ReplicationQuotaWindowSizeSecondsProp = "replication.quota.window.size.seconds"
   val AlterLogDirsReplicationQuotaWindowSizeSecondsProp = "alter.log.dirs.replication.quota.window.size.seconds"
+  val ClientQuotaCallbackClassProp = "client.quota.callback.class"
 
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
@@ -677,6 +679,10 @@ object KafkaConfig {
   val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas"
   val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
   val AlterLogDirsReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for alter log dirs replication quotas"
+  val ClientQuotaCallbackClassDoc = "The fully qualified name of a class that implements the ClientQuotaCallback interface, " +
+    "which is used to determine quota limits applied to client requests. By default, <user, client-id>, <user> or <client-id> " +
+    "quotas stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal " +
+    "of the session and the client-id of the request is applied."
   /** ********* Transaction Configuration ***********/
   val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " +
     "transaction coordinator. Note that this also influences producer id expiration: Producer ids are guaranteed to expire " +
@@ -922,6 +928,7 @@ object KafkaConfig {
       .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
       .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc)
       .define(AlterLogDirsReplicationQuotaWindowSizeSecondsProp, INT, Defaults.AlterLogDirsReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, AlterLogDirsReplicationQuotaWindowSizeSecondsDoc)
+      .define(ClientQuotaCallbackClassProp, CLASS, null, LOW, ClientQuotaCallbackClassDoc)
 
       /** ********* SSL Configuration ****************/
       .define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc)
@@ -1217,6 +1224,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp)
   val numAlterLogDirsReplicationQuotaSamples = getInt(KafkaConfig.NumAlterLogDirsReplicationQuotaSamplesProp)
   val alterLogDirsReplicationQuotaWindowSizeSeconds = getInt(KafkaConfig.AlterLogDirsReplicationQuotaWindowSizeSecondsProp)
+  val quotaCallback = Option(getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp, classOf[ClientQuotaCallback]))
 
   /** ********* Transaction Configuration **************/
   val transactionIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 53632cd..f621051 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -595,6 +595,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         if (quotaManagers != null)
           CoreUtils.swallow(quotaManagers.shutdown(), this)
+        config.quotaCallback.foreach(_.close())
+
         // Even though socket server is stopped much earlier, controller can generate
         // response for controlled shutdown request. Shutdown server at the end to
         // avoid any failures (e.g. when metrics are recorded)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 7cdb8f1..43fe352 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util.Collections
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.{Seq, Set, mutable}
@@ -28,7 +29,7 @@ import kafka.controller.StateChangeLogger
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
@@ -127,6 +128,14 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def getAllPartitions(): Map[TopicPartition, UpdateMetadataRequest.PartitionState] = {
+    inReadLock(partitionMetadataLock) {
+      cache.flatMap { case (topic, partitionStates) =>
+        partitionStates.map { case (partition, state ) => (new TopicPartition(topic, partition), state) }
+      }.toMap
+    }
+  }
+
   def getNonExistingTopics(topics: Set[String]): Set[String] = {
     inReadLock(partitionMetadataLock) {
       topics -- cache.keySet
@@ -180,6 +189,27 @@ class MetadataCache(brokerId: Int) extends Logging {
 
   def getControllerId: Option[Int] = controllerId
 
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
+    inReadLock(partitionMetadataLock) {
+      val nodes = aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
+      def node(id: Integer): Node = nodes.get(id).orNull
+      val partitions = getAllPartitions()
+        .filter { case (_, state) => state.basePartitionState.leader != LeaderAndIsr.LeaderDuringDelete }
+        .map { case (tp, state) =>
+          new PartitionInfo(tp.topic, tp.partition, node(state.basePartitionState.leader),
+            state.basePartitionState.replicas.asScala.map(node).toArray,
+            state.basePartitionState.isr.asScala.map(node).toArray,
+            state.offlineReplicas.asScala.map(node).toArray)
+        }
+      val unauthorizedTopics = Collections.emptySet[String]
+      val internalTopics = getAllTopics().filter(Topic.isInternal).asJava
+      new Cluster(clusterId, nodes.values.filter(_ != null).toList.asJava,
+        partitions.toList.asJava,
+        unauthorizedTopics, internalTopics,
+        getControllerId.map(id => node(id)).orNull)
+    }
+  }
+
   // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
   def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
     inWriteLock(partitionMetadataLock) {
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 01441b5..c758b5a 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -54,9 +54,9 @@ object QuotaFactory extends Logging {
 
   def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time, threadNamePrefix: String): QuotaManagers = {
     QuotaManagers(
-      new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix),
-      new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix),
-      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix),
+      new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix, cfg.quotaCallback),
+      new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix, cfg.quotaCallback),
+      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix, cfg.quotaCallback),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time),
       new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time)
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 9b1c2aa..b265182 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -16,31 +16,29 @@ package kafka.api
 
 import java.util.{Collections, HashMap, Properties}
 
-import kafka.server.{ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, KafkaServer, QuotaId, QuotaType}
+import kafka.api.QuotaTestClients._
+import kafka.server.{ClientQuotaManager, ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, KafkaServer, QuotaType}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.common.{MetricName, TopicPartition}
+import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
-abstract class BaseQuotaTest extends IntegrationTestHarness {
+import scala.collection.JavaConverters._
 
-  def userPrincipal : String
-  def producerQuotaId : QuotaId
-  def consumerQuotaId : QuotaId
-  def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double)
-  def removeQuotaOverrides()
+abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   override val serverCount = 2
   val producerCount = 1
   val consumerCount = 1
 
-  private val producerBufferSize = 300000
   protected def producerClientId = "QuotasTestProducer-1"
   protected def consumerClientId = "QuotasTestConsumer-1"
+  protected def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients
 
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
@@ -49,7 +47,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
   this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0")
-  this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
+  this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "300000")
   this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -63,9 +61,10 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   val defaultConsumerQuota = 2500
   val defaultRequestQuota = Int.MaxValue
 
-  var leaderNode: KafkaServer = null
-  var followerNode: KafkaServer = null
-  private val topic1 = "topic-1"
+  val topic1 = "topic-1"
+  var leaderNode: KafkaServer = _
+  var followerNode: KafkaServer = _
+  var quotaTestClients: QuotaTestClients = _
 
   @Before
   override def setUp() {
@@ -75,22 +74,19 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     val leaders = createTopic(topic1, numPartitions, serverCount)
     leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
     followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
+    quotaTestClients = createQuotaTestClients(topic1, leaderNode)
   }
 
   @Test
   def testThrottledProducerConsumer() {
 
     val numRecords = 1000
-    val producer = producers.head
-    val produced = produceUntilThrottled(producer, numRecords)
-    assertTrue("Should have been throttled", producerThrottleMetric.value > 0)
-    verifyProducerThrottleTimeMetric(producer)
+    val produced = quotaTestClients.produceUntilThrottled(numRecords)
+    quotaTestClients.verifyProduceThrottle(expectThrottle = true)
 
     // Consumer should read in a bursty manner and get throttled immediately
-    val consumer = consumers.head
-    consumeUntilThrottled(consumer, produced)
-    assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
-    verifyConsumerThrottleTimeMetric(consumer)
+    quotaTestClients.consumeUntilThrottled(produced)
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
   @Test
@@ -100,154 +96,187 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, Long.MaxValue.toString)
     props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, Long.MaxValue.toString)
 
-    overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
 
     val numRecords = 1000
-    assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
-    assertEquals("Should not have been throttled", 0.0, producerThrottleMetric.value, 0.0)
+    assertEquals(numRecords, quotaTestClients.produceUntilThrottled(numRecords))
+    quotaTestClients.verifyProduceThrottle(expectThrottle = false)
 
     // The "client" consumer does not get throttled.
-    assertEquals(numRecords, consumeUntilThrottled(consumers.head, numRecords))
-    assertEquals("Should not have been throttled", 0.0, consumerThrottleMetric.value, 0.0)
+    assertEquals(numRecords, quotaTestClients.consumeUntilThrottled(numRecords))
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = false)
   }
 
   @Test
   def testQuotaOverrideDelete() {
     // Override producer and consumer quotas to unlimited
-    overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
+    quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Int.MaxValue)
 
     val numRecords = 1000
-    assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
-    assertEquals("Should not have been throttled", 0.0, producerThrottleMetric.value, 0.0)
-    assertEquals(numRecords, consumeUntilThrottled(consumers.head, numRecords))
-    assertEquals("Should not have been throttled", 0.0, consumerThrottleMetric.value, 0.0)
+    assertEquals(numRecords, quotaTestClients.produceUntilThrottled(numRecords))
+    quotaTestClients.verifyProduceThrottle(expectThrottle = false)
+    assertEquals(numRecords, quotaTestClients.consumeUntilThrottled(numRecords))
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = false)
 
     // Delete producer and consumer quota overrides. Consumer and producer should now be
     // throttled since broker defaults are very small
-    removeQuotaOverrides()
-    val produced = produceUntilThrottled(producers.head, numRecords)
-    assertTrue("Should have been throttled", producerThrottleMetric.value > 0)
+    quotaTestClients.removeQuotaOverrides()
+    val produced = quotaTestClients.produceUntilThrottled(numRecords)
+    quotaTestClients.verifyProduceThrottle(expectThrottle = true)
 
     // Since producer may have been throttled after producing a couple of records,
     // consume from beginning till throttled
     consumers.head.seekToBeginning(Collections.singleton(new TopicPartition(topic1, 0)))
-    consumeUntilThrottled(consumers.head, numRecords + produced)
-    assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
+    quotaTestClients.consumeUntilThrottled(numRecords + produced)
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
   @Test
   def testThrottledRequest() {
 
-    overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
-    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
+    quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
+    quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
 
     val consumer = consumers.head
     consumer.subscribe(Collections.singleton(topic1))
     val endTimeMs = System.currentTimeMillis + 10000
     var throttled = false
-    while ((!throttled || exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) {
+    while ((!throttled || quotaTestClients.exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) {
       consumer.poll(100)
-      val throttleMetric = consumerRequestThrottleMetric
-      throttled = throttleMetric != null && throttleMetric.value > 0
+      val throttleMetric = quotaTestClients.throttleMetric(QuotaType.Request, consumerClientId)
+      throttled = throttleMetric != null && metricValue(throttleMetric) > 0
     }
 
     assertTrue("Should have been throttled", throttled)
-    verifyConsumerThrottleTimeMetric(consumer, Some(ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds * 1000.0))
+    quotaTestClients.verifyConsumerClientThrottleTimeMetric(expectThrottle = true,
+      Some(ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds * 1000.0))
+
+    val exemptMetric = quotaTestClients.exemptRequestMetric
+    assertNotNull("Exempt requests not recorded", exemptMetric)
+    assertTrue("Exempt requests not recorded", metricValue(exemptMetric) > 0)
+  }
+}
+
+object QuotaTestClients {
+  def metricValue(metric: Metric): Double = metric.metricValue().asInstanceOf[Double]
+}
+
+abstract class QuotaTestClients(topic: String,
+                                leaderNode: KafkaServer,
+                                producerClientId: String,
+                                consumerClientId: String,
+                                producer: KafkaProducer[Array[Byte], Array[Byte]],
+                                consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+
+  def userPrincipal : KafkaPrincipal
+  def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double)
+  def removeQuotaOverrides()
+
+  def quotaMetricTags(clientId: String): Map[String, String]
 
-    assertNotNull("Exempt requests not recorded", exemptRequestMetric)
-    assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0)
+  def quota(quotaManager: ClientQuotaManager, userPrincipal: KafkaPrincipal, clientId: String): Quota = {
+    quotaManager.quota(userPrincipal, clientId)
   }
 
-  def produceUntilThrottled(p: KafkaProducer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
+  def produceUntilThrottled(maxRecords: Int, waitForRequestCompletion: Boolean = true): Int = {
     var numProduced = 0
     var throttled = false
     do {
       val payload = numProduced.toString.getBytes
-      p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload),
-             new ErrorLoggingCallback(topic1, null, null, true)).get()
+      val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, null, null, payload),
+        new ErrorLoggingCallback(topic, null, null, true))
       numProduced += 1
-      val throttleMetric = producerThrottleMetric
-      throttled = throttleMetric != null && throttleMetric.value > 0
+      do {
+        val metric = throttleMetric(QuotaType.Produce, producerClientId)
+        throttled = metric != null && metricValue(metric) > 0
+      } while (!future.isDone && (!throttled || waitForRequestCompletion))
     } while (numProduced < maxRecords && !throttled)
     numProduced
   }
 
-  def consumeUntilThrottled(consumer: KafkaConsumer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
-    consumer.subscribe(Collections.singleton(topic1))
+  def consumeUntilThrottled(maxRecords: Int, waitForRequestCompletion: Boolean = true): Int = {
+    consumer.subscribe(Collections.singleton(topic))
     var numConsumed = 0
     var throttled = false
     do {
       numConsumed += consumer.poll(100).count
-      val throttleMetric = consumerThrottleMetric
-      throttled = throttleMetric != null && throttleMetric.value > 0
+      val metric = throttleMetric(QuotaType.Fetch, consumerClientId)
+      throttled = metric != null && metricValue(metric) > 0
     }  while (numConsumed < maxRecords && !throttled)
 
     // If throttled, wait for the records from the last fetch to be received
-    if (throttled && numConsumed < maxRecords) {
+    if (throttled && numConsumed < maxRecords && waitForRequestCompletion) {
       val minRecords = numConsumed + 1
       while (numConsumed < minRecords)
-          numConsumed += consumer.poll(100).count
+        numConsumed += consumer.poll(100).count
     }
     numConsumed
   }
 
-  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
-    TestUtils.retry(10000) {
-      val quotaManagers = leaderNode.apis.quotas
-      val overrideProducerQuota = quotaManagers.produce.quota(userPrincipal, producerClientId)
-      val overrideConsumerQuota = quotaManagers.fetch.quota(userPrincipal, consumerClientId)
-      val overrideProducerRequestQuota = quotaManagers.request.quota(userPrincipal, producerClientId)
-      val overrideConsumerRequestQuota = quotaManagers.request.quota(userPrincipal, consumerClientId)
+  def verifyProduceThrottle(expectThrottle: Boolean, verifyClientMetric: Boolean = true): Unit = {
+    verifyThrottleTimeMetric(QuotaType.Produce, producerClientId, expectThrottle)
+    if (verifyClientMetric)
+      verifyProducerClientThrottleTimeMetric(expectThrottle)
+  }
 
-      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
-      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
-      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideProducerRequestQuota)
-      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideConsumerRequestQuota)
+  def verifyConsumeThrottle(expectThrottle: Boolean, verifyClientMetric: Boolean = true): Unit = {
+    verifyThrottleTimeMetric(QuotaType.Fetch, consumerClientId, expectThrottle)
+    if (verifyClientMetric)
+      verifyConsumerClientThrottleTimeMetric(expectThrottle)
+  }
+
+  def verifyThrottleTimeMetric(quotaType: QuotaType, clientId: String, expectThrottle: Boolean): Unit = {
+    val throttleMetricValue = metricValue(throttleMetric(quotaType, clientId))
+    if (expectThrottle) {
+      assertTrue("Should have been throttled", throttleMetricValue > 0)
+    } else {
+      assertEquals("Should not have been throttled", 0.0, throttleMetricValue, 0.0)
     }
   }
 
-  private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) {
+  def throttleMetricName(quotaType: QuotaType, clientId: String): MetricName = {
+    leaderNode.metrics.metricName("throttle-time",
+      quotaType.toString,
+      quotaMetricTags(clientId).asJava)
+  }
+
+  def throttleMetric(quotaType: QuotaType, clientId: String): KafkaMetric = {
+    leaderNode.metrics.metrics.get(throttleMetricName(quotaType, clientId))
+  }
+
+  def exemptRequestMetric: KafkaMetric = {
+    val metricName = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
+    leaderNode.metrics.metrics.get(metricName)
+  }
+
+  def verifyProducerClientThrottleTimeMetric(expectThrottle: Boolean) {
     val tags = new HashMap[String, String]
     tags.put("client-id", producerClientId)
     val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags))
     val maxMetric = producer.metrics.get(new MetricName("produce-throttle-time-max", "producer-metrics", "", tags))
 
-    TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0,
-        s"Producer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}")
+    if (expectThrottle) {
+      TestUtils.waitUntilTrue(() => metricValue(avgMetric) > 0.0 && metricValue(maxMetric) > 0.0,
+        s"Producer throttle metric not updated: avg=${metricValue(avgMetric)} max=${metricValue(maxMetric)}")
+    } else
+      assertEquals("Should not have been throttled", 0.0, metricValue(maxMetric), 0.0)
   }
 
-  private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], maxThrottleTime: Option[Double] = None) {
+  def verifyConsumerClientThrottleTimeMetric(expectThrottle: Boolean, maxThrottleTime: Option[Double] = None) {
     val tags = new HashMap[String, String]
     tags.put("client-id", consumerClientId)
     val avgMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags))
     val maxMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", tags))
 
-    TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0,
-        s"Consumer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}")
-    maxThrottleTime.foreach(max => assertTrue(s"Maximum consumer throttle too high: ${maxMetric.value}", maxMetric.value <= max))
-  }
-
-  private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = {
-    leaderNode.metrics.metricName("throttle-time",
-                                  quotaType.toString,
-                                  "Tracking throttle-time per user/client-id",
-                                  "user", quotaId.sanitizedUser.getOrElse(""),
-                                  "client-id", quotaId.clientId.getOrElse(""))
-  }
-
-  def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = {
-    leaderNode.metrics.metrics.get(throttleMetricName(quotaType, quotaId))
-  }
-
-  private def producerThrottleMetric = throttleMetric(QuotaType.Produce, producerQuotaId)
-  private def consumerThrottleMetric = throttleMetric(QuotaType.Fetch, consumerQuotaId)
-  private def consumerRequestThrottleMetric = throttleMetric(QuotaType.Request, consumerQuotaId)
-
-  private def exemptRequestMetric: KafkaMetric = {
-    val metricName = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "")
-    leaderNode.metrics.metrics.get(metricName)
+    if (expectThrottle) {
+      TestUtils.waitUntilTrue(() => metricValue(avgMetric) > 0.0 && metricValue(maxMetric) > 0.0,
+        s"Consumer throttle metric not updated: avg=${metricValue(avgMetric)} max=${metricValue(maxMetric)}")
+      maxThrottleTime.foreach(max => assertTrue(s"Maximum consumer throttle too high: ${metricValue(maxMetric)}",
+        metricValue(maxMetric) <= max))
+    } else
+      assertEquals("Should not have been throttled", 0.0, metricValue(maxMetric), 0.0)
   }
 
   def quotaProperties(producerQuota: Long, consumerQuota: Long, requestQuota: Double): Properties = {
@@ -257,4 +286,19 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     props.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
     props
   }
+
+  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode) {
+    TestUtils.retry(10000) {
+      val quotaManagers = server.apis.quotas
+      val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId)
+      val overrideConsumerQuota = quota(quotaManagers.fetch, userPrincipal, consumerClientId)
+      val overrideProducerRequestQuota = quota(quotaManagers.request, userPrincipal, producerClientId)
+      val overrideConsumerRequestQuota = quota(quotaManagers.request, userPrincipal, consumerClientId)
+
+      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
+      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
+      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideProducerRequestQuota)
+      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have request quota", Quota.upperBound(requestQuota), overrideConsumerRequestQuota)
+    }
+  }
 }
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index 3e08327..b084b3c 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -16,18 +16,15 @@ package kafka.api
 
 import java.util.Properties
 
-import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
+import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Sanitizer
 import org.junit.Before
 
 class ClientIdQuotaTest extends BaseQuotaTest {
 
-  override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName
   override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
   override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
-  override val producerQuotaId = QuotaId(None, Some(producerClientId), Some(Sanitizer.sanitize(producerClientId)))
-  override val consumerQuotaId = QuotaId(None, Some(consumerClientId), Some(Sanitizer.sanitize(consumerClientId)))
 
   @Before
   override def setUp() {
@@ -35,24 +32,35 @@ class ClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString)
     super.setUp()
   }
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
-    val producerProps = new Properties()
-    producerProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
-    producerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
-    updateQuotaOverride(producerClientId, producerProps)
-
-    val consumerProps = new Properties()
-    consumerProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
-    consumerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
-    updateQuotaOverride(consumerClientId, consumerProps)
-  }
-  override def removeQuotaOverrides() {
-    val emptyProps = new Properties
-    updateQuotaOverride(producerClientId, emptyProps)
-    updateQuotaOverride(consumerClientId, emptyProps)
-  }
 
-  private def updateQuotaOverride(clientId: String, properties: Properties) {
-    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(clientId), properties)
+  override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producers.head, consumers.head) {
+      override def userPrincipal: KafkaPrincipal = KafkaPrincipal.ANONYMOUS
+      override def quotaMetricTags(clientId: String): Map[String, String] = {
+        Map("user" -> "", "client-id" -> clientId)
+      }
+
+      override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
+        val producerProps = new Properties()
+        producerProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+        producerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
+        updateQuotaOverride(producerClientId, producerProps)
+
+        val consumerProps = new Properties()
+        consumerProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+        consumerProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
+        updateQuotaOverride(consumerClientId, consumerProps)
+      }
+
+      override def removeQuotaOverrides() {
+        val emptyProps = new Properties
+        updateQuotaOverride(producerClientId, emptyProps)
+        updateQuotaOverride(consumerClientId, emptyProps)
+      }
+
+      private def updateQuotaOverride(clientId: String, properties: Properties) {
+        adminZkClient.changeClientIdConfig(Sanitizer.sanitize(clientId), properties)
+      }
+    }
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
new file mode 100644
index 0000000..886d696
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -0,0 +1,453 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.io.File
+import java.{lang, util}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.{Collections, Properties}
+
+import kafka.api.GroupedUserPrincipalBuilder._
+import kafka.api.GroupedUserQuotaCallback._
+import kafka.server._
+import kafka.utils.JaasTestUtils.ScramLoginModule
+import kafka.utils.{JaasTestUtils, Logging, TestUtils}
+import kafka.zk.ConfigEntityChangeNotificationZNode
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.{Cluster, Reconfigurable}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.errors.SaslAuthenticationException
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth._
+import org.apache.kafka.common.security.scram.ScramCredential
+import org.apache.kafka.server.quota._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
+
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected def listenerName = new ListenerName("CLIENT")
+  override protected def interBrokerListenerName: ListenerName = new ListenerName("BROKER")
+
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+  override val consumerCount: Int = 0
+  override val producerCount: Int = 0
+  override val serverCount: Int = 2
+
+  private val kafkaServerSaslMechanisms = Seq("SCRAM-SHA-256")
+  private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
+  override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+  private val adminClients = new ArrayBuffer[AdminClient]()
+  private var producerWithoutQuota: KafkaProducer[Array[Byte], Array[Byte]] = _
+
+  val defaultRequestQuota = 1000
+  val defaultProduceQuota = 2000 * 1000 * 1000
+  val defaultConsumeQuota = 1000 * 1000 * 1000
+
+  @Before
+  override def setUp() {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+    this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
+    this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
+    this.serverConfig.setProperty(KafkaConfig.ClientQuotaCallbackClassProp, classOf[GroupedUserQuotaCallback].getName)
+    this.serverConfig.setProperty(s"${listenerName.configPrefix}${KafkaConfig.PrincipalBuilderClassProp}",
+      classOf[GroupedUserPrincipalBuilder].getName)
+    this.serverConfig.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+    super.setUp()
+    brokerList = TestUtils.bootstrapServers(servers, listenerName)
+
+    producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
+      ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString)
+    producerWithoutQuota = createNewProducer
+    producers += producerWithoutQuota
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    // Close producers and consumers without waiting for requests to complete
+    // to avoid waiting for throttled responses
+    producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
+    producers.clear()
+    consumers.foreach(_.close(0, TimeUnit.MILLISECONDS))
+    consumers.clear()
+    super.tearDown()
+  }
+
+  override def configureSecurityBeforeServersStart() {
+    super.configureSecurityBeforeServersStart()
+    zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+    createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
+  }
+
+  @Test
+  def testCustomQuotaCallback() {
+    // Large quota override, should not throttle
+    var brokerId = 0
+    var user = createGroupWithOneUser("group0_user1", brokerId)
+    user.configureAndWaitForQuota(1000000, 2000000)
+    quotaLimitCalls.values.foreach(_.set(0))
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // ClientQuotaCallback#quotaLimit is invoked by each quota manager once for each new client
+    assertEquals(1, quotaLimitCalls(ClientQuotaType.PRODUCE).get)
+    assertEquals(1, quotaLimitCalls(ClientQuotaType.FETCH).get)
+    assertTrue(s"Too many quotaLimit calls $quotaLimitCalls", quotaLimitCalls(ClientQuotaType.REQUEST).get <= serverCount)
+    // Large quota updated to small quota, should throttle
+    user.configureAndWaitForQuota(9000, 3000)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+    // Quota override deletion - verify default quota applied (large quota, no throttling)
+    user = addUser("group0_user2", brokerId)
+    user.removeQuotaOverrides()
+    user.waitForQuotaUpdate(defaultProduceQuota, defaultConsumeQuota, defaultRequestQuota)
+    user.removeThrottleMetrics() // since group was throttled before
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // Make default quota smaller, should throttle
+    user.configureAndWaitForQuota(8000, 2500, divisor = 1, group = None)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+    // Configure large quota override, should not throttle
+    user = addUser("group0_user3", brokerId)
+    user.configureAndWaitForQuota(2000000, 2000000)
+    user.removeThrottleMetrics() // since group was throttled before
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // Quota large enough for one partition, should not throttle
+    brokerId = 1
+    user = createGroupWithOneUser("group1_user1", brokerId)
+    user.configureAndWaitForQuota(8000 * 100, 2500 * 100)
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // Create large number of partitions on another broker, should result in throttling on first partition
+    val largeTopic = "group1_largeTopic"
+    createTopic(largeTopic, numPartitions = 99, leader = 0)
+    user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+    // Remove quota override and test default quota applied with scaling based on partitions
+    user = addUser("group1_user2", brokerId)
+    user.waitForQuotaUpdate(defaultProduceQuota / 100, defaultConsumeQuota / 100, defaultRequestQuota)
+    user.removeThrottleMetrics() // since group was throttled before
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+    user.configureAndWaitForQuota(8000 * 100, 2500 * 100, divisor=100, group = None)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+
+    // Remove the second topic with large number of partitions, verify no longer throttled
+    adminZkClient.deleteTopic(largeTopic)
+    user = addUser("group1_user3", brokerId)
+    user.waitForQuotaUpdate(8000 * 100, 2500 * 100, defaultRequestQuota)
+    user.removeThrottleMetrics() // since group was throttled before
+    user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
+
+    // Alter configs of custom callback dynamically
+    val adminClient = createAdminClient()
+    val newProps = new Properties
+    newProps.put(GroupedUserQuotaCallback.DefaultProduceQuotaProp, "8000")
+    newProps.put(GroupedUserQuotaCallback.DefaultFetchQuotaProp, "2500")
+    TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig = false)
+    user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
+    user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
+  }
+
+  /**
+   * Creates a group with one user and one topic with one partition.
+   * @param firstUser First user to create in the group
+   * @param brokerId The broker id to use as leader of the partition
+   */
+  private def createGroupWithOneUser(firstUser: String, brokerId: Int): GroupedUser = {
+    val user = addUser(firstUser, brokerId)
+    createTopic(user.topic, numPartitions = 1, brokerId)
+    user.configureAndWaitForQuota(defaultProduceQuota, defaultConsumeQuota, divisor = 1, group = None)
+    user
+  }
+
+  private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
+    val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+  }
+
+  private def createAdminClient(): AdminClient = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+      TestUtils.bootstrapServers(servers, new ListenerName("BROKER")))
+    clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, Object]].asScala.foreach { case (key, value) =>
+      config.put(key.toString, value)
+    }
+    config.put(SaslConfigs.SASL_JAAS_CONFIG,
+      ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString)
+    val adminClient = AdminClient.create(config)
+    adminClients += adminClient
+    adminClient
+  }
+
+  private def produceWithoutThrottle(topic: String, numRecords: Int): Unit = {
+    (0 until numRecords).foreach { i =>
+      val payload = i.toString.getBytes
+      producerWithoutQuota.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, null, null, payload))
+    }
+  }
+
+  private def addUser(user: String, leader: Int): GroupedUser = {
+
+    val password = s"$user:secret"
+    createScramCredentials(zkConnect, user, password)
+    servers.foreach { server =>
+      val cache = server.credentialProvider.credentialCache.cache(kafkaClientSaslMechanism, classOf[ScramCredential])
+      TestUtils.waitUntilTrue(() => cache.get(user) != null, "SCRAM credentials not created")
+    }
+
+    val userGroup = group(user)
+    val topic = s"${userGroup}_topic"
+    val producerClientId = s"$user:producer-client-id"
+    val consumerClientId = s"$user:producer-client-id"
+
+    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
+    producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString)
+    val producer = createNewProducer
+    producers += producer
+
+    consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group")
+    consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString)
+    val consumer = createNewConsumer
+    consumers += consumer
+
+    GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId, producer, consumer)
+  }
+
+  case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaServer,
+                         producerClientId: String, consumerClientId: String,
+                         producer: KafkaProducer[Array[Byte], Array[Byte]],
+                         consumer: KafkaConsumer[Array[Byte], Array[Byte]]) extends
+    QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producer, consumer) {
+
+    override def userPrincipal: KafkaPrincipal = GroupedUserPrincipal(user, userGroup)
+
+    override def quotaMetricTags(clientId: String): Map[String, String] = {
+      Map(GroupedUserQuotaCallback.QuotaGroupTag -> userGroup)
+    }
+
+    override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double): Unit = {
+      configureQuota(userGroup, producerQuota, consumerQuota, requestQuota)
+    }
+
+    override def removeQuotaOverrides(): Unit = {
+      adminZkClient.changeUserOrUserClientIdConfig(quotaEntityName(userGroup), new Properties)
+    }
+
+    def configureQuota(userGroup: String, producerQuota: Long, consumerQuota: Long, requestQuota: Double): Unit = {
+      val quotaProps = quotaProperties(producerQuota, consumerQuota, requestQuota)
+      adminZkClient.changeUserOrUserClientIdConfig(quotaEntityName(userGroup), quotaProps)
+    }
+
+    def configureAndWaitForQuota(produceQuota: Long, fetchQuota: Long, divisor: Int = 1,
+                                 group: Option[String] = Some(userGroup)): Unit = {
+      configureQuota(group.getOrElse(""), produceQuota, fetchQuota, defaultRequestQuota)
+      waitForQuotaUpdate(produceQuota / divisor, fetchQuota / divisor, defaultRequestQuota)
+    }
+
+    def produceConsume(expectProduceThrottle: Boolean, expectConsumeThrottle: Boolean): Unit = {
+      val numRecords = 1000
+      val produced = produceUntilThrottled(numRecords, waitForRequestCompletion = false)
+      verifyProduceThrottle(expectProduceThrottle, verifyClientMetric = false)
+      // make sure there are enough records on the topic to test consumer throttling
+      produceWithoutThrottle(topic, numRecords - produced)
+      consumeUntilThrottled(numRecords, waitForRequestCompletion = false)
+      verifyConsumeThrottle(expectConsumeThrottle, verifyClientMetric = false)
+    }
+
+    def removeThrottleMetrics(): Unit = {
+      def removeSensors(quotaType: QuotaType, clientId: String): Unit = {
+        val sensorSuffix = quotaMetricTags(clientId).values.mkString(":")
+        leaderNode.metrics.removeSensor(s"${quotaType}ThrottleTime-$sensorSuffix")
+        leaderNode.metrics.removeSensor(s"$quotaType-$sensorSuffix")
+      }
+      removeSensors(QuotaType.Produce, producerClientId)
+      removeSensors(QuotaType.Fetch, consumerClientId)
+      removeSensors(QuotaType.Request, producerClientId)
+      removeSensors(QuotaType.Request, consumerClientId)
+    }
+
+    private def quotaEntityName(userGroup: String): String = s"${userGroup}_"
+  }
+}
+
+object GroupedUserPrincipalBuilder {
+  def group(str: String): String = {
+    if (str.indexOf("_") <= 0)
+      ""
+    else
+      str.substring(0, str.indexOf("_"))
+  }
+}
+
+class GroupedUserPrincipalBuilder extends KafkaPrincipalBuilder {
+  override def build(context: AuthenticationContext): KafkaPrincipal = {
+    val securityProtocol = context.securityProtocol
+    if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
+      val user = context.asInstanceOf[SaslAuthenticationContext].server().getAuthorizationID
+      val userGroup = group(user)
+      if (userGroup.isEmpty)
+        new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
+      else
+        GroupedUserPrincipal(user, userGroup)
+    } else
+      throw new IllegalStateException(s"Unexpected security protocol $securityProtocol")
+  }
+}
+
+case class GroupedUserPrincipal(user: String, userGroup: String) extends KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
+
+object GroupedUserQuotaCallback {
+  val QuotaGroupTag = "group"
+  val DefaultProduceQuotaProp = "default.produce.quota"
+  val DefaultFetchQuotaProp = "default.fetch.quota"
+  val UnlimitedQuotaMetricTags = Collections.emptyMap[String, String]
+  val quotaLimitCalls = Map(
+    ClientQuotaType.PRODUCE -> new AtomicInteger,
+    ClientQuotaType.FETCH -> new AtomicInteger,
+    ClientQuotaType.REQUEST -> new AtomicInteger
+  )
+}
+
+/**
+ * Quota callback for a grouped user. Both user principals and topics of each group
+ * are prefixed with the group name followed by '_'. This callback defines quotas of different
+ * types at the group level. Group quotas are configured in ZooKeeper as user quotas with
+ * the entity name "${group}_". Default group quotas are configured in ZooKeeper as user quotas
+ * with the entity name "_".
+ *
+ * Default group quotas may also be configured using the configuration options
+ * "default.produce.quota" and "default.fetch.quota" which can be reconfigured dynamically
+ * without restarting the broker. This tests custom reconfigurable options for quota callbacks,
+ */
+class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable with Logging {
+
+  var brokerId: Int = -1
+  val customQuotasUpdated = ClientQuotaType.values.toList
+    .map(quotaType =>(quotaType -> new AtomicBoolean)).toMap
+  val quotas = ClientQuotaType.values.toList
+    .map(quotaType => (quotaType -> new ConcurrentHashMap[String, Double])).toMap
+
+  val partitionRatio = new ConcurrentHashMap[String, Double]()
+
+  override def configure(configs: util.Map[String, _]): Unit = {
+    brokerId = configs.get(KafkaConfig.BrokerIdProp).toString.toInt
+  }
+
+  override def reconfigurableConfigs: util.Set[String] = {
+    Set(DefaultProduceQuotaProp, DefaultFetchQuotaProp).asJava
+  }
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+    reconfigurableConfigs.asScala.foreach(configValue(configs, _))
+  }
+
+  override def reconfigure(configs: util.Map[String, _]): Unit = {
+    configValue(configs, DefaultProduceQuotaProp).foreach(value => quotas(ClientQuotaType.PRODUCE).put("", value))
+    configValue(configs, DefaultFetchQuotaProp).foreach(value => quotas(ClientQuotaType.FETCH).put("", value))
+    customQuotasUpdated.values.foreach(_.set(true))
+  }
+
+  private def configValue(configs: util.Map[String, _], key: String): Option[Long] = {
+    val value = configs.get(key)
+    if (value != null) Some(value.toString.toLong) else None
+  }
+
+  override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = {
+    principal match {
+      case groupPrincipal: GroupedUserPrincipal =>
+        val userGroup = groupPrincipal.userGroup
+        val quotaLimit = quotaOrDefault(userGroup, quotaType)
+        if (quotaLimit != null)
+          Map(QuotaGroupTag -> userGroup).asJava
+        else
+          UnlimitedQuotaMetricTags
+      case _ =>
+        UnlimitedQuotaMetricTags
+    }
+  }
+
+  override def quotaLimit(quotaType: ClientQuotaType, metricTags: util.Map[String, String]): lang.Double = {
+    quotaLimitCalls(quotaType).incrementAndGet
+    val group = metricTags.get(QuotaGroupTag)
+    if (group != null) quotaOrDefault(group, quotaType) else null
+  }
+
+  override def updateClusterMetadata(cluster: Cluster): Boolean = {
+    val topicsByGroup = cluster.topics.asScala.groupBy(group)
+
+    !topicsByGroup.forall { case (group, groupTopics) =>
+      val groupPartitions = groupTopics.flatMap(topic => cluster.partitionsForTopic(topic).asScala)
+      val totalPartitions = groupPartitions.size
+      val partitionsOnThisBroker = groupPartitions.count { p => p.leader != null && p.leader.id == brokerId }
+      val multiplier = if (totalPartitions == 0)
+        1
+      else if (partitionsOnThisBroker == 0)
+        1.0 / totalPartitions
+      else
+        partitionsOnThisBroker.toDouble / totalPartitions
+      partitionRatio.put(group, multiplier) != multiplier
+    }
+  }
+
+  override def updateQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity, newValue: Double): Unit = {
+    quotas(quotaType).put(userGroup(quotaEntity), newValue)
+  }
+
+  override def removeQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity): Unit = {
+    quotas(quotaType).remove(userGroup(quotaEntity))
+  }
+
+  override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = customQuotasUpdated(quotaType).getAndSet(false)
+
+  def close(): Unit = {}
+
+  private def userGroup(quotaEntity: ClientQuotaEntity): String = {
+    val configEntity = quotaEntity.configEntities.get(0)
+    if (configEntity.entityType == ClientQuotaEntity.ConfigEntityType.USER)
+      group(configEntity.name)
+    else
+      throw new IllegalArgumentException(s"Config entity type ${configEntity.entityType} is not supported")
+  }
+
+  private def quotaOrDefault(group: String, quotaType: ClientQuotaType): lang.Double = {
+    val quotaMap = quotas(quotaType)
+    var quotaLimit: Any = quotaMap.get(group)
+    if (quotaLimit == null)
+      quotaLimit = quotaMap.get("")
+    if (quotaLimit != null) scaledQuota(quotaType, group, quotaLimit.asInstanceOf[Double]) else null
+  }
+
+  private def scaledQuota(quotaType: ClientQuotaType, group: String, configuredQuota: Double): Double = {
+    if (quotaType == ClientQuotaType.REQUEST)
+      configuredQuota
+    else {
+      val multiplier = partitionRatio.get(group)
+      if (multiplier <= 0.0) configuredQuota else configuredQuota * multiplier
+    }
+  }
+}
+
+
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 453ac91..47c8f5f 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -18,7 +18,7 @@ import java.io.File
 import java.util.Properties
 
 import kafka.server._
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Sanitizer
 import org.junit.Before
 
@@ -27,11 +27,8 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
   override protected def securityProtocol = SecurityProtocol.SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 
-  override val userPrincipal = "O=A client,CN=localhost"
   override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
   override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
-  override def producerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(producerClientId), Some(Sanitizer.sanitize(producerClientId)))
-  override def consumerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(consumerClientId), Some(Sanitizer.sanitize(consumerClientId)))
 
   @Before
   override def setUp() {
@@ -39,30 +36,41 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
-    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
+    val defaultProps = quotaTestClients.quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
     adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
-    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
+    quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
-    val producerProps = new Properties()
-    producerProps.setProperty(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
-    producerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
-    updateQuotaOverride(userPrincipal, producerClientId, producerProps)
+  override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producers.head, consumers.head) {
+      override def userPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "O=A client,CN=localhost")
+      override def quotaMetricTags(clientId: String): Map[String, String] = {
+        Map("user" -> Sanitizer.sanitize(userPrincipal.getName), "client-id" -> clientId)
+      }
 
-    val consumerProps = new Properties()
-    consumerProps.setProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
-    consumerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
-    updateQuotaOverride(userPrincipal, consumerClientId, consumerProps)
-  }
+      override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
+        val producerProps = new Properties()
+        producerProps.setProperty(DynamicConfig.Client.ProducerByteRateOverrideProp, producerQuota.toString)
+        producerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
+        updateQuotaOverride(userPrincipal.getName, producerClientId, producerProps)
 
-  override def removeQuotaOverrides() {
-    val emptyProps = new Properties
-    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
-    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
-  }
+        val consumerProps = new Properties()
+        consumerProps.setProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp, consumerQuota.toString)
+        consumerProps.setProperty(DynamicConfig.Client.RequestPercentageOverrideProp, requestQuota.toString)
+        updateQuotaOverride(userPrincipal.getName, consumerClientId, consumerProps)
+      }
+
+      override def removeQuotaOverrides() {
+        val emptyProps = new Properties
+        adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal.getName) +
+          "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
+        adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal.getName) +
+          "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
+      }
 
-  private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) {
-    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
+      private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) {
+        adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
+      }
+    }
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index 91a92fa..3386c91 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -17,9 +17,9 @@ package kafka.api
 import java.io.File
 import java.util.Properties
 
-import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
+import kafka.server.{ConfigEntityName, KafkaConfig, KafkaServer}
 import kafka.utils.JaasTestUtils
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Sanitizer
 import org.junit.{After, Before}
 
@@ -32,20 +32,15 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
   override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
-  override val userPrincipal = JaasTestUtils.KafkaClientPrincipalUnqualifiedName2
-  override val producerQuotaId = QuotaId(Some(userPrincipal), None, None)
-  override val consumerQuotaId = QuotaId(Some(userPrincipal), None, None)
-
-
   @Before
   override def setUp() {
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
     this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
-    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
+    val defaultProps = quotaTestClients.quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
     adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default, defaultProps)
-    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
+    quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
   @After
@@ -54,18 +49,27 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
     closeSasl()
   }
 
-  override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
-    val props = quotaProperties(producerQuota, consumerQuota, requestQuota)
-    updateQuotaOverride(props)
-  }
+  override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
+    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producers.head, consumers.head) {
+      override val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
+      override def quotaMetricTags(clientId: String): Map[String, String] = {
+        Map("user" -> userPrincipal.getName, "client-id" -> "")
+      }
 
-  override def removeQuotaOverrides() {
-    val emptyProps = new Properties
-    updateQuotaOverride(emptyProps)
-    updateQuotaOverride(emptyProps)
-  }
+      override def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double) {
+        val props = quotaProperties(producerQuota, consumerQuota, requestQuota)
+        updateQuotaOverride(props)
+      }
+
+      override def removeQuotaOverrides() {
+        val emptyProps = new Properties
+        updateQuotaOverride(emptyProps)
+        updateQuotaOverride(emptyProps)
+      }
 
-  private def updateQuotaOverride(properties: Properties) {
-    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal), properties)
+      private def updateQuotaOverride(properties: Properties) {
+        adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal.getName), properties)
+      }
+    }
   }
 }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index e0ab55c..bb62fb7 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -746,7 +746,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val unknownConfig = "some.config"
     props.put(unknownConfig, "some.config.value")
 
-    alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
+    TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
 
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1),
       "Listener config not updated")
@@ -799,7 +799,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     listenerProps.foreach(props.remove)
     props.put(KafkaConfig.ListenersProp, listeners)
     props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
-    alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
+    TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
 
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
       "Listeners not updated")
@@ -1054,20 +1054,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered)
   }
 
-  private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = {
-    val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
-    val newConfig = new Config(configEntries)
-    val configs = if (perBrokerConfig) {
-      servers.map { server =>
-        val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
-        (resource, newConfig)
-      }.toMap.asJava
-    } else {
-      Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> newConfig).asJava
-    }
-    adminClient.alterConfigs(configs)
-  }
-
   private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
@@ -1077,7 +1063,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   }
 
   private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit = {
-    val alterResult = alterConfigs(adminClients.head, newProps, perBrokerConfig)
+    val alterResult = TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig)
     if (expectFailure) {
       val oldProps = servers.head.config.values.asScala.filterKeys(newProps.containsKey)
       val brokerResources = if (perBrokerConfig)
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 1aabbb3..c0bad91 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -18,7 +18,10 @@ package kafka.server
 
 import java.util.Collections
 
+import kafka.network.RequestChannel.Session
+import kafka.server.QuotaType._
 import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{MockTime, Sanitizer}
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{Before, Test}
@@ -38,43 +41,49 @@ class ClientQuotaManagerTest {
     numCallbacks = 0
   }
 
+  private def maybeRecordAndThrottle(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = {
+    val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
+    quotaManager.maybeRecordAndThrottle(Session(principal, null),clientId, value, this.callback)
+  }
+
   private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient) {
-    val clientMetrics = new ClientQuotaManager(config, newMetrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, newMetrics, Produce, time, "")
 
     try {
       // Case 1: Update the quota. Assert that the new quota value is returned
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(2000, true)))
       clientMetrics.updateQuota(client2.configUser, client2.configClientId, client2.sanitizedConfigClientId, Some(new Quota(4000, true)))
 
-      assertEquals("Default producer quota should be " + config.quotaBytesPerSecondDefault, new Quota(config.quotaBytesPerSecondDefault, true), clientMetrics.quota(randomClient.user, randomClient.clientId))
-      assertEquals("Should return the overridden value (2000)", new Quota(2000, true), clientMetrics.quota(client1.user, client1.clientId))
-      assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota(client2.user, client2.clientId))
+      assertEquals("Default producer quota should be " + config.quotaBytesPerSecondDefault,
+        config.quotaBytesPerSecondDefault, clientMetrics.quota(randomClient.user, randomClient.clientId).bound, 0.0)
+      assertEquals("Should return the overridden value (2000)", 2000, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
+      assertEquals("Should return the overridden value (4000)", 4000, clientMetrics.quota(client2.user, client2.clientId).bound, 0.0)
 
       // p1 should be throttled using the overridden quota
-      var throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 2500 * config.numQuotaSamples, this.callback)
+      var throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 2500 * config.numQuotaSamples)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
 
       // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created.
       // p1 should not longer be throttled after the quota change
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(3000, true)))
-      assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota(client1.user, client1.clientId))
+      assertEquals("Should return the newly overridden value (3000)", 3000, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
 
-      throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback)
+      throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 0)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
 
       // Case 3: Change quota back to default. Should be throttled again
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(500, true)))
-      assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota(client1.user, client1.clientId))
+      assertEquals("Should return the default value (500)", 500, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
 
-      throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback)
+      throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 0)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
 
       // Case 4: Set high default quota, remove p1 quota. p1 should no longer be throttled
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, None)
       clientMetrics.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId, defaultConfigClient.sanitizedConfigClientId, Some(new Quota(4000, true)))
-      assertEquals("Should return the newly overridden value (4000)", new Quota(4000, true), clientMetrics.quota(client1.user, client1.clientId))
+      assertEquals("Should return the newly overridden value (4000)", 4000, clientMetrics.quota(client1.user, client1.clientId).bound, 0.0)
 
-      throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 1000 * config.numQuotaSamples, this.callback)
+      throttleTimeMs = maybeRecordAndThrottle(clientMetrics, client1.user, client1.clientId, 1000 * config.numQuotaSamples)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
 
     } finally {
@@ -150,11 +159,11 @@ class ClientQuotaManagerTest {
   @Test
   def testQuotaConfigPrecedence() {
     val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
-        newMetrics, QuotaType.Produce, time, "")
+        newMetrics, Produce, time, "")
 
     def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
-      assertEquals(new Quota(expectedBound, true), quotaManager.quota(user, clientId))
-      val throttleTimeMs = quotaManager.maybeRecordAndThrottle(user, clientId, value * config.numQuotaSamples, this.callback)
+      assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
+      val throttleTimeMs = maybeRecordAndThrottle(quotaManager, user, clientId, value * config.numQuotaSamples)
       if (expectThrottle)
         assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
       else
@@ -223,14 +232,14 @@ class ClientQuotaManagerTest {
   @Test
   def testQuotaViolation() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
     try {
       /* We have 10 second windows. Make sure that there is no quota violation
        * if we produce under the quota
        */
       for (_ <- 0 until 10) {
-        clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 400, callback)
+        maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 400)
         time.sleep(1000)
       }
       assertEquals(10, numCallbacks)
@@ -241,7 +250,7 @@ class ClientQuotaManagerTest {
       // (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100
       // 10.5 seconds because the last window is half complete
       time.sleep(500)
-      val sleepTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 2300, callback)
+      val sleepTime = maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 2300)
 
       assertEquals("Should be throttled", 2100, sleepTime)
       assertEquals(1, queueSizeMetric.value().toInt)
@@ -257,12 +266,12 @@ class ClientQuotaManagerTest {
 
       // Could continue to see delays until the bursty sample disappears
       for (_ <- 0 until 10) {
-        clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 400, callback)
+        maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 400)
         time.sleep(1000)
       }
 
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 0, callback))
+                   0, maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "unknown", 0))
     } finally {
       clientMetrics.shutdown()
     }
@@ -271,7 +280,7 @@ class ClientQuotaManagerTest {
   @Test
   def testRequestPercentageQuotaViolation() {
     val metrics = newMetrics
-    val quotaManager = new ClientRequestQuotaManager(config, metrics, time, "")
+    val quotaManager = new ClientRequestQuotaManager(config, metrics, time, "", None)
     quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
     val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
     def millisToPercent(millis: Double) = millis * 1000 * 1000 * ClientQuotaManagerConfig.NanosToPercentagePerSecond
@@ -280,7 +289,7 @@ class ClientQuotaManagerTest {
        * if we are under the quota
        */
       for (_ <- 0 until 10) {
-        quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
+        maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4))
         time.sleep(1000)
       }
       assertEquals(10, numCallbacks)
@@ -292,7 +301,7 @@ class ClientQuotaManagerTest {
       // (10.2 - quota)/quota*window-size = (10.2-10)/10*10.5 seconds = 210ms
       // 10.5 seconds interval because the last window is half complete
       time.sleep(500)
-      val throttleTime = quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(67.1), callback)
+      val throttleTime = maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(67.1))
 
       assertEquals("Should be throttled", 210, throttleTime)
       assertEquals(1, queueSizeMetric.value().toInt)
@@ -308,22 +317,22 @@ class ClientQuotaManagerTest {
 
       // Could continue to see delays until the bursty sample disappears
       for (_ <- 0 until 11) {
-        quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(4), callback)
+        maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(4))
         time.sleep(1000)
       }
 
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0, callback))
+                   0, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", 0))
 
       // Create a very large spike which requires > one quota window to bring within quota
-      assertEquals(1000, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(500), callback))
+      assertEquals(1000, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", millisToPercent(500)))
       for (_ <- 0 until 10) {
         time.sleep(1000)
-        assertEquals(1000, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0, callback))
+        assertEquals(1000, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", 0))
       }
       time.sleep(1000)
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0, callback))
+                   0, maybeRecordAndThrottle(quotaManager, "ANONYMOUS", "test-client", 0))
 
     } finally {
       quotaManager.shutdown()
@@ -333,13 +342,13 @@ class ClientQuotaManagerTest {
   @Test
   def testExpireThrottleTimeSensor() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     try {
-      clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
+      maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 100)
       // remove the throttle time sensor
       metrics.removeSensor("ProduceThrottleTime-:client1")
       // should not throw an exception even if the throttle time sensor does not exist.
-      val throttleTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 10000, callback)
+      val throttleTime = maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 10000)
       assertTrue("Should be throttled", throttleTime > 0)
       // the sensor should get recreated
       val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:client1")
@@ -352,14 +361,14 @@ class ClientQuotaManagerTest {
   @Test
   def testExpireQuotaSensors() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     try {
-      clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
+      maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 100)
       // remove all the sensors
       metrics.removeSensor("ProduceThrottleTime-:client1")
       metrics.removeSensor("Produce-ANONYMOUS:client1")
       // should not throw an exception
-      val throttleTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 10000, callback)
+      val throttleTime = maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", "client1", 10000)
       assertTrue("Should be throttled", throttleTime > 0)
 
       // all the sensors should get recreated
@@ -376,10 +385,10 @@ class ClientQuotaManagerTest {
   @Test
   def testClientIdNotSanitized() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
+    val clientMetrics = new ClientQuotaManager(config, metrics, Produce, time, "")
     val clientId = "client@#$%"
     try {
-      clientMetrics.maybeRecordAndThrottle("ANONYMOUS", clientId, 100, callback)
+      maybeRecordAndThrottle(clientMetrics, "ANONYMOUS", clientId, 100)
 
       // The metrics should use the raw client ID, even if the reporters internally sanitize them
       val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + clientId)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 5c88bf2..9c8acb4 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -27,11 +27,12 @@ import org.apache.kafka.common.config.{ConfigException, SslConfigs}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
+import org.scalatest.junit.JUnitSuite
 
 import scala.collection.JavaConverters._
 import scala.collection.Set
 
-class DynamicBrokerConfigTest {
+class DynamicBrokerConfigTest extends JUnitSuite {
 
   @Test
   def testConfigUpdate(): Unit = {
@@ -127,6 +128,35 @@ class DynamicBrokerConfigTest {
   }
 
   @Test
+  def testReconfigurableValidation(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    val config = KafkaConfig(origProps)
+    val invalidReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp, KafkaConfig.BrokerIdProp, "some.prop")
+    val validReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp, KafkaConfig.LogCleanerDedupeBufferSizeProp, "some.prop")
+
+    def createReconfigurable(configs: Set[String]) = new Reconfigurable {
+      override def configure(configs: util.Map[String, _]): Unit = {}
+      override def reconfigurableConfigs(): util.Set[String] = configs.asJava
+      override def validateReconfiguration(configs: util.Map[String, _]): Unit = {}
+      override def reconfigure(configs: util.Map[String, _]): Unit = {}
+    }
+    intercept[IllegalArgumentException] {
+      config.dynamicConfig.addReconfigurable(createReconfigurable(invalidReconfigurableProps))
+    }
+    config.dynamicConfig.addReconfigurable(createReconfigurable(validReconfigurableProps))
+
+    def createBrokerReconfigurable(configs: Set[String]) = new BrokerReconfigurable {
+      override def reconfigurableConfigs: collection.Set[String] = configs
+      override def validateReconfiguration(newConfig: KafkaConfig): Unit = {}
+      override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {}
+    }
+    intercept[IllegalArgumentException] {
+      config.dynamicConfig.addBrokerReconfigurable(createBrokerReconfigurable(invalidReconfigurableProps))
+    }
+    config.dynamicConfig.addBrokerReconfigurable(createBrokerReconfigurable(validReconfigurableProps))
+  }
+
+  @Test
   def testSecurityConfigs(): Unit = {
     def verifyUpdate(name: String, value: Object): Unit = {
       verifyConfigUpdate(name, value, perBrokerConfig = true, expectFailure = true)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index bfbae2b..2a7d6d4 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -23,6 +23,7 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
+
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
@@ -132,13 +133,16 @@ class RequestQuotaTest extends BaseRequestTest {
     waitAndCheckResults()
   }
 
+  def session(user: String): Session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
+
   private def throttleTimeMetricValue(clientId: String): Double = {
     val metricName = leaderNode.metrics.metricName("throttle-time",
                                   QuotaType.Request.toString,
                                   "",
                                   "user", "",
                                   "client-id", clientId)
-    val sensor = leaderNode.quotaManagers.request.getOrCreateQuotaSensors("ANONYMOUS", clientId).throttleTimeSensor
+    val sensor = leaderNode.quotaManagers.request.getOrCreateQuotaSensors(session("ANONYMOUS"),
+      clientId).throttleTimeSensor
     metricValue(leaderNode.metrics.metrics.get(metricName), sensor)
   }
 
@@ -148,7 +152,8 @@ class RequestQuotaTest extends BaseRequestTest {
                                   "",
                                   "user", "",
                                   "client-id", clientId)
-    val sensor = leaderNode.quotaManagers.request.getOrCreateQuotaSensors("ANONYMOUS", clientId).quotaSensor
+    val sensor = leaderNode.quotaManagers.request.getOrCreateQuotaSensors(session("ANONYMOUS"),
+      clientId).quotaSensor
     metricValue(leaderNode.metrics.metrics.get(metricName), sensor)
   }
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4b87406..16b7e87 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -41,9 +41,11 @@ import Implicits._
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.{AdminZkClient, BrokerIdsZNode, BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -1492,6 +1494,21 @@ object TestUtils extends Logging {
     }
   }
 
+  def alterConfigs(servers: Seq[KafkaServer], adminClient: AdminClient, props: Properties,
+                   perBrokerConfig: Boolean): AlterConfigsResult = {
+    val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
+    val newConfig = new Config(configEntries)
+    val configs = if (perBrokerConfig) {
+      servers.map { server =>
+        val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
+        (resource, newConfig)
+      }.toMap.asJava
+    } else {
+      Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> newConfig).asJava
+    }
+    adminClient.alterConfigs(configs)
+  }
+
   /**
    * Capture the console output during the execution of the provided function.
    */

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message