kafka-commits mailing list archives

From jun...@apache.org
Subject [2/2] kafka git commit: KAFKA-3492; Secure quotas for authenticated users
Date Sat, 17 Sep 2016 17:06:11 GMT
KAFKA-3492; Secure quotas for authenticated users

Implementation and tests for secure quotas at the <user> and <user, client-id> levels, as described in KIP-55. Also adds dynamic default quotas for <client-id>, <user> and <user, client-id>. For each client connection, the most specific quota matching the connection is applied, with user quotas taking precedence over client-id quotas.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1753 from rajinisivaram/KAFKA-3492
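
As a usage sketch (the entity names, quota values and ZooKeeper address below are illustrative, not taken from this commit), the updated ConfigCommand can be invoked as follows to set a <user, client-id> quota and a dynamic default user quota:

  // Set producer/consumer byte-rate quotas for client-id "clientA" of user "alice"
  ConfigCommand.main(Array(
    "--zookeeper", "localhost:2181",
    "--alter",
    "--entity-type", "users", "--entity-name", "alice",
    "--entity-type", "clients", "--entity-name", "clientA",
    "--add-config", "producer_byte_rate=1048576,consumer_byte_rate=2097152"))

  // Set a dynamic default quota applied to any user without a more specific override
  ConfigCommand.main(Array(
    "--zookeeper", "localhost:2181",
    "--alter",
    "--entity-type", "users", "--entity-default",
    "--add-config", "producer_byte_rate=10485760"))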


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69356fbc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69356fbc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69356fbc

Branch: refs/heads/trunk
Commit: 69356fbc6e76ab4291ff4957f0d6ea04e7245909
Parents: ecc1fb1
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Sat Sep 17 10:06:05 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Sat Sep 17 10:06:05 2016 -0700

----------------------------------------------------------------------
 .../kafka/common/metrics/KafkaMetric.java       |   2 +-
 .../org/apache/kafka/common/metrics/Quota.java  |   5 +
 .../src/main/scala/kafka/admin/AdminUtils.scala |  72 +++-
 .../main/scala/kafka/admin/ConfigCommand.scala  | 192 +++++++++--
 .../scala/kafka/network/RequestChannel.scala    |   5 +-
 .../scala/kafka/server/ClientQuotaManager.scala | 332 ++++++++++++++++---
 .../main/scala/kafka/server/ConfigHandler.scala |  66 ++--
 .../kafka/server/DynamicConfigManager.scala     |  94 ++++--
 .../src/main/scala/kafka/server/KafkaApis.scala |   3 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |   6 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   7 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   3 +
 .../integration/kafka/api/BaseQuotaTest.scala   | 195 +++++++++++
 .../kafka/api/ClientIdQuotaTest.scala           |  55 +++
 .../kafka/api/ClientQuotasTest.scala            | 206 ------------
 .../kafka/api/UserClientIdQuotaTest.scala       |  66 ++++
 .../integration/kafka/api/UserQuotaTest.scala   |  61 ++++
 .../test/scala/unit/kafka/admin/AdminTest.scala |   4 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    | 186 ++++++++++-
 .../scala/unit/kafka/admin/TestAdminUtils.scala |   1 +
 .../kafka/server/ClientQuotaManagerTest.scala   | 217 ++++++++++--
 .../kafka/server/DynamicConfigChangeTest.scala  |  77 +++--
 22 files changed, 1449 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index e4d3ae8..86014e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -37,7 +37,7 @@ public final class KafkaMetric implements Metric {
         this.time = time;
     }
 
-    MetricConfig config() {
+    public MetricConfig config() {
         return this.config;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
index 8431e50..663b963 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
@@ -67,4 +67,9 @@ public final class Quota {
         Quota that = (Quota) obj;
         return (that.bound == this.bound) && (that.upper == this.upper);
     }
+
+    @Override
+    public String toString() {
+        return (upper ? "upper=" : "lower=") + bound;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 400cc47..b3f8e5c 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -41,6 +41,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 trait AdminUtilities {
   def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
   def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
+  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties)
   def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties)
   def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties
 }
@@ -449,7 +450,7 @@ object AdminUtils extends Logging with AdminUtilities {
     if (!update) {
       // write out the config if there is any, this isn't transactional with the partition assignments
       LogConfig.validate(config)
-      writeEntityConfig(zkUtils, ConfigType.Topic, topic, config)
+      writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
     }
 
     // create the partition assignment
@@ -476,7 +477,9 @@ object AdminUtils extends Logging with AdminUtilities {
   }
 
   /**
-   * Update the config for a client and create a change notification so the change will propagate to other brokers
+   * Update the config for a client and create a change notification so the change will propagate to other brokers.
+   * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
+   * and <user> configs are not specified.
    *
    * @param zkUtils Zookeeper utilities used to write the config to ZK
    * @param clientId: The clientId for which configs are being changed
@@ -489,6 +492,21 @@ object AdminUtils extends Logging with AdminUtilities {
   }
 
   /**
+   * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
+   * User and/or clientId components of the path may be <default>, indicating that the configuration is the default
+   * value to be applied if a more specific override is not configured.
+   *
+   * @param zkUtils Zookeeper utilities used to write the config to ZK
+   * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
+   * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
+   */
+  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
+    changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
+  }
+
+  /**
    * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
    *
    * @param zkUtils Zookeeper utilities used to write the config to ZK
@@ -520,37 +538,41 @@ object AdminUtils extends Logging with AdminUtilities {
     }
   }
 
-  private def changeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties) {
+  private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
+    val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
+    val entityConfigPath = getEntityConfigPath(rootEntityType, fullSanitizedEntityName)
     // write the new config--may not exist if there were previously no overrides
-    writeEntityConfig(zkUtils, entityType, entityName, configs)
+    writeEntityConfig(zkUtils, entityConfigPath, configs)
 
     // create the change notification
     val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
-    val content = Json.encode(getConfigChangeZnodeData(entityType, entityName))
+    val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath))
     zkUtils.zkClient.createPersistentSequential(seqNode, content)
   }
 
-  def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = {
-    Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> entityName)
+  def getConfigChangeZnodeData(sanitizedEntityPath: String) : Map[String, Any] = {
+    Map("version" -> 2, "entity_path" -> sanitizedEntityPath)
   }
 
   /**
-   * Write out the topic config to zk, if there is any
+   * Write out the entity config to zk, if there is any
    */
-  private def writeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, config: Properties) {
+  private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
     val configMap: mutable.Map[String, String] = {
       import JavaConversions._
       config
     }
     val map = Map("version" -> 1, "config" -> configMap)
-    zkUtils.updatePersistentPath(getEntityConfigPath(entityType, entityName), Json.encode(map))
+    zkUtils.updatePersistentPath(entityPath, Json.encode(map))
   }
 
   /**
-   * Read the entity (topic or client) config (if any) from zk
+   * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
+   * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
    */
-  def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entity: String): Properties = {
-    val str: String = zkUtils.zkClient.readData(getEntityConfigPath(entityType, entity), true)
+  def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = {
+    val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName)
+    val str: String = zkUtils.zkClient.readData(entityConfigPath, true)
     val props = new Properties()
     if (str != null) {
       Json.parseFull(str) match {
@@ -564,13 +586,12 @@ object AdminUtils extends Logging with AdminUtilities {
                 configTup match {
                   case (k: String, v: String) =>
                     props.setProperty(k, v)
-                  case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
+                  case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}")
                 }
-            case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
+            case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}")
           }
 
-        case o => throw new IllegalArgumentException("Unexpected value in config:(%s), entity_type: (%s), entity: (%s)"
-                                                             .format(str, entityType, entity))
+        case o => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
       }
     }
     props
@@ -582,6 +603,23 @@ object AdminUtils extends Logging with AdminUtilities {
   def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
     zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
 
+  def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
+    def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
+      val root = rootPath match {
+        case Some(path) => rootEntityType + '/' + rootPath
+        case None => rootEntityType
+      }
+      val entityNames = zkUtils.getAllEntitiesWithConfig(root)
+      rootPath match {
+        case Some(path) => entityNames.map(entityName => path + '/' + entityName)
+        case None => entityNames
+      }
+    }
+    entityPaths(zkUtils, None)
+      .flatMap(entity => entityPaths(zkUtils, Some(entity + '/' + childEntityType)))
+      .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, rootEntityType, entityPath))).toMap
+  }
+
   def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata =
     fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
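
As an illustration of the new notification format (the entity names below are hypothetical), a <user, client-id> override written by changeEntityConfig produces a version-2 change notification:

  AdminUtils.getConfigChangeZnodeData("users/alice/clients/clientA")
  // -> Map("version" -> 2, "entity_path" -> "users/alice/clients/clientA")
  // JSON-encoded and written as a persistent sequential znode under /config/changes
  // (e.g. /config/changes/config_change_13321), while the override itself is stored
  // at /config/users/alice/clients/clientA by writeEntityConfig.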
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index ebf9e61..58bdb7a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -21,8 +21,9 @@ import java.util.Properties
 
 import joptsimple._
 import kafka.admin.TopicCommand._
+import kafka.common.Config
 import kafka.log.{Defaults, LogConfig}
-import kafka.server.{KafkaConfig, ClientConfigOverride, ConfigType}
+import kafka.server.{KafkaConfig, QuotaConfigOverride, ConfigType, ConfigEntityName, QuotaId}
 import kafka.utils.{CommandLineUtils, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
@@ -33,15 +34,26 @@ import scala.collection._
 
 /**
  * This script can be used to change configs for topics/clients/brokers dynamically
+ * This script can be used to change configs for topics/clients/users/brokers dynamically
+ * An entity described or altered by the command may be one of:
+ * <ul>
+ *     <li> topic: --entity-type topics --entity-name <topic>
+ *     <li> client: --entity-type clients --entity-name <client-id>
+ *     <li> user: --entity-type users --entity-name <user-principal>
+ *     <li> <user, client>: --entity-type users --entity-name <user-principal> --entity-type clients --entity-name <client-id>
+ *     <li> broker: --entity-type brokers --entity-name <broker>
+ * </ul>
+ * --entity-default may be used instead of --entity-name when describing or altering default configuration for users and clients.
+ *
  */
-object ConfigCommand {
+object ConfigCommand extends Config {
 
   def main(args: Array[String]): Unit = {
 
     val opts = new ConfigCommandOptions(args)
 
     if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic, client or broker")
+      CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic, client, user or broker")
 
     opts.checkArgs()
 
@@ -57,18 +69,19 @@ object ConfigCommand {
         describeConfig(zkUtils, opts)
     } catch {
       case e: Throwable =>
-        println("Error while executing topic command " + e.getMessage)
+        println("Error while executing config command " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
       zkUtils.close()
     }
   }
 
-  def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: AdminUtilities = AdminUtils) {
+  private[admin] def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: AdminUtilities = AdminUtils) {
     val configsToBeAdded = parseConfigsToBeAdded(opts)
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
-    val entityType = opts.options.valueOf(opts.entityType)
-    val entityName = opts.options.valueOf(opts.entityName)
+    val entity = parseEntity(opts)
+    val entityType = entity.root.entityType
+    val entityName = entity.fullSanitizedName
     warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt))
 
     // compile the final set of configs
@@ -77,12 +90,13 @@ object ConfigCommand {
     configsToBeDeleted.foreach(config => configs.remove(config))
 
     entityType match {
-      case ConfigType.Topic =>  utils.changeTopicConfig(zkUtils, entityName, configs)
-      case ConfigType.Client =>  utils.changeClientIdConfig(zkUtils, entityName, configs)
+      case ConfigType.Topic => utils.changeTopicConfig(zkUtils, entityName, configs)
+      case ConfigType.Client => utils.changeClientIdConfig(zkUtils, entityName, configs)
+      case ConfigType.User => utils.changeUserOrUserClientIdConfig(zkUtils, entityName, configs)
       case ConfigType.Broker => utils.changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs)
       case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}")
     }
-    println(s"Updated config for EntityType:$entityType => EntityName:'$entityName'.")
+    println(s"Updated config for entity: $entity.")
   }
 
   def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = {
@@ -107,17 +121,16 @@ object ConfigCommand {
   }
 
   private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
-    val entityType = opts.options.valueOf(opts.entityType)
-    val entityNames: Seq[String] =
-      if (opts.options.has(opts.entityName))
-        Seq(opts.options.valueOf(opts.entityName))
-      else
-        zkUtils.getAllEntitiesWithConfig(entityType)
-
-    for (entityName <- entityNames) {
-      val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
-      println("Configs for %s:%s are %s"
-        .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+    val configEntity = parseEntity(opts)
+    val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
+    val entities = configEntity.getAllEntities(zkUtils)
+    for (entity <- entities) {
+      val configs = AdminUtils.fetchEntityConfig(zkUtils, entity.root.entityType, entity.fullSanitizedName)
+      // When describing all users, don't include empty user nodes with only <user, client> quota overrides.
+      if (!configs.isEmpty || !describeAllUsers) {
+        println("Configs for %s are %s"
+          .format(entity, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+      }
     }
   }
 
@@ -150,6 +163,115 @@ object ConfigCommand {
       Seq.empty
   }
 
+  case class Entity(entityType: String, sanitizedName: Option[String]) {
+    val entityPath = sanitizedName match {
+      case Some(n) => entityType + "/" + n
+      case None => entityType
+    }
+    override def toString: String = {
+      val typeName = entityType match {
+        case ConfigType.User => "user-principal"
+        case ConfigType.Client => "client-id"
+        case ConfigType.Topic => "topic"
+        case t => t
+      }
+      sanitizedName match {
+        case Some(ConfigEntityName.Default) => "default " + typeName
+        case Some(n) =>
+          val desanitized = if (entityType == ConfigType.User) QuotaId.desanitize(n) else n
+          s"$typeName '$desanitized'"
+        case None => entityType
+      }
+    }
+  }
+
+  case class ConfigEntity(root: Entity, child: Option[Entity]) {
+    val fullSanitizedName = root.sanitizedName.getOrElse("") + child.map(s => "/" + s.entityPath).getOrElse("")
+
+    def getAllEntities(zkUtils: ZkUtils) : Seq[ConfigEntity] = {
+      // Describe option examples:
+      //   Describe entity with specified name:
+      //     --entity-type topics --entity-name topic1 (topic1)
+      //   Describe all entities of a type (topics/brokers/users/clients):
+      //     --entity-type topics (all topics)
+      //   Describe <user, client> quotas:
+      //     --entity-type users --entity-name user1 --entity-type clients --entity-name client2 (<user1, client2>)
+      //     --entity-type users --entity-name userA --entity-type clients (all clients of userA)
+      //     --entity-type users --entity-type clients (all <user, client>s))
+      //   Describe default quotas:
+      //     --entity-type users --entity-default (Default user)
+      //     --entity-type users --entity-default --entity-type clients --entity-default (Default <user, client>)
+      (root.sanitizedName, child) match {
+        case (None, _) =>
+          val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
+                                   .map(name => ConfigEntity(Entity(root.entityType, Some(name)), child))
+          child match {
+            case Some (s) =>
+                rootEntities.flatMap(rootEntity =>
+                  ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils))
+            case None => rootEntities
+          }
+        case (rootName, Some(childEntity)) =>
+          childEntity.sanitizedName match {
+            case Some(subName) => Seq(this)
+            case None =>
+                zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
+                       .map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name)))))
+
+          }
+        case (rootName, None) =>
+          Seq(this)
+      }
+    }
+
+    override def toString: String = {
+      root.toString + child.map(s => ", " + s.toString).getOrElse("")
+    }
+  }
+
+  private[admin] def parseEntity(opts: ConfigCommandOptions): ConfigEntity = {
+    val entityTypes = opts.options.valuesOf(opts.entityType)
+    if (entityTypes.head == ConfigType.User || entityTypes.head == ConfigType.Client)
+      parseQuotaEntity(opts)
+    else {
+      // Exactly one entity type and at-most one entity name expected for other entities
+      val name = if (opts.options.has(opts.entityName)) Some(opts.options.valueOf(opts.entityName)) else None
+      ConfigEntity(Entity(entityTypes.head, name), None)
+    }
+  }
+
+  private def parseQuotaEntity(opts: ConfigCommandOptions): ConfigEntity = {
+    val types = opts.options.valuesOf(opts.entityType)
+    val namesIterator = opts.options.valuesOf(opts.entityName).iterator
+    val names = opts.options.specs
+                    .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
+                    .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "")
+
+    if (opts.options.has(opts.alterOpt) && names.size != types.size)
+      throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter")
+
+    val reverse = types.size == 2 && types(0) == ConfigType.Client
+    val entityTypes = if (reverse) types.reverse else types.toBuffer
+    val sortedNames = (if (reverse && names.length == 2) names.reverse else names).iterator
+
+    def sanitizeName(entityType: String, name: String) = {
+      if (name.isEmpty)
+        ConfigEntityName.Default
+      else {
+        entityType match {
+          case ConfigType.User => QuotaId.sanitize(name)
+          case ConfigType.Client =>
+            validateChars("Client-id", name)
+            name
+          case _ => throw new IllegalArgumentException("Invalid entity type " + entityType)
+        }
+      }
+    }
+
+    val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next)) else None))
+    ConfigEntity(entities.head, if (entities.size > 1) Some(entities(1)) else None)
+  }
+
   class ConfigCommandOptions(args: Array[String]) {
     val parser = new OptionParser
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
@@ -159,19 +281,23 @@ object ConfigCommand {
             .ofType(classOf[String])
     val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
     val describeOpt = parser.accepts("describe", "List configs for the given entity.")
-    val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/brokers)")
+    val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers)")
             .withRequiredArg
             .ofType(classOf[String])
-    val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/broker id)")
+    val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id)")
             .withRequiredArg
             .ofType(classOf[String])
+    val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users (applies to corresponding entity type in command line)")
 
     val nl = System.getProperty("line.separator")
     val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
             "For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
             "For entity_type '" + ConfigType.Broker + "': " + nl + KafkaConfig.dynamicBrokerConfigs.map("\t" + _).mkString(nl) + nl +
-            "For entity_type '" + ConfigType.Client + "': " + nl + "\t" + ClientConfigOverride.ProducerOverride
-                                                            + nl + "\t" + ClientConfigOverride.ConsumerOverride)
+            "For entity_type '" + ConfigType.Client + "': " + nl + "\t" + QuotaConfigOverride.ProducerOverride
+                                                            + nl + "\t" + QuotaConfigOverride.ConsumerOverride + nl +
+            "For entity_type '" + ConfigType.User + "': " + nl + "\t" + QuotaConfigOverride.ProducerOverride
+                                                          + nl + "\t" + QuotaConfigOverride.ConsumerOverride + nl +
+            s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
             .withRequiredArg
             .ofType(classOf[String])
     val deleteConfig = parser.accepts("delete-config", "config keys to remove 'k1,k2'")
@@ -194,15 +320,27 @@ object ConfigCommand {
       CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
       CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig))
+      val entityTypeVals = options.valuesOf(entityType)
       if(options.has(alterOpt)) {
-        require(options.has(entityName), "--entity-name must be specified with --alter")
+        if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client)) {
+          if (!options.has(entityName) && !options.has(entityDefault))
+            throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users/clients")
+        } else if (!options.has(entityName))
+            throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals}")
 
         val isAddConfigPresent: Boolean = options.has(addConfig)
         val isDeleteConfigPresent: Boolean = options.has(deleteConfig)
         if(! isAddConfigPresent && ! isDeleteConfigPresent)
           throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter")
       }
-      require(ConfigType.all.contains(options.valueOf(entityType)), s"--entity-type must be one of ${ConfigType.all}")
+      entityTypeVals.foreach(entityTypeVal =>
+        if (!ConfigType.all.contains(entityTypeVal))
+          throw new IllegalArgumentException(s"Invalid entity-type ${entityTypeVal}, --entity-type must be one of ${ConfigType.all}")
+      )
+      if (entityTypeVals.isEmpty)
+        throw new IllegalArgumentException("At least one --entity-type must be specified")
+      else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.User, ConfigType.Client)))
+        throw new IllegalArgumentException(s"Only '${ConfigType.User}' and '${ConfigType.Client}' entity types may be specified together")
     }
   }
 }
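
As a sketch of how the new multi-entity options are parsed (the names are made up, and parseEntity is package-private, so this would only compile inside package kafka.admin):

  val opts = new ConfigCommand.ConfigCommandOptions(Array(
    "--zookeeper", "localhost:2181", "--describe",
    "--entity-type", "users", "--entity-name", "CN=alice",
    "--entity-type", "clients", "--entity-name", "clientA"))
  val entity = ConfigCommand.parseEntity(opts)
  // entity.root              -> Entity("users", Some("CN%3Dalice"))   (sanitized via QuotaId.sanitize)
  // entity.child             -> Some(Entity("clients", Some("clientA")))
  // entity.fullSanitizedName -> "CN%3Dalice/clients/clientA", i.e. the sub-path under /config/users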

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index cff7b1a..8aec2d2 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -25,6 +25,7 @@ import java.util.concurrent._
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.QuotaId
 import kafka.utils.{Logging, SystemTime}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidRequestException
@@ -44,7 +45,9 @@ object RequestChannel extends Logging {
     RequestSend.serialize(emptyRequestHeader, emptyProduceRequest.toStruct)
   }
 
-  case class Session(principal: KafkaPrincipal, clientAddress: InetAddress)
+  case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
+    val sanitizedUser = QuotaId.sanitize(principal.getName)
+  }
 
   case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
     // These need to be volatile because the readers are in the network thread and the writers are in the request

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index e6cac5d..c4472c6 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -16,15 +16,19 @@
  */
 package kafka.server
 
+import java.net.{URLEncoder, URLDecoder}
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
 import kafka.utils.{ShutdownableThread, Logging}
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg}
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConversions._
+
 /**
  * Represents the sensors aggregated per client
  * @param quotaSensor @Sensor that tracks the quota
@@ -34,7 +38,8 @@ private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor
 
 /**
  * Configuration settings for quota management
- * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client
+ * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client-id if
+ *        dynamic defaults or user quotas are not set
  * @param numQuotaSamples The number of samples to retain in memory
  * @param quotaWindowSizeSeconds The time span of each sample
  *
@@ -53,11 +58,72 @@ object ClientQuotaManagerConfig {
   val DefaultQuotaWindowSizeSeconds = 1
   // Purge sensors after 1 hour of inactivity
   val InactiveSensorExpirationTimeSeconds  = 3600
+
+  val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
+  val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default))
+  val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None)
+  val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
+}
+
+object QuotaTypes {
+  val NoQuotas = 0
+  val ClientIdQuotaEnabled = 1
+  val UserQuotaEnabled = 2
+  val UserClientIdQuotaEnabled = 4
 }
 
+object QuotaId {
+
+  /**
+   * Sanitizes user principal to a safe value for use in MetricName
+   * and as Zookeeper node name
+   */
+  def sanitize(user: String): String = {
+    val encoded = URLEncoder.encode(user, StandardCharsets.UTF_8.name)
+    val builder = new StringBuilder
+    for (i <- 0 until encoded.length) {
+      encoded.charAt(i) match {
+        case '*' => builder.append("%2A") // Metric ObjectName treats * as pattern
+        case '+' => builder.append("%20") // Space URL-encoded as +, replace with percent encoding
+        case c => builder.append(c)
+      }
+    }
+    builder.toString
+  }
+
+  /**
+   * Decodes sanitized user principal
+   */
+  def desanitize(sanitizedUser: String): String = {
+    URLDecoder.decode(sanitizedUser, StandardCharsets.UTF_8.name)
+  }
+}
+
+case class QuotaId(sanitizedUser: Option[String], clientId: Option[String])
+
+case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String, quota: Quota)
+
 /**
  * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics
  * for all clients.
+ * <p/>
+ * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection,
+ * the most specific quota matching the connection will be applied. For example, if both a <user, client-id>
+ * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes
+ * precedence over client-id quota. The order of precedence is:
+ * <ul>
+ *   <li>/config/users/<user>/clients/<client-id>
+ *   <li>/config/users/<user>/clients/<default>
+ *   <li>/config/users/<user>
+ *   <li>/config/users/<default>/clients/<client-id>
+ *   <li>/config/users/<default>/clients/<default>
+ *   <li>/config/users/<default>
+ *   <li>/config/clients/<client-id>
+ *   <li>/config/clients/<default>
+ * </ul>
+ * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case
+ * where a single level of quotas is configured.
+ *
  * @param config @ClientQuotaManagerConfig quota configs
  * @param metrics @Metrics Metrics instance
  * @param apiKey API Key for the request
@@ -67,8 +133,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val apiKey: QuotaType,
                          private val time: Time) extends Logging {
-  private val overriddenQuota = new ConcurrentHashMap[String, Quota]()
-  private val defaultQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
+  private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
+  private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
+  private var quotaTypesEnabled = if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas else QuotaTypes.ClientIdQuotaEnabled
   private val lock = new ReentrantReadWriteLock()
   private val delayQueue = new DelayQueue[ThrottledResponse]()
   private val sensorAccessor = new SensorAccess
@@ -107,8 +174,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * @return Number of milliseconds to delay the response in case of Quota violation.
    *         Zero otherwise
    */
-  def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = {
-    val clientSensors = getOrCreateQuotaSensors(clientId)
+  def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Int, callback: Int => Unit): Int = {
+    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
+    val clientSensors = getOrCreateQuotaSensors(clientQuotaEntity)
     var throttleTimeMs = 0
     try {
       clientSensors.quotaSensor.record(value)
@@ -117,8 +185,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     } catch {
       case qve: QuotaViolationException =>
         // Compute the delay
-        val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
-        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
+        val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
+        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
@@ -128,6 +196,127 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     throttleTimeMs
   }
 
+  /**
+   * Determines the quota-id for the client with the specified user principal
+   * and client-id and returns the quota entity that encapsulates the quota-id
+   * and the associated quota override or default quota.
+   *
+   */
+  private def quotaEntity(sanitizedUser: String, clientId: String) : QuotaEntity = {
+    quotaTypesEnabled match {
+      case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
+        val quotaId = QuotaId(None, Some(clientId))
+        var quota = overriddenQuota.get(quotaId)
+        if (quota == null) {
+          quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId)
+          if (quota == null)
+            quota = staticConfigClientIdQuota
+        }
+        QuotaEntity(quotaId, "", clientId, quota)
+      case QuotaTypes.UserQuotaEnabled =>
+        val quotaId = QuotaId(Some(sanitizedUser), None)
+        var quota = overriddenQuota.get(quotaId)
+        if (quota == null) {
+          quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserQuotaId)
+          if (quota == null)
+            quota = ClientQuotaManagerConfig.UnlimitedQuota
+        }
+        QuotaEntity(quotaId, sanitizedUser, "", quota)
+      case QuotaTypes.UserClientIdQuotaEnabled =>
+        val quotaId = QuotaId(Some(sanitizedUser), Some(clientId))
+        var quota = overriddenQuota.get(quotaId)
+        if (quota == null) {
+          quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default)))
+          if (quota == null) {
+            quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId)))
+            if (quota == null) {
+              quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId)
+              if (quota == null)
+                quota = ClientQuotaManagerConfig.UnlimitedQuota
+            }
+          }
+        }
+        QuotaEntity(quotaId, sanitizedUser, clientId, quota)
+      case _ =>
+        quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId)
+    }
+  }
+
+  private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, clientId: String) : QuotaEntity = {
+    val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId))
+
+    val userQuotaId = QuotaId(Some(sanitizedUser), None)
+    val clientQuotaId = QuotaId(None, Some(clientId))
+    var quotaId = userClientQuotaId
+    var quotaConfigId = userClientQuotaId
+    // 1) /config/users/<user>/clients/<client-id>
+    var quota = overriddenQuota.get(quotaConfigId)
+    if (quota == null) {
+      // 2) /config/users/<user>/clients/<default>
+      quotaId = userClientQuotaId
+      quotaConfigId = QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default))
+      quota = overriddenQuota.get(quotaConfigId)
+
+      if (quota == null) {
+        // 3) /config/users/<user>
+        quotaId = userQuotaId
+        quotaConfigId = quotaId
+        quota = overriddenQuota.get(quotaConfigId)
+
+        if (quota == null) {
+          // 4) /config/users/<default>/clients/<client-id>
+          quotaId = userClientQuotaId
+          quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(clientId))
+          quota = overriddenQuota.get(quotaConfigId)
+
+          if (quota == null) {
+            // 5) /config/users/<default>/clients/<default>
+            quotaId = userClientQuotaId
+            quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
+            quota = overriddenQuota.get(quotaConfigId)
+
+            if (quota == null) {
+              // 6) /config/users/<default>
+              quotaId = userQuotaId
+              quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None)
+              quota = overriddenQuota.get(quotaConfigId)
+
+              if (quota == null) {
+                // 7) /config/clients/<client-id>
+                quotaId = clientQuotaId
+                quotaConfigId = QuotaId(None, Some(clientId))
+                quota = overriddenQuota.get(quotaConfigId)
+
+                if (quota == null) {
+                  // 8) /config/clients/<default>
+                  quotaId = clientQuotaId
+                  quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default))
+                  quota = overriddenQuota.get(quotaConfigId)
+
+                  if (quota == null) {
+                    quotaId = clientQuotaId
+                    quotaConfigId = null
+                    quota = staticConfigClientIdQuota
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+    val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser
+    val quotaClientId = if (quotaId == userQuotaId) "" else clientId
+    QuotaEntity(quotaId, quotaUser, quotaClientId, quota)
+  }
+
+  /**
+   * Returns the quota for the client with the specified (non-encoded) user principal and client-id.
+   */
+  def quota(user: String, clientId: String) = {
+    quotaEntity(QuotaId.sanitize(user), clientId).quota
+  }
+
   /*
    * This calculates the amount of time needed to bring the metric within quota
    * assuming that no new metrics are recorded.
@@ -153,40 +342,35 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     }
   }
 
-  /**
-   * Returns the quota for the specified clientId
-   */
-  def quota(clientId: String): Quota =
-    if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota
-
   /*
    * This function either returns the sensors for a given client id or creates them if they don't exist
    * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor
    */
-  private def getOrCreateQuotaSensors(clientId: String): ClientSensors = {
+  private def getOrCreateQuotaSensors(quotaEntity: QuotaEntity): ClientSensors = {
+    // Names of the sensors to access
     ClientSensors(
       sensorAccessor.getOrCreate(
-        getQuotaSensorName(clientId),
+        getQuotaSensorName(quotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
         lock, metrics,
-        () => clientRateMetricName(clientId),
-        () => getQuotaMetricConfig(quota(clientId)),
+        () => clientRateMetricName(quotaEntity.sanitizedUser, quotaEntity.clientId),
+        () => getQuotaMetricConfig(quotaEntity.quota),
         () => new Rate()
       ),
-      sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientId),
+      sensorAccessor.getOrCreate(getThrottleTimeSensorName(quotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
         lock,
         metrics,
-        () => metrics.metricName("throttle-time", apiKey.toString, "Tracking average throttle-time per client", "client-id", clientId),
+        () => throttleMetricName(quotaEntity),
         () => null,
         () => new Avg()
       )
     )
   }
 
-  private def getThrottleTimeSensorName(clientId: String): String = apiKey + "ThrottleTime-" + clientId
+  private def getThrottleTimeSensorName(quotaId: QuotaId): String = apiKey + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
 
-  private def getQuotaSensorName(clientId: String): String = apiKey + "-" + clientId
+  private def getQuotaSensorName(quotaId: QuotaId): String = apiKey + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
 
   private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
     new MetricConfig()
@@ -196,19 +380,13 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
-   * Reset quotas to the default value for the given clientId
-   * @param clientId client to override
-   */
-  def resetQuota(clientId: String) = {
-    updateQuota(clientId, defaultQuota)
-  }
-
-  /**
-   * Overrides quotas per clientId
-   * @param clientId client to override
-   * @param quota custom quota to apply
+   * Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
+   * for any of these levels.
+   * @param sanitizedUser user to override if quota applies to <user> or <user, client-id>
+   * @param clientId client to override if quota applies to <client-id> or <user, client-id>
+   * @param quota custom quota to apply or None if quota override is being removed
    */
-  def updateQuota(clientId: String, quota: Quota) = {
+  def updateQuota(sanitizedUser: Option[String], clientId: Option[String], quota: Option[Quota]) {
     /*
      * Acquire the write lock to apply changes in the quota objects.
      * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
@@ -218,31 +396,85 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
      */
     lock.writeLock().lock()
     try {
-      logger.info(s"Changing quota for clientId $clientId to ${quota.bound()}")
-
-      if (quota.equals(defaultQuota))
-        this.overriddenQuota.remove(clientId)
-      else
-        this.overriddenQuota.put(clientId, quota)
-
-      // Change the underlying metric config if the sensor has been created.
-      // Note the metric could be expired by another thread, so use a local variable and null check.
-      val metric = metrics.metrics.get(clientRateMetricName(clientId))
-      if (metric != null) {
-        logger.info(s"Sensor for clientId $clientId already exists. Changing quota to ${quota.bound()} in MetricConfig")
-        metric.config(getQuotaMetricConfig(quota))
+      val quotaId = QuotaId(sanitizedUser, clientId)
+      val userInfo = sanitizedUser match {
+        case Some(ConfigEntityName.Default) => "default user "
+        case Some(user) => "user " + user + " "
+        case None => ""
+      }
+      val clientIdInfo = clientId match {
+        case Some(ConfigEntityName.Default) => "default client-id"
+        case Some(id) => "client-id " + id
+        case None => ""
+      }
+      quota match {
+        case Some(newQuota) =>
+          logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}")
+          overriddenQuota.put(quotaId, newQuota)
+          (sanitizedUser, clientId) match {
+            case (Some(u), Some(c)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+            case (Some(u), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+            case (None, Some(c)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+            case (None, None) =>
+          }
+        case None =>
+          logger.info(s"Removing ${apiKey} quota for ${userInfo}${clientIdInfo}")
+          overriddenQuota.remove(quotaId)
       }
+
+      val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), clientId.getOrElse(""))
+      val allMetrics = metrics.metrics()
+
+      // If multiple-levels of quotas are defined or if this is a default quota update, traverse metrics
+      // to find all affected values. Otherwise, update just the single matching one.
+      val singleUpdate = quotaTypesEnabled match {
+        case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled =>
+          !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !clientId.filter(_ == ConfigEntityName.Default).isDefined
+        case _ => false
+      }
+      if (singleUpdate) {
+          // Change the underlying metric config if the sensor has been created
+          val metric = allMetrics.get(quotaMetricName)
+          if (metric != null) {
+            val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse(""))
+            val newQuota = metricConfigEntity.quota
+            logger.info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig")
+            metric.config(getQuotaMetricConfig(newQuota))
+          }
+      } else {
+          allMetrics.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
+            case (metricName, metric) =>
+              val userTag = if (metricName.tags.containsKey("user")) metricName.tags.get("user") else ""
+              val clientIdTag = if (metricName.tags.containsKey("client-id")) metricName.tags.get("client-id") else ""
+              val metricConfigEntity = quotaEntity(userTag, clientIdTag)
+              if (metricConfigEntity.quota != metric.config.quota) {
+                val newQuota = metricConfigEntity.quota
+                logger.info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig")
+                metric.config(getQuotaMetricConfig(newQuota))
+              }
+          }
+      }
+
     } finally {
       lock.writeLock().unlock()
     }
   }
 
-  private def clientRateMetricName(clientId: String): MetricName = {
+  private def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
     metrics.metricName("byte-rate", apiKey.toString,
-                   "Tracking byte-rate per client",
+                   "Tracking byte-rate per user/client-id",
+                   "user", sanitizedUser,
                    "client-id", clientId)
   }
 
+  private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
+    metrics.metricName("throttle-time",
+                       apiKey.toString,
+                       "Tracking average throttle-time per user/client-id",
+                       "user", quotaEntity.sanitizedUser,
+                       "client-id", quotaEntity.clientId)
+  }
+
   def shutdown() = {
     throttledRequestReaper.shutdown()
   }
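
A small sketch of the resulting precedence behaviour (quotaManager stands for an existing ClientQuotaManager instance, e.g. the broker's produce-quota manager; names and values are illustrative):

  // A client-id level override plus a per-user default for user "alice"'s clients
  quotaManager.updateQuota(None, Some("clientA"), Some(Quota.upperBound(1024)))
  quotaManager.updateQuota(Some(QuotaId.sanitize("alice")), Some(ConfigEntityName.Default),
                           Some(Quota.upperBound(2048)))

  // /config/users/<user>/clients/<default> is more specific than /config/clients/<client-id>,
  // so connections authenticated as "alice" using client-id "clientA" get the 2048 B/s quota...
  quotaManager.quota("alice", "clientA")      // -> upper=2048.0
  // ...while other principals fall through to the client-id override.
  quotaManager.quota("ANONYMOUS", "clientA")  // -> upper=1024.0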

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 67b74a7..5be9c12 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -96,31 +96,59 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
   }
 }
 
-object ClientConfigOverride {
+object QuotaConfigOverride {
   val ProducerOverride = "producer_byte_rate"
   val ConsumerOverride = "consumer_byte_rate"
 }
 
 /**
-  * The ClientIdConfigHandler will process clientId config changes in ZK.
-  * The callback provides the clientId and the full properties set read from ZK.
-  * This implementation reports the overrides to the respective ClientQuotaManager objects
-  */
-class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends ConfigHandler {
+ * Handles <client-id>, <user> or <user, client-id> quota config updates in ZK.
+ * This implementation reports the overrides to the respective ClientQuotaManager objects
+ */
+class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
+
+  def updateQuotaConfig(sanitizedUser: Option[String], clientId: Option[String], config: Properties) {
+    val producerQuota =
+      if (config.containsKey(QuotaConfigOverride.ProducerOverride))
+        Some(new Quota(config.getProperty(QuotaConfigOverride.ProducerOverride).toLong, true))
+      else
+        None
+    quotaManagers.produce.updateQuota(sanitizedUser, clientId, producerQuota)
+    val consumerQuota =
+      if (config.containsKey(QuotaConfigOverride.ConsumerOverride))
+        Some(new Quota(config.getProperty(QuotaConfigOverride.ConsumerOverride).toLong, true))
+      else
+        None
+    quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota)
+  }
+}
+
+/**
+ * The ClientIdConfigHandler will process clientId config changes in ZK.
+ * The callback provides the clientId and the full properties set read from ZK.
+ */
+class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends QuotaConfigHandler(quotaManagers) with ConfigHandler {
+
   def processConfigChanges(clientId: String, clientConfig: Properties) {
-    if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
-      quotaManagers.produce.updateQuota(clientId,
-        new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true))
-    } else {
-      quotaManagers.fetch.resetQuota(clientId)
-    }
+    updateQuotaConfig(None, Some(clientId), clientConfig)
+  }
+}
 
-    if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
-      quotaManagers.fetch.updateQuota(clientId,
-        new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true))
-    } else {
-      quotaManagers.produce.resetQuota(clientId)
-    }
+/**
+ * The UserConfigHandler will process <user> and <user, client-id> quota changes in ZK.
+ * The callback provides the node name containing sanitized user principal, client-id if this is
+ * a <user, client-id> update and the full properties set read from ZK.
+ */
+class UserConfigHandler(private val quotaManagers: QuotaManagers) extends QuotaConfigHandler(quotaManagers) with ConfigHandler {
+
+  def processConfigChanges(quotaEntityPath: String, config: Properties) {
+    // Entity path is <user> or <user>/clients/<client>
+    val entities = quotaEntityPath.split("/")
+    if (entities.length != 1 && entities.length != 3)
+      throw new IllegalArgumentException("Invalid quota entity path: " + quotaEntityPath);
+    val sanitizedUser = entities(0)
+    val clientId = if (entities.length == 3) Some(entities(2)) else None
+    updateQuotaConfig(Some(sanitizedUser), clientId, config)
   }
 }
 
@@ -151,4 +179,4 @@ object ThrottledReplicaValidator extends Validator {
   private def isValid(proposed: String): Boolean = {
     proposed.trim.equals("*") || proposed.trim.matches("([0-9]+:[0-9]+)?(,[0-9]+:[0-9]+)*")
   }
-}
\ No newline at end of file
+}
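
For example (a hypothetical sketch; quotaManagers stands for the broker's existing QuotaManagers instance), a <user, client-id> change notification reaches the quota managers as follows:

  val handler = new UserConfigHandler(quotaManagers)
  val props = new java.util.Properties()
  props.put(QuotaConfigOverride.ProducerOverride, "1048576")   // "producer_byte_rate"
  // Entity path is "<sanitized-user>/clients/<client-id>", as stored under /config/users
  handler.processConfigChanges("alice/clients/clientA", props)
  // -> updateQuotaConfig(Some("alice"), Some("clientA"), props), which calls
  //    quotaManagers.produce.updateQuota(Some("alice"), Some("clientA"), Some(new Quota(1048576, true)))
  //    and quotaManagers.fetch.updateQuota(Some("alice"), Some("clientA"), None)   // no consumer override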

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 556534a..b31d838 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -37,8 +37,13 @@ import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 object ConfigType {
   val Topic = "topics"
   val Client = "clients"
+  val User = "users"
   val Broker = "brokers"
-  val all = Seq(Topic, Client, Broker)
+  val all = Seq(Topic, Client, User, Broker)
+}
+
+object ConfigEntityName {
+  val Default = "<default>"
 }
 
 /**
@@ -48,7 +53,9 @@ object ConfigType {
  *
  * Config is stored under the path: /config/entityType/entityName
  *   E.g. /config/topics/<topic_name> and /config/clients/<clientId>
- * This znode stores the overrides for this entity (but no defaults) in properties format.
+ * This znode stores the overrides for this entity in properties format with defaults stored using entityName "<default>".
+ * Multiple entity names may be specified (eg. <user, client-id> quotas) using a hierarchical path:
+ *   E.g. /config/users/<user>/clients/<clientId>
  *
  * To avoid watching all topics for changes instead we have a notification path
  *   /config/changes
@@ -57,8 +64,10 @@ object ConfigType {
  * To update a config we first update the config properties. Then we create a new sequential
  * znode under the change path which contains the name of the entityType and entityName that was updated, say
  *   /config/changes/config_change_13321
- * The sequential znode contains data in this format: {"version" : 1, "entityType":"topic/client", "entityName" : "topic_name/client_id"}
+ * The sequential znode contains data in this format: {"version" : 1, "entity_type":"topic/client", "entity_name" : "topic_name/client_id"}
  * This is just a notification--the actual config change is stored only once under the /config/entityType/entityName path.
+ * Version 2 notifications have the format: {"version" : 2, "entity_path":"entity_type/entity_name"}
+ * Multiple entities may be specified as a hierarchical path (e.g. users/<user>/clients/<clientId>).
  *
  * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
  * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds
@@ -89,30 +98,60 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
         case Some(mapAnon: Map[_, _]) =>
           val map = mapAnon collect
             { case (k: String, v: Any) => k -> v }
-          require(map("version") == 1)
-
-          val entityType = map.get("entity_type") match {
-            case Some(ConfigType.Topic) => ConfigType.Topic
-            case Some(ConfigType.Client) => ConfigType.Client
-            case Some(ConfigType.Broker) => ConfigType.Broker
-            case _ => throw new IllegalArgumentException(s"Config change notification must have 'entity_type' set to one of ${ConfigType.all}. Received: $json")
-          }
 
-          val entity = map.get("entity_name") match {
-            case Some(value: String) => value
-            case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json)
+          map("version") match {
+            case 1 => processEntityConfigChangeVersion1(json, map)
+            case 2 => processEntityConfigChangeVersion2(json, map)
+            case _ => throw new IllegalArgumentException("Config change notification has an unsupported version " + map("version") +
+                "Supported versions are 1 and 2.")
           }
-          val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)
-          logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
-          configHandlers(entityType).processConfigChanges(entity, entityConfig)
 
         case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
-          "{\"version\" : 1," +
-          " \"entity_type\":\"topic/client\"," +
-          " \"entity_name\" : \"topic_name/client_id\"}." +
+          "{\"version\" : 1, \"entity_type\":\"topics/clients\", \"entity_name\" : \"topic_name/client_id\"}." + " or " +
+          "{\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}." +
           " Received: " + json)
       }
     }
+
+    private def processEntityConfigChangeVersion1(json: String, map: Map[String, Any]) {
+
+      val entityType = map.get("entity_type") match {
+        case Some(ConfigType.Topic) => ConfigType.Topic
+        case Some(ConfigType.Client) => ConfigType.Client
+        case _ => throw new IllegalArgumentException("Version 1 config change notification must have 'entity_type' set to 'clients' or 'topics'." +
+              " Received: " + json)
+      }
+
+      val entity = map.get("entity_name") match {
+        case Some(value: String) => value
+        case _ => throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
+      }
+
+      val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)
+      logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
+      configHandlers(entityType).processConfigChanges(entity, entityConfig)
+
+    }
+
+    private def processEntityConfigChangeVersion2(json: String, map: Map[String, Any]) {
+
+      val entityPath = map.get("entity_path") match {
+        case Some(value: String) => value
+        case _ => throw new IllegalArgumentException("Version 2 config change notification does not specify 'entity_path'. Received: " + json)
+      }
+
+      val index = entityPath.indexOf('/')
+      // Guard before calling substring so a path without '/' fails with the intended error
+      val rootEntityType = if (index < 0) "" else entityPath.substring(0, index)
+      if (index < 0 || !configHandlers.contains(rootEntityType))
+        throw new IllegalArgumentException("Version 2 config change notification must have 'entity_path' starting with 'clients/', 'topics/' or 'users/'." +
+              " Received: " + json)
+      val fullSanitizedEntityName = entityPath.substring(index + 1)
+
+      val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, rootEntityType, fullSanitizedEntityName)
+      logger.info(s"Processing override for entityPath: $entityPath with config: $entityConfig")
+      configHandlers(rootEntityType).processConfigChanges(fullSanitizedEntityName, entityConfig)
+
+    }
   }
 
   private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
@@ -122,5 +161,20 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
    */
   def startup(): Unit = {
     configChangeListener.init()
+
+    // Apply all existing entity configs to their handlers to bootstrap the overrides; for users,
+    // <user, client-id> child configs are applied to the UserConfigHandler as well
+    configHandlers.foreach {
+      case (ConfigType.User, handler) =>
+          AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.User).foreach {
+            case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties)
+          }
+          AdminUtils.fetchAllChildEntityConfigs(zkUtils, ConfigType.User, ConfigType.Client).foreach {
+            case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties)
+          }
+      case (configType, handler) =>
+          AdminUtils.fetchAllEntityConfigs(zkUtils, configType).foreach {
+            case (entityName, properties) => handler.processConfigChanges(entityName, properties)
+          }
+    }
   }
 }
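To make the two notification formats described above concrete, here is a small sketch of the JSON payloads the handler accepts and how a version 2 path is decomposed (entity names such as user1 and clientA are illustrative):

// Version 1 (clients and topics only):
val v1 = """{"version":1,"entity_type":"clients","entity_name":"clientA"}"""
// Version 2 (hierarchical entity path, also used for <user> and <user, client-id>):
val v2 = """{"version":2,"entity_path":"users/user1/clients/clientA"}"""

// For v2 the broker splits on the first '/':
//   rootEntityType          = "users"
//   fullSanitizedEntityName = "user1/clients/clientA"
// and reads the overrides from /config/users/user1/clients/clientA.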

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3008426..d3ba5ef 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -396,6 +396,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
 
       quotas.produce.recordAndMaybeThrottle(
+        request.session.sanitizedUser,
         request.header.clientId,
         numBytesAppended,
         produceResponseCallback)
@@ -494,7 +495,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchResponseCallback(0)
       } else {
         val size = FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), fetchRequest.versionId)
-        quotas.fetch.recordAndMaybeThrottle(fetchRequest.clientId, size, fetchResponseCallback)
+        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, fetchRequest.clientId, size, fetchResponseCallback)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3671297..b37be5b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -523,8 +523,10 @@ object KafkaConfig {
   "or this timeout is reached. This is similar to the producer request timeout."
   val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden"
   /** ********* Quota Configuration ***********/
-  val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second"
-  val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
+  val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user>, <client-id> or <user, client-id> in Zookeeper. " +
+  "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second"
+  val ConsumerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user>, <client-id> or <user, client-id> in Zookeeper. " +
+  "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
   val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas"
   val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
   val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas"
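The deprecation notes above imply a lookup order in which these static broker properties are only a last resort. A hedged summary of that order, as documented in KIP-55 (the ZK paths are illustrative):

// Most specific match wins; the static broker defaults apply only when none of
// the dynamic overrides or dynamic defaults below are configured.
val quotaPrecedence = Seq(
  "/config/users/<user>/clients/<client-id>",
  "/config/users/<user>/clients/<default>",
  "/config/users/<user>",
  "/config/users/<default>/clients/<client-id>",
  "/config/users/<default>/clients/<default>",
  "/config/users/<default>",
  "/config/clients/<client-id>",
  "/config/clients/<default>",
  "quota.producer.default / quota.consumer.default (static, deprecated)"
)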

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index db92cb8..5055c87 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -251,14 +251,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         /* start dynamic config manager */
         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
                                                            ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
+                                                           ConfigType.User -> new UserConfigHandler(quotaManagers),
                                                            ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
-        // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
-        // TODO: Move this logic to DynamicConfigManager
-        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
-          case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
-        }
-
         // Create the config manager. start listening to notifications
         dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
         dynamicConfigManager.startup()

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 503ed54..96779ff 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -119,6 +119,9 @@ object ZkUtils {
   def getEntityConfigPath(entityType: String, entity: String): String =
     getEntityConfigRootPath(entityType) + "/" + entity
 
+  def getEntityConfigPath(entityPath: String): String =
+    ZkUtils.EntityConfigPath + "/" + entityPath
+
   def getDeleteTopicPath(topic: String): String =
     DeleteTopicsPath + "/" + topic
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
new file mode 100644
index 0000000..c9b7787
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -0,0 +1,195 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.server.{QuotaConfigOverride, KafkaConfig, KafkaServer, QuotaId}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.{Quota, KafkaMetric}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.Map
+import scala.collection.mutable
+
+abstract class BaseQuotaTest extends IntegrationTestHarness {
+
+  def userPrincipal : String
+  def producerQuotaId : QuotaId
+  def consumerQuotaId : QuotaId
+  def overrideQuotas(producerQuota: Long, consumerQuota: Long)
+  def removeQuotaOverrides()
+
+  override val serverCount = 2
+  val producerCount = 1
+  val consumerCount = 1
+
+  private val producerBufferSize = 300000
+  protected val producerClientId = "QuotasTestProducer-1"
+  protected val consumerClientId = "QuotasTestConsumer-1"
+
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100")
+  this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0")
+  this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
+  this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
+  this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+  this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
+
+  // Low enough quota that a producer sending a small payload in a tight loop should get throttled
+  val defaultProducerQuota = 8000
+  val defaultConsumerQuota = 2500
+
+  var leaderNode: KafkaServer = null
+  var followerNode: KafkaServer = null
+  private val topic1 = "topic-1"
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    val numPartitions = 1
+    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, serverCount, servers)
+    leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1)
+    followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1)
+    assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
+  }
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  @Test
+  def testThrottledProducerConsumer() {
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
+
+    val numRecords = 1000
+    produce(producers.head, numRecords)
+
+    val producerMetricName = throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)
+    assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
+
+    // Consumer should read in a bursty manner and get throttled immediately
+    consume(consumers.head, numRecords)
+    val consumerMetricName = throttleMetricName(ApiKeys.FETCH, consumerQuotaId)
+    assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
+  }
+
+  @Test
+  def testProducerConsumerOverrideUnthrottled() {
+    // Give effectively unlimited quota for producer and consumer
+    overrideQuotas(Long.MaxValue, Long.MaxValue)
+    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
+    val numRecords = 1000
+    produce(producers.head, numRecords)
+    val producerMetricName = throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)
+    assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
+
+    // The "client" consumer does not get throttled.
+    consume(consumers.head, numRecords)
+    val consumerMetricName = throttleMetricName(ApiKeys.FETCH, consumerQuotaId)
+    assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)
+  }
+
+  @Test
+  def testQuotaOverrideDelete() {
+    // Override producer and consumer quotas to unlimited
+    overrideQuotas(Long.MaxValue, Long.MaxValue)
+
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
+    val numRecords = 1000
+    produce(producers.head, numRecords)
+    assertTrue("Should not have been throttled", allMetrics(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)).value() == 0)
+    consume(consumers.head, numRecords)
+    assertTrue("Should not have been throttled", allMetrics(throttleMetricName(ApiKeys.FETCH, consumerQuotaId)).value() == 0)
+
+    // Delete producer and consumer quota overrides. Consumer and producer should now be
+    // throttled since broker defaults are very small
+    removeQuotaOverrides()
+    produce(producers.head, numRecords)
+
+    assertTrue("Should have been throttled", allMetrics(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)).value() > 0)
+    consume(consumers.head, numRecords)
+    assertTrue("Should have been throttled", allMetrics(throttleMetricName(ApiKeys.FETCH, consumerQuotaId)).value() > 0)
+  }
+
+  def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {
+    var numBytesProduced = 0
+    for (i <- 0 until count) {
+      val payload = i.toString.getBytes
+      numBytesProduced += payload.length
+      p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload),
+             new ErrorLoggingCallback(topic1, null, null, true)).get()
+      Thread.sleep(1)
+    }
+    numBytesProduced
+  }
+
+  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) {
+    consumer.subscribe(List(topic1))
+    var numConsumed = 0
+    while (numConsumed < numRecords) {
+      for (cr <- consumer.poll(100)) {
+        numConsumed += 1
+      }
+    }
+  }
+
+  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long) {
+    TestUtils.retry(10000) {
+      val quotaManagers = leaderNode.apis.quotas
+      val overrideProducerQuota = quotaManagers.produce.quota(userPrincipal, producerClientId)
+      val overrideConsumerQuota = quotaManagers.fetch.quota(userPrincipal, consumerClientId)
+
+      assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
+      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
+    }
+  }
+
+  private def throttleMetricName(apiKey: ApiKeys, quotaId: QuotaId): MetricName = {
+    leaderNode.metrics.metricName("throttle-time",
+                                  apiKey.name,
+                                  "Tracking throttle-time per user/client-id",
+                                  "user", quotaId.sanitizedUser.getOrElse(""),
+                                  "client-id", quotaId.clientId.getOrElse(""))
+  }
+
+  def quotaProperties(producerQuota: Long, consumerQuota: Long): Properties = {
+    val props = new Properties()
+    props.put(QuotaConfigOverride.ProducerOverride, producerQuota.toString)
+    props.put(QuotaConfigOverride.ConsumerOverride, consumerQuota.toString)
+    props
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
new file mode 100644
index 0000000..7477f7f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -0,0 +1,55 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.server.{KafkaConfig, QuotaConfigOverride, QuotaId}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.Before
+
+class ClientIdQuotaTest extends BaseQuotaTest {
+
+  override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName
+  override val producerQuotaId = QuotaId(None, Some(producerClientId))
+  override val consumerQuotaId = QuotaId(None, Some(consumerClientId))
+
+  @Before
+  override def setUp() {
+    this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, defaultProducerQuota.toString)
+    this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString)
+    super.setUp()
+  }
+
+  override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+    val producerProps = new Properties()
+    producerProps.put(QuotaConfigOverride.ProducerOverride, producerQuota.toString)
+    updateQuotaOverride(producerClientId, producerProps)
+
+    val consumerProps = new Properties()
+    consumerProps.put(QuotaConfigOverride.ConsumerOverride, consumerQuota.toString)
+    updateQuotaOverride(consumerClientId, consumerProps)
+  }
+  override def removeQuotaOverrides() {
+    val emptyProps = new Properties
+    updateQuotaOverride(producerClientId, emptyProps)
+    updateQuotaOverride(consumerClientId, emptyProps)
+  }
+
+  private def updateQuotaOverride(clientId: String, properties: Properties) {
+    AdminUtils.changeClientIdConfig(zkUtils, clientId, properties)
+  }
+}
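For comparison, a user-level subclass of BaseQuotaTest would override the same hooks but write overrides under /config/users instead of /config/clients. A hypothetical sketch, where the AdminUtils method name is an assumption since the AdminUtils changes are not shown in this message:

// Hypothetical sketch only; changeUserOrUserClientIdConfig is an assumed method name.
private def updateUserQuotaOverride(sanitizedUser: String, properties: Properties) {
  AdminUtils.changeUserOrUserClientIdConfig(zkUtils, sanitizedUser, properties)
}
// Quota ids then take the form QuotaId(Some(userPrincipal), None) rather than
// QuotaId(None, Some(clientId)).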

