kafka-commits mailing list archives

From jun...@apache.org
Subject [3/3] kafka git commit: KAFKA-1464; Add a throttling option to the Kafka replication
Date Fri, 16 Sep 2016 05:28:34 GMT
KAFKA-1464; Add a throttling option to the Kafka replication

This implements Replication Quotas based on KIP-73 [(link)](https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas), originally motivated by KAFKA-1464.

System Tests Run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/544/

**This first PR demonstrates the approach**.

**_Overview of Change_**
The guts of this change are relatively small. Throttling occurs on both leader and follower sides. A single class tracks the throttled throughput in and out of each broker (**_ReplicationQuotaManager_**).
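
To make the mechanics concrete, here is a minimal sketch of that idea, assuming nothing about the real class: it records throttled bytes and reports whether the measured rate exceeds a configured limit. The names (`QuotaManagerSketch`, `record`, `isQuotaExceeded`) are placeholders, not the actual **_ReplicationQuotaManager_** API.

```scala
import java.util.concurrent.atomic.AtomicLong

// Illustrative stand-in for ReplicationQuotaManager: tracks the throttled
// bytes flowing through a broker and compares the measured rate to a limit.
class QuotaManagerSketch(bytesPerSecondLimit: Long) {
  private val windowStartMs = new AtomicLong(System.currentTimeMillis())
  private val bytesRecorded = new AtomicLong(0L)

  // Called as throttled bytes arrive (follower) or are sent (leader).
  def record(bytes: Long): Unit = bytesRecorded.addAndGet(bytes)

  // True when the observed throttled rate is over the configured limit.
  def isQuotaExceeded(nowMs: Long = System.currentTimeMillis()): Boolean = {
    // Clamp the window to at least 1s (cf. SimpleRate's fixed first window)
    // so a near-zero elapsed time cannot produce an artificially high rate.
    val elapsedSec = math.max(1000L, nowMs - windowStartMs.get()) / 1000.0
    bytesRecorded.get() / elapsedSec > bytesPerSecondLimit
  }
}
```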

On the follower side, the Follower Throttled Rate is calculated as fetch responses arrive. Then, before the next fetch request is sent, we check whether the quota is violated, removing throttled partitions from the request if it is. This is all encapsulated in a few lines of code in the **_ReplicaFetcherThread_**. Existing code handles temporal back-off if the request ends up being empty.
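
A hedged sketch of that follower-side filter, under the assumption that we already know which partitions are throttled (the real logic lives in **_ReplicaFetcherThread_**; `TopicPartitionKey` and `partitionsToFetch` are illustrative names):

```scala
case class TopicPartitionKey(topic: String, partition: Int)

// If the follower's throttled-rate quota is currently violated, drop the
// throttled partitions from the next fetch request; the pre-existing
// back-off code then handles the case where the request ends up empty.
def partitionsToFetch(all: Seq[TopicPartitionKey],
                      isThrottled: TopicPartitionKey => Boolean,
                      quotaExceeded: Boolean): Seq[TopicPartitionKey] =
  if (quotaExceeded) all.filterNot(isThrottled) else all
```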

On the leader side it's a little more complex. When a fetch request arrives at the leader, it is built, partition by partition, in **_ReplicaManager.readFromLocalLog_**. As we put each partition into the fetch response, we check whether the total size fits in the current quota. If the quota is exceeded, the partition will not be added to the fetch response. Importantly, we don't increase the quota at this point; we just check whether the bytes will fit.
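
Sketched in the same illustrative style (not the actual **_readFromLocalLog_** signature), the leader-side decision looks like this; note the quota is only consulted, never incremented:

```scala
case class PartitionRead(topic: String, partition: Int, bytes: Long, throttled: Boolean)

// Build the fetch response partition by partition. A throttled partition is
// skipped while the quota is exceeded; nothing is recorded here, because the
// same read may be re-evaluated later when the request sits in purgatory.
def buildResponse(reads: Seq[PartitionRead], quotaExceeded: Boolean): Seq[PartitionRead] =
  reads.filter(r => !r.throttled || !quotaExceeded)
```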

Now, if there aren't enough bytes to send the response immediately, which is common if we're catching up and throttled, then the request will be put in purgatory. I've added some simple code to **_DelayedFetch_** to handle throttled partitions (throttled partitions are checked against the quota, rather than the messages available in the log).

When the delayed fetch completes and exits purgatory, _**ReplicaManager.readFromLocalLog**_ will be called again. This is why _**ReplicaManager.readFromLocalLog**_ does not actually increase the quota; it just checks whether enough bytes are available for a partition.
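
A sketch of that completion check (illustrative, not the actual **_DelayedFetch_** code): an unthrottled partition contributes its available log bytes towards the fetch minimum, while a throttled partition contributes only if the quota has headroom.

```scala
// Each partition is (availableBytes, throttled). The delayed fetch may
// complete once the countable bytes reach the fetch's minimum byte target.
def tryComplete(partitions: Seq[(Long, Boolean)],
                fetchMinBytes: Long,
                quotaExceeded: Boolean): Boolean = {
  val countable = partitions.map { case (bytes, throttled) =>
    if (throttled && quotaExceeded) 0L else bytes
  }
  countable.sum >= fetchMinBytes
}
```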

Finally, when there are enough bytes to be sent, or the delayed fetch times out, the response will be sent. Before it is sent, the throttled-outbound rate is increased, based on the size of the throttled partitions being sent. This happens at the end of _**KafkaApis.handleFetchRequest**_, exactly where client quotas are recorded.
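
Again as a rough sketch (reusing the illustrative `TopicPartitionKey` above; the real recording happens against the quota manager's sensor):

```scala
// Just before the response goes out, record the bytes of every throttled
// partition against the throttled-outbound rate; unthrottled partitions
// are not counted towards the replication quota.
def recordThrottledBytes(responseBytes: Map[TopicPartitionKey, Long],
                         isThrottled: TopicPartitionKey => Boolean,
                         record: Long => Unit): Unit =
  responseBytes.foreach { case (tp, bytes) => if (isThrottled(tp)) record(bytes) }
```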

There is an acceptance test that asserts the whole throttling process stabilises on the desired value. It covers a number of use cases, including many-to-many replication. See **_ReplicationQuotasTest_**.

Note: this protocol can over-request. The request is built based on the quota at time t1 (_ReplicaManager.readFromLocalLog_), but the bytes in the response are recorded at time t2 (end of _KafkaApis.handleFetchRequest_), where t2 > t1. For this reason I originally included an OverRequestedRate as a JMX metric, but testing has not revealed any obvious issue. Over-requesting is quickly compensated for by subsequent requests, stabilising close to the quota value.

_**Main stuff left to do:**_
- The fetch size is currently unbounded. This will be addressed in KIP-74, but we need to make sure the bound keeps requests from going beyond the throttle window.
- There are two failures showing up in the system tests on this branch: StreamsSmokeTest.test_streams (which looks like it fails regularly) and OffsetValidationTest.test_broker_rolling_bounce (which I need to look into).

_**Stuff left to do that could be deferred:**_
- Add the extra metrics specified in the KIP.
- System tests still need to be added.
- There is no validation for the cluster size / throttle combination that could lead to ISR dropouts.

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #1776 from benstopford/rep-quotas-v2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/143a33bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/143a33bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/143a33bc

Branch: refs/heads/trunk
Commit: 143a33bc506c703ea02fe5daa61f90b7094da13c
Parents: d436661
Author: Ben Stopford <benstopford@gmail.com>
Authored: Thu Sep 15 22:25:56 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Sep 15 22:25:56 2016 -0700

----------------------------------------------------------------------
 .../common/metrics/QuotaViolationException.java |  27 ++-
 .../org/apache/kafka/common/metrics/Sensor.java |  12 +-
 .../apache/kafka/common/metrics/stats/Rate.java |   4 +-
 .../kafka/common/metrics/stats/SimpleRate.java  |  39 +++
 .../kafka/common/metrics/MetricsTest.java       |  59 +++++
 .../src/main/scala/kafka/admin/AdminUtils.scala |  26 +-
 .../main/scala/kafka/admin/ConfigCommand.scala  |  67 ++---
 .../kafka/admin/ReassignPartitionsCommand.scala | 142 +++++++++--
 .../kafka/consumer/ConsumerFetcherThread.scala  |   4 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |  16 +-
 .../kafka/server/AbstractFetcherThread.scala    |   1 -
 .../scala/kafka/server/ClientQuotaManager.scala | 101 +++-----
 .../main/scala/kafka/server/ConfigHandler.scala |  98 ++++++--
 .../main/scala/kafka/server/DelayedFetch.scala  |  18 +-
 .../kafka/server/DynamicConfigManager.scala     |   6 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  62 ++---
 .../main/scala/kafka/server/KafkaConfig.scala   |  25 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  14 +-
 .../main/scala/kafka/server/QuotaFactory.scala  |  74 ++++++
 .../kafka/server/ReplicaFetcherManager.scala    |  10 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  22 +-
 .../scala/kafka/server/ReplicaManager.scala     |  27 ++-
 .../kafka/server/ReplicationQuotaManager.scala  | 202 ++++++++++++++++
 .../main/scala/kafka/server/SensorAccess.scala  |  76 ++++++
 core/src/main/scala/kafka/utils/CoreUtils.scala |   1 +
 core/src/main/scala/kafka/utils/Time.scala      |   2 +-
 .../kafka/api/ClientQuotasTest.scala            | 206 ++++++++++++++++
 .../integration/kafka/api/QuotasTest.scala      | 207 ----------------
 .../test/scala/unit/kafka/admin/AdminTest.scala | 102 ++++++--
 .../unit/kafka/admin/ConfigCommandTest.scala    | 188 ++++++++++++--
 .../admin/ReassignPartitionsClusterTest.scala   | 152 +++++++++++-
 .../admin/ReassignPartitionsCommandTest.scala   |  88 +++++++
 .../kafka/admin/ReplicationQuotaUtils.scala     |  51 ++++
 .../scala/unit/kafka/admin/TestAdminUtils.scala |  27 +++
 .../unit/kafka/integration/FetcherTest.scala    |   1 -
 .../scala/unit/kafka/log/LogConfigTest.scala    |  31 ++-
 .../server/AbstractFetcherThreadTest.scala      |   1 +
 .../kafka/server/ClientQuotaManagerTest.scala   |  22 +-
 .../kafka/server/DynamicConfigChangeTest.scala  | 103 +++++---
 .../server/HighwatermarkPersistenceTest.scala   |   9 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   2 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala | 154 ++++++++++++
 .../unit/kafka/server/ReplicaManagerTest.scala  |  10 +-
 .../server/ReplicationQuotaManagerTest.scala    | 123 ++++++++++
 .../kafka/server/ReplicationQuotasTest.scala    | 242 +++++++++++++++++++
 .../unit/kafka/server/SimpleFetchTest.scala     |  24 +-
 .../ThrottledResponseExpirationTest.scala       |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  14 +-
 48 files changed, 2324 insertions(+), 570 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
index fbe03f5..7e1a2c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.metrics;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 
 /**
  * Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
@@ -24,8 +25,30 @@ import org.apache.kafka.common.KafkaException;
 public class QuotaViolationException extends KafkaException {
 
     private static final long serialVersionUID = 1L;
+    private final MetricName metricName;
+    private final double value;
+    private final double bound;
 
-    public QuotaViolationException(String m) {
-        super(m);
+    public QuotaViolationException(MetricName metricName, double value, double bound) {
+        super(String.format(
+                "'%s' violated quota. Actual: %f, Threshold: %f",
+                metricName,
+                value,
+                bound));
+        this.metricName = metricName;
+        this.value = value;
+        this.bound = bound;
+    }
+
+    public MetricName metricName() {
+        return metricName;
+    }
+
+    public double value() {
+        return value;
+    }
+
+    public double bound() {
+        return bound;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 3e500d5..4f630f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -109,9 +109,12 @@ public final class Sensor {
 
     /**
      * Check if we have violated our quota for any metric that has a configured quota
-     * @param timeMs
      */
-    private void checkQuotas(long timeMs) {
+    public void checkQuotas() {
+        checkQuotas(time.milliseconds());
+    }
+
+    public void checkQuotas(long timeMs) {
         for (int i = 0; i < this.metrics.size(); i++) {
             KafkaMetric metric = this.metrics.get(i);
             MetricConfig config = metric.config();
@@ -120,11 +123,10 @@ public final class Sensor {
                 if (quota != null) {
                     double value = metric.value(timeMs);
                     if (!quota.acceptable(value)) {
-                        throw new QuotaViolationException(String.format(
-                            "'%s' violated quota. Actual: %f, Threshold: %f",
+                        throw new QuotaViolationException(
                             metric.metricName(),
                             value,
-                            quota.bound()));
+                            quota.bound());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index 971b7b6..6e49649 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -28,8 +28,8 @@ import org.apache.kafka.common.metrics.MetricConfig;
  */
 public class Rate implements MeasurableStat {
 
-    private final TimeUnit unit;
-    private final SampledStat stat;
+    protected final TimeUnit unit;
+    protected final SampledStat stat;
 
     public Rate() {
         this(TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java
new file mode 100644
index 0000000..018f084
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/**
+ * A simple rate, where the rate is incrementally calculated
+ * based on the elapsed time between the earliest reading
+ * and now.
+ *
+ * An exception is made for the first window, which is
+ * considered of fixed size. This avoids the issue of
+ * an artificially high rate when the gap between readings
+ * is close to 0.
+ */
+public class SimpleRate extends Rate {
+
+    @Override
+    public long windowSize(MetricConfig config, long now) {
+        stat.purgeObsoleteSamples(config, now);
+        long elapsed = now - stat.oldest(now).lastWindowMs;
+        return elapsed < config.timeWindowMs() ? config.timeWindowMs() : elapsed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 52f0cd7..5797b36 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.metrics.stats.Percentiles;
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
 import org.junit.Before;
@@ -459,4 +460,62 @@ public class MetricsTest {
 
     }
 
+    @Test
+    public void testSimpleRate() {
+        SimpleRate rate = new SimpleRate();
+
+        //Given
+        MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.SECONDS).samples(10);
+
+        //In the first window the rate is a fraction of the whole (1s) window
+        //So when we record 1000 at t0, the rate should be 1000 until the window completes, or more data is recorded.
+        record(rate, config, 1000);
+        assertEquals(1000, measure(rate, config), 0);
+        time.sleep(100);
+        assertEquals(1000, measure(rate, config), 0); // 1000B recorded, 0.1s elapsed, window fixed at 1s
+        time.sleep(100);
+        assertEquals(1000, measure(rate, config), 0); // 1000B recorded, 0.2s elapsed, window fixed at 1s
+        time.sleep(200);
+        assertEquals(1000, measure(rate, config), 0); // 1000B recorded, 0.4s elapsed, window fixed at 1s
+
+        //In the second (and subsequent) window(s), the rate will be in proportion to the elapsed time
+        //So the rate will degrade over time, as the time between measurement and the initial recording grows.
+        time.sleep(600);
+        assertEquals(1000, measure(rate, config), 0); // 1000B / 1.0s
+        time.sleep(200);
+        assertEquals(1000 / 1.2, measure(rate, config), 0); // 1000B / 1.2s
+        time.sleep(200);
+        assertEquals(1000 / 1.4, measure(rate, config), 0); // 1000B / 1.4s
+
+        //Adding another value inside the same window should double the rate
+        record(rate, config, 1000);
+        assertEquals(2000 / 1.4, measure(rate, config), 0); // 2000B / 1.4s
+
+        //Going over the next window should not change behaviour
+        time.sleep(1100);
+        assertEquals(2000 / 2.5, measure(rate, config), 0); // 2000B / 2.5s
+        record(rate, config, 1000);
+        assertEquals(3000 / 2.5, measure(rate, config), 0); // 3000B / 2.5s
+
+        //Sleeping for another 6.5 windows should likewise not change behaviour
+        time.sleep(6500);
+        assertEquals(3000 / 9, measure(rate, config), 1); // 3000B / 9s
+        record(rate, config, 1000);
+        assertEquals(4000 / 9, measure(rate, config), 1); // 4000B / 9s
+
+        //Going over the 10 window boundary should cause the first window's values (1000) to be purged.
+        //So the rate is calculated based on the oldest remaining reading, which is inside the second window, at 1.4s
+        time.sleep(1500);
+        assertEquals((4000 - 1000) / (10.5 - 1.4), measure(rate, config), 1);
+        record(rate, config, 1000);
+        assertEquals((5000 - 1000) / (10.5 - 1.4), measure(rate, config), 1);
+    }
+
+    private void record(Rate rate, MetricConfig config, int value) {
+        rate.record(config, value, time.milliseconds());
+    }
+
+    private Double measure(Measurable rate, MetricConfig config) {
+        return rate.measure(config, time.milliseconds());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index d8702df..400cc47 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import kafka.common._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
-import kafka.server.ConfigType
+import kafka.server.{KafkaConfig, ConfigType}
 import kafka.utils._
 import kafka.utils.ZkUtils._
 import java.util.Random
@@ -38,7 +38,14 @@ import collection.Map
 import collection.Set
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 
-object AdminUtils extends Logging {
+trait AdminUtilities {
+  def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
+  def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
+  def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties)
+  def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties
+}
+
+object AdminUtils extends Logging with AdminUtilities {
   val rand = new Random
   val AdminClientId = "__admin_client"
   val EntityConfigChangeZnodePrefix = "config_change_"
@@ -498,6 +505,21 @@ object AdminUtils extends Logging {
     changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
   }
 
+  /**
+    * Override the broker config on some set of brokers. These overrides will be persisted between sessions, and will
+    * override any defaults entered in the broker's config files
+    *
+    * @param zkUtils: Zookeeper utilities used to write the config to ZK
+    * @param brokers: The list of brokers to apply config changes to
+    * @param configs: The config to change, as properties
+    */
+  def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = {
+    KafkaConfig.validateNames(configs)
+    brokers.foreach { broker =>
+      changeEntityConfig(zkUtils, ConfigType.Broker, broker.toString, configs)
+    }
+  }
+
   private def changeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties) {
     // write the new config--may not exist if there were previously no overrides
     writeEntityConfig(zkUtils, entityType, entityName, configs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index eaddd84..ebf9e61 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import joptsimple._
 import kafka.admin.TopicCommand._
 import kafka.log.{Defaults, LogConfig}
-import kafka.server.{ClientConfigOverride, ConfigType}
+import kafka.server.{KafkaConfig, ClientConfigOverride, ConfigType}
 import kafka.utils.{CommandLineUtils, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
@@ -32,7 +32,7 @@ import scala.collection._
 
 
 /**
- * This script can be used to change configs for topics/clients dynamically
+ * This script can be used to change configs for topics/clients/brokers dynamically
  */
 object ConfigCommand {
 
@@ -41,7 +41,7 @@ object ConfigCommand {
     val opts = new ConfigCommandOptions(args)
 
     if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topics/clients) configs")
+      CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic, client or broker")
 
     opts.checkArgs()
 
@@ -64,7 +64,7 @@ object ConfigCommand {
     }
   }
 
-  private def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
+  def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: AdminUtilities = AdminUtils) {
     val configsToBeAdded = parseConfigsToBeAdded(opts)
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
     val entityType = opts.options.valueOf(opts.entityType)
@@ -72,17 +72,17 @@ object ConfigCommand {
     warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt))
 
     // compile the final set of configs
-    val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
+    val configs = utils.fetchEntityConfig(zkUtils, entityType, entityName)
     configs.putAll(configsToBeAdded)
     configsToBeDeleted.foreach(config => configs.remove(config))
 
-    if (entityType.equals(ConfigType.Topic)) {
-      AdminUtils.changeTopicConfig(zkUtils, entityName, configs)
-      println("Updated config for topic: \"%s\".".format(entityName))
-    } else {
-      AdminUtils.changeClientIdConfig(zkUtils, entityName, configs)
-      println("Updated config for clientId: \"%s\".".format(entityName))
+    entityType match {
+      case ConfigType.Topic =>  utils.changeTopicConfig(zkUtils, entityName, configs)
+      case ConfigType.Client =>  utils.changeClientIdConfig(zkUtils, entityName, configs)
+      case ConfigType.Broker => utils.changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs)
+      case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}")
     }
+    println(s"Updated config for EntityType:$entityType => EntityName:'$entityName'.")
   }
 
   def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = {
@@ -97,6 +97,15 @@ object ConfigCommand {
     }
   }
 
+  private def parseBroker(broker: String): Int = {
+    try {
+      broker.toInt
+    } catch {
+      case e: NumberFormatException =>
+        throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
+    }
+  }
+
   private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
     val entityType = opts.options.valueOf(opts.entityType)
     val entityNames: Seq[String] =
@@ -113,14 +122,19 @@ object ConfigCommand {
   }
 
   private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
-    val configsToBeAdded = opts.options.valuesOf(opts.addConfig).map(_.split("""\s*=\s*"""))
-    require(configsToBeAdded.forall(config => config.length == 2),
-      "Invalid entity config: all configs to be added must be in the format \"key=val\".")
     val props = new Properties
-    configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
-    if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
-      println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
-        s"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker.")
+    if (opts.options.has(opts.addConfig)) {
+      //split by commas, but avoid those in [], then into KV pairs
+      val configsToBeAdded = opts.options.valueOf(opts.addConfig)
+        .split(",(?=[^\\]]*(?:\\[|$))")
+        .map(_.split("""\s*=\s*"""))
+      require(configsToBeAdded.forall(config => config.length == 2), "Invalid entity config: all configs to be added must be in the format \"key=val\".")
+      //Create properties, parsing square brackets from values if necessary
+      configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim))
+      if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
+        println(s"WARNING: The configuration ${LogConfig.MessageFormatVersionProp}=${props.getProperty(LogConfig.MessageFormatVersionProp)} is specified. " +
+          s"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker.")
+      }
     }
     props
   }
@@ -145,21 +159,21 @@ object ConfigCommand {
             .ofType(classOf[String])
     val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
     val describeOpt = parser.accepts("describe", "List configs for the given entity.")
-    val entityType = parser.accepts("entity-type", "Type of entity (topics/clients)")
+    val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/brokers)")
             .withRequiredArg
             .ofType(classOf[String])
-    val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id)")
+    val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/broker id)")
             .withRequiredArg
             .ofType(classOf[String])
 
     val nl = System.getProperty("line.separator")
-    val addConfig = parser.accepts("add-config", "Key Value pairs configs to add 'k1=v1,k2=v2'. The following is a list of valid configurations: " +
+    val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
             "For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
+            "For entity_type '" + ConfigType.Broker + "': " + nl + KafkaConfig.dynamicBrokerConfigs.map("\t" + _).mkString(nl) + nl +
             "For entity_type '" + ConfigType.Client + "': " + nl + "\t" + ClientConfigOverride.ProducerOverride
                                                             + nl + "\t" + ClientConfigOverride.ConsumerOverride)
             .withRequiredArg
             .ofType(classOf[String])
-            .withValuesSeparatedBy(',')
     val deleteConfig = parser.accepts("delete-config", "config keys to remove 'k1,k2'")
             .withRequiredArg
             .ofType(classOf[String])
@@ -181,19 +195,14 @@ object ConfigCommand {
       CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig))
       if(options.has(alterOpt)) {
-        if(! options.has(entityName))
-          throw new IllegalArgumentException("--entity-name must be specified with --alter")
+        require(options.has(entityName), "--entity-name must be specified with --alter")
 
         val isAddConfigPresent: Boolean = options.has(addConfig)
         val isDeleteConfigPresent: Boolean = options.has(deleteConfig)
         if(! isAddConfigPresent && ! isDeleteConfigPresent)
           throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter")
       }
-      val entityTypeVal = options.valueOf(entityType)
-      if(! entityTypeVal.equals(ConfigType.Topic) && ! entityTypeVal.equals(ConfigType.Client)) {
-        throw new IllegalArgumentException("--entity-type must be '%s' or '%s'".format(ConfigType.Topic, ConfigType.Client))
-      }
+      require(ConfigType.all.contains(options.valueOf(entityType)), s"--entity-type must be one of ${ConfigType.all}")
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 0b32d93..ad050b4 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -16,10 +16,13 @@
  */
 package kafka.admin
 
+import java.text.NumberFormat._
+import java.util.Properties
 import joptsimple.OptionParser
+import kafka.log.LogConfig
+import kafka.server.{ConfigType, KafkaConfig}
 import kafka.utils._
-import collection._
-import org.I0Itec.zkclient.ZkClient
+import scala.collection._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{TopicAndPartition, AdminCommandFailedException}
 import org.apache.kafka.common.utils.Utils
@@ -27,6 +30,8 @@ import org.apache.kafka.common.security.JaasUtils
 
 object ReassignPartitionsCommand extends Logging {
 
+  //TODO Note to reviewer - this class needs a little more work (which I'll complete on Monday, or we could just revert this, but including here as an outline of what is intended)
+
   def main(args: Array[String]): Unit = {
 
     val opts = new ReassignPartitionsCommandOptions(args)
@@ -62,9 +67,12 @@ object ReassignPartitionsCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
-    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
+    verifyAssignment(zkUtils, jsonString)
+  }
 
+  def verifyAssignment(zkUtils: ZkUtils, jsonString: String): Unit = {
     println("Status of partition reassignment:")
+    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
     val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
     reassignedPartitionsStatus.foreach { case (topicPartition, status) =>
       status match {
@@ -76,6 +84,28 @@ object ReassignPartitionsCommand extends Logging {
           println("Reassignment of partition %s is still in progress".format(topicPartition))
       }
     }
+    removeThrottle(zkUtils, partitionsToBeReassigned, reassignedPartitionsStatus)
+  }
+
+  def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, scala.Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus]): Unit = {
+    //If all partitions have completed remove the throttle
+    if (reassignedPartitionsStatus.forall { case (topicPartition, status) => status == ReassignmentCompleted }) {
+      //Remove the throttle limit from all brokers in the cluster
+      for (brokerId <- zkUtils.getAllBrokersInCluster().map(_.id)) {
+        val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, brokerId.toString)
+        if (configs.remove(KafkaConfig.ThrottledReplicationRateLimitProp) != null)
+          AdminUtils.changeBrokerConfig(zkUtils, Seq(brokerId), configs)
+      }
+
+      //Remove the list of throttled replicas from all topics with partitions being moved
+      val topics = partitionsToBeReassigned.keySet.map(tp => tp.topic).toSeq.distinct
+      for (topic <- topics) {
+        val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+        if (configs.remove(LogConfig.ThrottledReplicasListProp) != null)
+          AdminUtils.changeTopicConfig(zkUtils, topic, configs)
+      }
+      println("Throttle was removed.")
+    }
   }
 
   def generateAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
@@ -112,7 +142,6 @@ object ReassignPartitionsCommand extends Logging {
         TopicAndPartition(topic, partition) -> replicas
       }
     }
-
     (partitionsToBeReassigned, currentAssignment)
   }
 
@@ -121,11 +150,38 @@ object ReassignPartitionsCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
-    executeAssignment(zkUtils, reassignmentJsonString)
+    val throttle = if (opts.options.has(opts.throttleOpt)) opts.options.valueOf(opts.throttleOpt) else -1
+    executeAssignment(zkUtils, reassignmentJsonString, throttle)
   }
 
-  def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String) {
+  def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
+    val partitionsToBeReassigned = parseAndValidate(reassignmentJsonString)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
+
+    // If there is an existing rebalance running, attempt to change its throttle
+    if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
+      println("There is an existing assignment running.")
+      reassignPartitionsCommand.maybeLimit(throttle)
+    }
+    else {
+      printCurrentAssignment(zkUtils, partitionsToBeReassigned)
+      if (throttle >= 0)
+        println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
+      if (reassignPartitionsCommand.reassignPartitions(throttle)) {
+        println("Successfully started reassignment of partitions.")
+      } else
+        println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+    }
+  }
+
+  def printCurrentAssignment(zkUtils: ZkUtils, partitionsToBeReassigned: scala.Seq[(TopicAndPartition, scala.Seq[Int])]): Unit = {
+    // before starting assignment, output the current replica assignment to facilitate rollback
+    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
+    println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
+      .format(ZkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
+  }
 
+  def parseAndValidate(reassignmentJsonString: String): scala.Seq[(TopicAndPartition, scala.Seq[Int])] = {
     val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file is empty")
@@ -141,16 +197,7 @@ object ReassignPartitionsCommand extends Logging {
         .mkString(". ")
       throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg))
     }
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
-    // before starting assignment, output the current replica assignment to facilitate rollback
-    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
-    println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
-      .format(ZkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
-    // start the reassignment
-    if (reassignPartitionsCommand.reassignPartitions())
-      println("Successfully started reassignment of partitions %s".format(ZkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
-    else
-      println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+    partitionsToBeReassigned
   }
 
   private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
@@ -192,7 +239,7 @@ object ReassignPartitionsCommand extends Logging {
     val generateOpt = parser.accepts("generate", "Generate a candidate partition reassignment configuration." +
       " Note that this only generates a candidate assignment, it does not execute it.")
     val executeOpt = parser.accepts("execute", "Kick off the reassignment as specified by the --reassignment-json-file option.")
-    val verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the --reassignment-json-file option.")
+    val verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the --reassignment-json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed")
     val reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration" +
                       "The format to use is - \n" +
                       "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 1,\n\t  \"replicas\": [1,2,3] }],\n\"version\":1\n}")
@@ -211,7 +258,11 @@ object ReassignPartitionsCommand extends Logging {
                       .describedAs("brokerlist")
                       .ofType(classOf[String])
     val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
-
+    val throttleOpt = parser.accepts("throttle", "The movement of partitions will be throttled to this value (bytes/sec). Rerunning with this option, whilst a rebalance is in progress, will alter the throttle value.")
+                      .withRequiredArg()
+                      .describedAs("throttle")
+                      .defaultsTo("-1")
+                      .ofType(classOf[Long])
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.")
 
@@ -219,9 +270,60 @@ object ReassignPartitionsCommand extends Logging {
   }
 }
 
-class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
+class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: Map[TopicAndPartition, Seq[Int]])
   extends Logging {
-  def reassignPartitions(): Boolean = {
+
+  private def maybeThrottle(throttle: Long): Unit = {
+    if (throttle >= 0) {
+      maybeLimit(throttle)
+      addThrottledReplicaList()
+    }
+  }
+
+  def maybeLimit(throttle: Long) {
+    if (throttle >= 0) {
+      val existingBrokers = zkUtils.getReplicaAssignmentForTopics(partitions.map(_._1.topic).toSeq).flatMap(_._2).toSeq
+      val proposedBrokers = partitions.flatMap(_._2).toSeq
+      val brokers = (existingBrokers ++ proposedBrokers).distinct
+
+      for (id <- brokers) {
+        val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, id.toString)
+        configs.put(KafkaConfig.ThrottledReplicationRateLimitProp, throttle.toString)
+        AdminUtils.changeBrokerConfig(zkUtils, Seq(id), configs)
+      }
+      println(f"The throttle limit was set to $throttle%,d B/s")
+    }
+  }
+
+  def addThrottledReplicaList(): Unit = {
+    //apply the throttle to all move destinations and all move sources
+    val existing = zkUtils.getReplicaAssignmentForTopics(partitions.map(_._1.topic).toSeq)
+    val moves = replicaMoves(existing, proposed = partitions)
+    for (topic <- partitions.keySet.map(tp => tp.topic).toSeq.distinct)
+      AdminUtils.changeTopicConfig(zkUtils, topic, new Properties {
+        put(LogConfig.ThrottledReplicasListProp, moves.get(topic).get )
+      })
+    println(s"Throttles were added to the following replicas: $moves")
+  }
+
+  def replicaMoves(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[String, String] = {
+    //Find the replicas that have moved
+    val movesByPartition = existing.map { case (topicAndPartition, existingReplicas) =>
+      val before = existingReplicas.toSet
+      val after = proposed.get(topicAndPartition).get.toSet
+      val moving = (after.filterNot(before) ++ before.filterNot(after)).toSeq.distinct.sorted
+      val formatted = moving.map { brokerId => s"${topicAndPartition.partition}:$brokerId" }
+      (topicAndPartition, formatted)
+    }
+    //Group by topic
+    val movesByTopic = movesByPartition.groupBy(_._1.topic)
+      .map { case (topic, reps) => (topic, reps.values.flatMap(rep => rep)) }
+
+    movesByTopic.map { case (topic, moves) => topic -> moves.mkString(",") }
+  }
+
+  def reassignPartitions(throttle: Long = -1): Boolean = {
+    maybeThrottle(throttle)
     try {
       val validPartitions = partitions.filter(p => validatePartition(zkUtils, p._1.topic, p._1.partition))
       if (validPartitions.isEmpty) false

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 8801ff8..c33122b 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -22,8 +22,7 @@ import kafka.cluster.BrokerEndPoint
 import kafka.message.ByteBufferMessageSet
 import kafka.server.{PartitionFetchState, AbstractFetcherThread}
 import kafka.common.{ErrorMapping, TopicAndPartition}
-import scala.collection.JavaConverters
-import JavaConverters._
+import scala.collection.Map
 import ConsumerFetcherThread._
 
 class ConsumerFetcherThread(name: String,
@@ -108,7 +107,6 @@ class ConsumerFetcherThread(name: String,
     simpleConsumer.fetch(fetchRequest.underlying).data.map { case (key, value) =>
       key -> new PartitionData(value)
     }
-
 }
 
 object ConsumerFetcherThread {

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index b8efcc3..f01166d 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 import kafka.api.ApiVersion
 import kafka.message.{BrokerCompressionCodec, Message}
-import kafka.server.KafkaConfig
+import kafka.server.{ThrottledReplicaValidator, KafkaConfig}
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.record.TimestampType
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
 import java.util.Locale
 
 import scala.collection.mutable
-import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
+import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList,Validator}
 
 object Defaults {
   val SegmentSize = kafka.server.Defaults.LogSegmentBytes
@@ -55,6 +55,7 @@ object Defaults {
   val MessageFormatVersion = kafka.server.Defaults.LogMessageFormatVersion
   val MessageTimestampType = kafka.server.Defaults.LogMessageTimestampType
   val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs
+  val ThrottledReplicasList = ""
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
@@ -85,6 +86,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
   val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp))
   val messageTimestampType = TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp))
   val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
+  val throttledReplicasList = getString(LogConfig.ThrottledReplicasListProp)
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -121,6 +123,7 @@ object LogConfig {
   val MessageFormatVersionProp = "message.format.version"
   val MessageTimestampTypeProp = "message.timestamp.type"
   val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms"
+  val ThrottledReplicasListProp = "quota.replication.throttled.replicas"
 
   val SegmentSizeDoc = "This configuration controls the segment file size for " +
     "the log. Retention and cleaning is always done a file at a time so a larger " +
@@ -190,13 +193,15 @@ object LogConfig {
   val MessageTimestampTypeDoc = KafkaConfig.LogMessageTimestampTypeDoc
   val MessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
     "a message and the timestamp specified in the message. If message.timestamp.type=CreateTime, a message will be rejected " +
-    "if the difference in timestamp exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."  
+    "if the difference in timestamp exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."
+  val ThrottledReplicasListDoc = "A list of replicas for which log replication should be throttled. The list should describe a set of " +
+    "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:..."
 
   private class LogConfigDef extends ConfigDef {
 
     private final val serverDefaultConfigNames = mutable.Map[String, String]()
 
-    def define(name: String, defType: ConfigDef.Type, defaultValue: Any, validator: ConfigDef.Validator,
+    def define(name: String, defType: ConfigDef.Type, defaultValue: Any, validator: Validator,
                importance: ConfigDef.Importance, doc: String, serverDefaultConfigName: String): LogConfigDef = {
       super.define(name, defType, defaultValue, validator, importance, doc)
       serverDefaultConfigNames.put(name, serverDefaultConfigName)
@@ -280,6 +285,7 @@ object LogConfig {
         KafkaConfig.LogMessageTimestampTypeProp)
       .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs,
         atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc, KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+      .define(ThrottledReplicasListProp, STRING, Defaults.ThrottledReplicasList, ThrottledReplicaValidator, MEDIUM, ThrottledReplicasListDoc, ThrottledReplicasListProp)
   }
 
   def apply(): LogConfig = LogConfig(new Properties())
@@ -303,7 +309,7 @@ object LogConfig {
     val names = configNames
     for(name <- props.asScala.keys)
       if (!names.contains(name))
-        throw new InvalidConfigurationException(s"Unknown configuration $name.")
+        throw new InvalidConfigurationException(s"Unknown Log configuration $name.")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 4aba667..36baf1f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -93,7 +93,6 @@ abstract class AbstractFetcherThread(name: String,
       }
       fetchRequest
     }
-
     if (!fetchRequest.isEmpty)
       processFetchRequest(fetchRequest)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5e90080..e6cac5d 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -17,7 +17,6 @@
 package kafka.server
 
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
-
 import kafka.utils.{ShutdownableThread, Logging}
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
@@ -66,18 +65,19 @@ object ClientQuotaManagerConfig {
  */
 class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
-                         private val apiKey: String,
+                         private val apiKey: QuotaType,
                          private val time: Time) extends Logging {
   private val overriddenQuota = new ConcurrentHashMap[String, Quota]()
   private val defaultQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
   private val lock = new ReentrantReadWriteLock()
   private val delayQueue = new DelayQueue[ThrottledResponse]()
+  private val sensorAccessor = new SensorAccess
   val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
   throttledRequestReaper.start()
 
   private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
   delayQueueSensor.add(metrics.metricName("queue-size",
-                                      apiKey,
+                                      apiKey.toString,
                                       "Tracks the size of the delay queue"), new Total())
 
   /**
@@ -164,71 +164,24 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor
    */
   private def getOrCreateQuotaSensors(clientId: String): ClientSensors = {
-
-    // Names of the sensors to access
-    val quotaSensorName = getQuotaSensorName(clientId)
-    val throttleTimeSensorName = getThrottleTimeSensorName(clientId)
-    var quotaSensor: Sensor = null
-    var throttleTimeSensor: Sensor = null
-
-    /* Acquire the read lock to fetch the sensors. It is safe to call getSensor from multiple threads.
-     * The read lock allows a thread to create a sensor in isolation. The thread creating the sensor
-     * will acquire the write lock and prevent the sensors from being read while they are being created.
-     * It should be sufficient to simply check if the sensor is null without acquiring a read lock but the
-     * sensor being present doesn't mean that it is fully initialized i.e. all the Metrics may not have been added.
-     * This read lock waits until the writer thread has released its lock i.e. fully initialized the sensor
-     * at which point it is safe to read
-     */
-    lock.readLock().lock()
-    try {
-      quotaSensor = metrics.getSensor(quotaSensorName)
-      throttleTimeSensor = metrics.getSensor(throttleTimeSensorName)
-    }
-    finally {
-      lock.readLock().unlock()
-    }
-
-    /* If the sensor is null, try to create it else return the created sensor
-     * Either of the sensors can be null, hence null checks on both
-     */
-    if (quotaSensor == null || throttleTimeSensor == null) {
-      /* Acquire a write lock because the sensor may not have been created and we only want one thread to create it.
-       * Note that multiple threads may acquire the write lock if they all see a null sensor initially
-       * In this case, the writer checks the sensor after acquiring the lock again.
-       * This is safe from Double Checked Locking because the references are read
-       * after acquiring read locks and hence they cannot see a partially published reference
-       */
-      lock.writeLock().lock()
-      try {
-        // Set the var for both sensors in case another thread has won the race to acquire the write lock. This will
-        // ensure that we initialise `ClientSensors` with non-null parameters.
-        quotaSensor = metrics.getSensor(quotaSensorName)
-        throttleTimeSensor = metrics.getSensor(throttleTimeSensorName)
-        if (throttleTimeSensor == null) {
-          // create the throttle time sensor also. Use default metric config
-          throttleTimeSensor = metrics.sensor(throttleTimeSensorName,
-                                              null,
-                                              ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)
-          throttleTimeSensor.add(metrics.metricName("throttle-time",
-                                                apiKey,
-                                                "Tracking average throttle-time per client",
-                                                "client-id",
-                                                clientId), new Avg())
-        }
-
-
-        if (quotaSensor == null) {
-          quotaSensor = metrics.sensor(quotaSensorName,
-                                       getQuotaMetricConfig(quota(clientId)),
-                                       ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)
-          quotaSensor.add(clientRateMetricName(clientId), new Rate())
-        }
-      } finally {
-        lock.writeLock().unlock()
-      }
-    }
-    // return the read or created sensors
-    ClientSensors(quotaSensor, throttleTimeSensor)
+    ClientSensors(
+      sensorAccessor.getOrCreate(
+        getQuotaSensorName(clientId),
+        ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
+        lock, metrics,
+        () => clientRateMetricName(clientId),
+        () => getQuotaMetricConfig(quota(clientId)),
+        () => new Rate()
+      ),
+      sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientId),
+        ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
+        lock,
+        metrics,
+        () => metrics.metricName("throttle-time", apiKey.toString, "Tracking average throttle-time per client", "client-id", clientId),
+        () => null,
+        () => new Avg()
+      )
+    )
   }
 
   private def getThrottleTimeSensorName(clientId: String): String = apiKey + "ThrottleTime-" + clientId
@@ -272,12 +225,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       else
         this.overriddenQuota.put(clientId, quota)
 
-      // Change the underlying metric config if the sensor has been created
-      val allMetrics = metrics.metrics()
-      val quotaMetricName = clientRateMetricName(clientId)
-      if (allMetrics.containsKey(quotaMetricName)) {
+      // Change the underlying metric config if the sensor has been created.
+      // Note the metric could be expired by another thread, so use a local variable and null check.
+      val metric = metrics.metrics.get(clientRateMetricName(clientId))
+      if (metric != null) {
         logger.info(s"Sensor for clientId $clientId already exists. Changing quota to ${quota.bound()} in MetricConfig")
-        allMetrics.get(quotaMetricName).config(getQuotaMetricConfig(quota))
+        metric.config(getQuotaMetricConfig(quota))
       }
     } finally {
       lock.writeLock().unlock()
@@ -285,7 +238,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   private def clientRateMetricName(clientId: String): MetricName = {
-    metrics.metricName("byte-rate", apiKey,
+    metrics.metricName("byte-rate", apiKey.toString,
                    "Tracking byte-rate per client",
                    "client-id", clientId)
   }
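
For context on the refactor above: the double-checked locking that used to be inlined in _**ClientQuotaManager**_ has moved into a shared `sensorAccessor.getOrCreate` helper. The sketch below shows a plausible shape for that helper, inferred purely from the call sites above — the class name, parameter names and body are assumptions, not the committed code.

```scala
import java.util.concurrent.locks.ReadWriteLock

import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.{MeasurableStat, MetricConfig, Metrics, Sensor}

// Hypothetical helper behind `sensorAccessor` (assumed name and shape).
class SensorAccess {
  def getOrCreate(sensorName: String,
                  expirationTimeSeconds: Long,
                  lock: ReadWriteLock,
                  metrics: Metrics,
                  metricName: () => MetricName,
                  config: () => MetricConfig,
                  measure: () => MeasurableStat): Sensor = {
    // Fast path: most callers find an existing sensor under the cheap read lock.
    lock.readLock().lock()
    val existing = try metrics.getSensor(sensorName) finally lock.readLock().unlock()
    if (existing != null)
      existing
    else {
      // Slow path: create under the write lock, re-checking first so a racing
      // thread that beat us here does not create a duplicate, and so readers
      // never observe a partially initialised sensor.
      lock.writeLock().lock()
      try {
        var sensor = metrics.getSensor(sensorName)
        if (sensor == null) {
          sensor = metrics.sensor(sensorName, config(), expirationTimeSeconds)
          sensor.add(metricName(), measure())
        }
        sensor
      } finally {
        lock.writeLock().unlock()
      }
    }
  }
}
```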

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index d07fdd8..67b74a7 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -21,25 +21,28 @@ import java.util.Properties
 
 import kafka.api.ApiVersion
 import kafka.log.{LogConfig, LogManager}
+import kafka.server.Constants._
+import kafka.server.KafkaConfig._
+import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.Logging
+import org.apache.kafka.common.config.ConfigDef.Validator
+import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.metrics.Quota
-import org.apache.kafka.common.protocol.ApiKeys
-
-import scala.collection.Map
+import org.apache.kafka.common.metrics.Quota._
 import scala.collection.JavaConverters._
 
 /**
- * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
- */
+  * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
+  */
 trait ConfigHandler {
   def processConfigChanges(entityName: String, value: Properties)
 }
 
 /**
- * The TopicConfigHandler will process topic config changes in ZK.
- * The callback provides the topic name and the full properties set read from ZK
- */
-class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig) extends ConfigHandler with Logging {
+  * The TopicConfigHandler will process topic config changes in ZK.
+  * The callback provides the topic name and the full properties set read from ZK
+  */
+class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers) extends ConfigHandler with Logging {
 
   def processConfigChanges(topic: String, topicConfig: Properties) {
     // Validate the compatibility of message format version.
@@ -63,6 +66,33 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
       val logConfig = LogConfig(props)
       logs.foreach(_.config = logConfig)
     }
+
+    val brokerId = kafkaConfig.brokerId
+
+    if (topicConfig.containsKey(LogConfig.ThrottledReplicasListProp) && topicConfig.getProperty(LogConfig.ThrottledReplicasListProp).length > 0) {
+      val partitions = parseThrottledPartitions(topicConfig, brokerId)
+      quotas.leader.markThrottled(topic, partitions)
+      quotas.follower.markThrottled(topic, partitions)
+      logger.debug(s"Setting throttled partitions on broker $brokerId for topic: $topic and partitions $partitions")
+    } else {
+      quotas.leader.removeThrottle(topic)
+      quotas.follower.removeThrottle(topic)
+      logger.debug(s"Removing throttled partitions from broker $brokerId for topic $topic")
+    }
+  }
+
+  def parseThrottledPartitions(topicConfig: Properties, brokerId: Int): Seq[Int] = {
+    val configValue = topicConfig.get(LogConfig.ThrottledReplicasListProp).toString.trim
+    ThrottledReplicaValidator.ensureValid(LogConfig.ThrottledReplicasListProp, configValue)
+    configValue match {
+      case "" => Seq()
+      case "*" => AllReplicas
+      case _ => configValue
+        .split(",")
+        .map(_.split(":"))
+        .filter(_(1).toInt == brokerId) // keep only the replicas hosted on this broker
+        .map(_(0).toInt).toSeq // convert to a list of partition ids
+    }
   }
 }
 
@@ -72,25 +102,53 @@ object ClientConfigOverride {
 }
 
 /**
- * The ClientIdConfigHandler will process clientId config changes in ZK.
- * The callback provides the clientId and the full properties set read from ZK.
- * This implementation reports the overrides to the respective ClientQuotaManager objects
- */
-class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler {
-
-  def processConfigChanges(clientId: String, clientConfig: Properties) = {
+  * The ClientIdConfigHandler will process clientId config changes in ZK.
+  * The callback provides the clientId and the full properties set read from ZK.
+  * This implementation reports the overrides to the respective ClientQuotaManager objects
+  */
+class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends ConfigHandler {
+  def processConfigChanges(clientId: String, clientConfig: Properties) {
     if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
-      quotaManagers(ApiKeys.PRODUCE.id).updateQuota(clientId,
+      quotaManagers.produce.updateQuota(clientId,
         new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true))
     } else {
-      quotaManagers(ApiKeys.PRODUCE.id).resetQuota(clientId)
+      quotaManagers.produce.resetQuota(clientId)
     }
 
     if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
-      quotaManagers(ApiKeys.FETCH.id).updateQuota(clientId,
+      quotaManagers.fetch.updateQuota(clientId,
         new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true))
     } else {
-      quotaManagers(ApiKeys.FETCH.id).resetQuota(clientId)
+      quotaManagers.fetch.resetQuota(clientId)
+    }
+  }
+}
+
+/**
+  * The BrokerConfigHandler will process individual broker config changes in ZK.
+  * The callback provides the brokerId and the full properties set read from ZK.
+  * This implementation reports the overrides to the respective ReplicationQuotaManager objects
+  */
+class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
+  def processConfigChanges(brokerId: String, properties: Properties) {
+    if (brokerConfig.brokerId == brokerId.trim.toInt) {
+      val limit =
+        if (properties.containsKey(ThrottledReplicationRateLimitProp))
+          properties.getProperty(ThrottledReplicationRateLimitProp).toLong
+        else
+          Defaults.ThrottledReplicationRateLimit
+      quotaManagers.leader.updateQuota(upperBound(limit))
+      quotaManagers.follower.updateQuota(upperBound(limit))
     }
   }
 }
+
+object ThrottledReplicaValidator extends Validator {
+  override def ensureValid(name: String, value: scala.Any): Unit = {
+    value match {
+      case s: String => if (!isValid(s))
+        throw new ConfigException(name, value, s"$name must match the format [partitionId]:[brokerId],[partitionId]:[brokerId],... or be the wildcard * (all replicas)")
+      case _ => throw new ConfigException(name, value, s"$name must be a string")
+    }
+  }
+
+  private def isValid(proposed: String): Boolean = {
+    proposed.trim.equals("*") || proposed.trim.matches("([0-9]+:[0-9]+)?(,[0-9]+:[0-9]+)*")
+  }
+}
\ No newline at end of file
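
A quick illustration of the parse/validate pair above, using hypothetical values. The format is `partitionId:brokerId` pairs, comma separated, or `*` for all replicas; `handler` is assumed to be a _**TopicConfigHandler**_ wired up as in the KafkaServer change further down.

```scala
import java.util.Properties
import kafka.log.LogConfig

val props = new Properties()
props.put(LogConfig.ThrottledReplicasListProp, "0:101,0:102,1:101,1:102")

// On broker 102, only the replicas hosted there survive the filter:
handler.parseThrottledPartitions(props, brokerId = 102) // => Seq(0, 1)

props.put(LogConfig.ThrottledReplicasListProp, "*")
handler.parseThrottledPartitions(props, brokerId = 102) // => AllReplicas

props.put(LogConfig.ThrottledReplicasListProp, "0:102,junk")
handler.parseThrottledPartitions(props, brokerId = 102) // throws ConfigException
```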

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index a0ff00d..cf3a48f 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -54,6 +54,7 @@ case class FetchMetadata(fetchMinBytes: Int,
 class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
+                   quota: ReplicaQuota,
                    responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
   extends DelayedOperation(delayMs) {
 
@@ -69,6 +70,7 @@ class DelayedFetch(delayMs: Long,
    */
   override def tryComplete() : Boolean = {
     var accumulatedSize = 0
+    var accumulatedThrottledSize = 0
     fetchMetadata.fetchPartitionStatus.foreach {
       case (topicAndPartition, fetchStatus) =>
         val fetchOffset = fetchStatus.startOffsetMetadata
@@ -93,10 +95,15 @@ class DelayedFetch(delayMs: Long,
                 // Case C, this can happen when the fetch operation is falling behind the current segment
                 // or the partition has just rolled a new segment
                 debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
-                return forceComplete()
+                if (!(quota.isThrottled(topicAndPartition) && quota.isQuotaExceeded()))
+                  return forceComplete()
               } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
-                // we need take the partition fetch size as upper bound when accumulating the bytes
-                accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+                // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
+                val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+                if (quota.isThrottled(topicAndPartition))
+                  accumulatedThrottledSize += bytesAvailable
+                else
+                  accumulatedSize += bytesAvailable
               }
             }
           }
@@ -111,7 +118,8 @@ class DelayedFetch(delayMs: Long,
     }
 
     // Case D
-    if (accumulatedSize >= fetchMetadata.fetchMinBytes)
+    if (accumulatedSize >= fetchMetadata.fetchMinBytes
+      || ((accumulatedSize + accumulatedThrottledSize) >= fetchMetadata.fetchMinBytes && !quota.isQuotaExceeded()))
       forceComplete()
     else
       false
@@ -130,7 +138,7 @@ class DelayedFetch(delayMs: Long,
   override def onComplete() {
     val logReadResults = replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader,
       fetchMetadata.fetchOnlyCommitted,
-      fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))
+      fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo), quota)
 
     val fetchPartitionData = logReadResults.mapValues(result =>
       FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
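
The new Case D condition above is worth spelling out. A minimal standalone restatement of the same logic: throttled bytes only count towards `fetchMinBytes` while the quota has headroom; once the quota is exceeded, completion falls back to the unthrottled bytes alone (or the `delayMs` timeout).

```scala
// Same decision as the diff's Case D, with standalone names.
def caseDComplete(accumulatedSize: Int,
                  accumulatedThrottledSize: Int,
                  fetchMinBytes: Int,
                  quotaExceeded: Boolean): Boolean =
  accumulatedSize >= fetchMinBytes ||
    (accumulatedSize + accumulatedThrottledSize >= fetchMinBytes && !quotaExceeded)

caseDComplete(0, 4096, 1, quotaExceeded = false) // true: throttled bytes count
caseDComplete(0, 4096, 1, quotaExceeded = true)  // false: stay in purgatory
```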

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index eb406af..556534a 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -37,6 +37,8 @@ import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 object ConfigType {
   val Topic = "topics"
   val Client = "clients"
+  val Broker = "brokers"
+  val all = Seq(Topic, Client, Broker)
 }
 
 /**
@@ -92,8 +94,8 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
           val entityType = map.get("entity_type") match {
             case Some(ConfigType.Topic) => ConfigType.Topic
             case Some(ConfigType.Client) => ConfigType.Client
-            case _ => throw new IllegalArgumentException("Config change notification must have 'entity_type' set to either 'client' or 'topic'." +
-              " Received: " + json)
+            case Some(ConfigType.Broker) => ConfigType.Broker
+            case _ => throw new IllegalArgumentException(s"Config change notification must have 'entity_type' set to one of ${ConfigType.all}. Received: $json")
           }
 
           val entity = map.get("entity_name") match {
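
With brokers added as an entity type, the notifications the parser above accepts look like the following (the `entity_type` values come from ConfigType above; the exact JSON framing is assumed for illustration):

```scala
// Keys "entity_type" and "entity_name" are what the parser above reads.
val topicChange  = """{"entity_type":"topics","entity_name":"my-topic"}"""
val clientChange = """{"entity_type":"clients","entity_name":"my-client"}"""
val brokerChange = """{"entity_type":"brokers","entity_name":"102"}"""
```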

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a5258e..677b5dd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -24,6 +24,7 @@ import java.util.Properties
 import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api._
 import kafka.cluster.Partition
+import kafka.server.QuotaFactory.{UnboundedQuota, QuotaManagers}
 import kafka.common
 import kafka.common._
 import kafka.controller.KafkaController
@@ -34,8 +35,9 @@ import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth
 import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write, Delete}
+import kafka.server.QuotaType._
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
 import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
@@ -60,11 +62,10 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val config: KafkaConfig,
                 val metadataCache: MetadataCache,
                 val metrics: Metrics,
-                val authorizer: Option[Authorizer]) extends Logging {
+                val authorizer: Option[Authorizer],
+                val quotas: QuotaManagers) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
-  // Store all the quota managers for each type of request
-  val quotaManagers: Map[Short, ClientQuotaManager] = instantiateQuotaManagers(config)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -144,7 +145,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responseHeader = new ResponseHeader(correlationId)
       val leaderAndIsrResponse =
         if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
           val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
           new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
         } else {
           val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
@@ -393,7 +394,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // When this callback is triggered, the remote API call has completed
       request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
 
-      quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle(
+      quotas.produce.recordAndMaybeThrottle(
         request.header.clientId,
         numBytesAppended,
         produceResponseCallback)
@@ -482,18 +483,17 @@ class KafkaApis(val requestChannel: RequestChannel,
         requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
       }
 
-
       // When this callback is triggered, the remote API call has completed
       request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
 
-      // Do not throttle replication traffic
       if (fetchRequest.isFromFollower) {
+        // We've already evaluated this fetch against the quota and are good to go; just record the throttled bytes now.
+        val size = sizeOfThrottledPartitions(fetchRequest, mergedPartitionData, quotas.leader)
+        quotas.leader.record(size)
         fetchResponseCallback(0)
       } else {
-        quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId,
-                                                               FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic),
-                                                                                          fetchRequest.versionId),
-                                                               fetchResponseCallback)
+        val size = FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), fetchRequest.versionId)
+        quotas.fetch.recordAndMaybeThrottle(fetchRequest.clientId, size, fetchResponseCallback)
       }
     }
 
@@ -506,10 +506,19 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchRequest.replicaId,
         fetchRequest.minBytes,
         authorizedRequestInfo,
+        replicationQuota(fetchRequest),
         sendResponseCallback)
     }
   }
 
+  private def sizeOfThrottledPartitions(fetchRequest: FetchRequest,
+                                        mergedPartitionData: Map[TopicAndPartition, FetchResponsePartitionData],
+                                        quota: ReplicationQuotaManager): Int = {
+    val throttledPartitions = mergedPartitionData.filter { case (partition, _) => quota.isThrottled(partition) }
+    FetchResponse.responseSize(throttledPartitions.groupBy(_._1.topic), fetchRequest.versionId)
+  }
+
+  def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
+    if (fetchRequest.isFromFollower) quotas.leader else UnboundedQuota
+
   /**
    * Handle an offset request
    */
@@ -976,31 +985,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  /*
-   * Returns a Map of all quota managers configured. The request Api key is the key for the Map
-   */
-  private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = {
-    val producerQuotaManagerCfg = ClientQuotaManagerConfig(
-      quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault,
-      numQuotaSamples = cfg.numQuotaSamples,
-      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
-    )
-
-    val consumerQuotaManagerCfg = ClientQuotaManagerConfig(
-      quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault,
-      numQuotaSamples = cfg.numQuotaSamples,
-      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
-    )
-
-    val quotaManagers = Map[Short, ClientQuotaManager](
-      ApiKeys.PRODUCE.id ->
-              new ClientQuotaManager(producerQuotaManagerCfg, metrics, ApiKeys.PRODUCE.name, new org.apache.kafka.common.utils.SystemTime),
-      ApiKeys.FETCH.id ->
-              new ClientQuotaManager(consumerQuotaManagerCfg, metrics, ApiKeys.FETCH.name, new org.apache.kafka.common.utils.SystemTime)
-    )
-    quotaManagers
-  }
-
   def handleLeaveGroupRequest(request: RequestChannel.Request) {
     val leaveGroupRequest = request.body.asInstanceOf[LeaveGroupRequest]
     val respHeader = new ResponseHeader(request.header.correlationId)
@@ -1047,9 +1031,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def close() {
-    quotaManagers.foreach { case (apiKey, quotaManager) =>
-      quotaManager.shutdown()
-    }
+    quotas.shutdown()
     info("Shutdown complete.")
   }
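
To make the leader-side ordering in _**KafkaApis.handleFetchRequest**_ explicit, here is the follower branch from the diff above again, annotated — this is a restatement of existing lines, not new behaviour:

```scala
if (fetchRequest.isFromFollower) {
  // readFromLocalLog only *checked* the quota while the response was built;
  // the throttled bytes are *recorded* here, just before the send, so the
  // leader's rate metric reflects what is actually shipped.
  val throttledBytes = sizeOfThrottledPartitions(fetchRequest, mergedPartitionData, quotas.leader)
  quotas.leader.record(throttledBytes)
  fetchResponseCallback(0)
} else {
  // Client fetches are unchanged: measured and possibly throttled per clientId.
  val size = FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), fetchRequest.versionId)
  quotas.fetch.recordAndMaybeThrottle(fetchRequest.clientId, size, fetchResponseCallback)
}
```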
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 531ee62..3671297 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.util.Properties
-
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
 import kafka.cluster.EndPoint
 import kafka.consumer.ConsumerConfig
@@ -104,6 +103,7 @@ object Defaults {
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
+  val ThrottledReplicationRateLimit = Long.MaxValue
 
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMs = RequestTimeoutMs
@@ -153,6 +153,8 @@ object Defaults {
   val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
   val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples
   val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
+  val NumReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples
+  val ReplicationQuotaWindowSizeSeconds: Int = ReplicationQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
 
   val DeleteTopicEnable = false
 
@@ -313,7 +315,10 @@ object KafkaConfig {
   val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
   val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
   val NumQuotaSamplesProp = "quota.window.num"
+  val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
+  val ReplicationQuotaWindowSizeSecondsProp = "replication.quota.window.size.seconds"
+  val ThrottledReplicationRateLimitProp = "replication.quota.throttled.rate"
 
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
@@ -520,8 +525,12 @@ object KafkaConfig {
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second"
   val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
-  val NumQuotaSamplesDoc = "The number of samples to retain in memory"
-  val QuotaWindowSizeSecondsDoc = "The time span of each sample"
+  val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas"
+  val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
+  val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas"
+  val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
+  val ThrottledReplicationRateLimitDoc = "A long representing the upper bound (bytes/sec) on replication traffic for replicas enumerated in the " +
+    s"topic-level property ${LogConfig.ThrottledReplicasListProp}. This property can only be set dynamically, via the config command."
 
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
   val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
@@ -562,6 +571,8 @@ object KafkaConfig {
   val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
   val SaslKerberosPrincipalToLocalRulesDoc = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
 
+  def dynamicBrokerConfigs = Seq(KafkaConfig.ThrottledReplicationRateLimitProp)
+
   private val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
@@ -705,8 +716,10 @@ object KafkaConfig {
       .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
       .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
       .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
+      .define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc)
       .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
-
+      .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc)
+      .define(ThrottledReplicationRateLimitProp, LONG, Defaults.ThrottledReplicationRateLimit, atLeast(0), MEDIUM, ThrottledReplicationRateLimitDoc)
 
       /** ********* SSL Configuration ****************/
       .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
@@ -751,7 +764,7 @@ object KafkaConfig {
     import scala.collection.JavaConversions._
     val names = configDef.names()
     for (name <- props.keys)
-      require(names.contains(name), "Unknown configuration \"%s\".".format(name))
+      require(names.contains(name), "Unknown Kafka configuration \"%s\".".format(name))
   }
 
   def fromProps(props: Properties): KafkaConfig =
@@ -938,6 +951,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
   val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp)
   val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp)
+  val numReplicationQuotaSamples = getInt(KafkaConfig.NumReplicationQuotaSamplesProp)
+  val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp)
 
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
   val compressionType = getString(KafkaConfig.CompressionTypeProp)
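
Since `ThrottledReplicationRateLimitProp` is dynamic-only, a broker picks it up through the _**BrokerConfigHandler**_ shown earlier. A sketch of what that handler receives when the limit is set for broker 102 — the value is illustrative, and `brokerConfigHandler` is assumed to be the instance wired up in KafkaServer below:

```scala
import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put(KafkaConfig.ThrottledReplicationRateLimitProp, "10000000") // 10 MB/s, illustrative

brokerConfigHandler.processConfigChanges("102", props)
// -> quotaManagers.leader and quotaManagers.follower both now enforce upperBound(10000000)
```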

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b83a3ee..4509e37 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledS
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.AppInfoParser
 
-import scala.collection.mutable
+import scala.collection.{Map, mutable}
 import scala.collection.JavaConverters._
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
@@ -133,6 +133,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
   var kafkaHealthcheck: KafkaHealthcheck = null
   var metadataCache: MetadataCache = null
+  var quotaManagers: QuotaFactory.QuotaManagers = null
 
   var zkUtils: ZkUtils = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
@@ -172,6 +173,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
       val canStartup = isStartingUp.compareAndSet(false, true)
       if (canStartup) {
         metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
+        quotaManagers = QuotaFactory.instantiate(config, metrics, time)
 
         brokerState.newState(Starting)
 
@@ -196,7 +198,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
         /* start replica manager */
         replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
-          isShuttingDown)
+          isShuttingDown, quotaManagers.follower)
         replicaManager.startup()
 
         /* start kafka controller */
@@ -218,14 +220,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
-          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
+          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
 
         Mx4jLoader.maybeLoad()
 
         /* start dynamic config manager */
-        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
-                                                           ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))
+        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
+                                                           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
+                                                           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
         // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
         // TODO: Move this logic to DynamicConfigManager
@@ -633,6 +636,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     * <li> config has broker.id and meta.properties contains broker.id if they don't match throws InconsistentBrokerIdException
     * <li> config has broker.id and there is no meta.properties file, creates new meta.properties and stores broker.id
     * <ol>
+    *
     * @return A brokerId.
     */
   private def getBrokerId: Int =  {

