kafka-commits mailing list archives

From jun...@apache.org
Subject [2/3] kafka git commit: KAFKA-1464; Add a throttling option to the Kafka replication
Date Fri, 16 Sep 2016 05:28:33 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
new file mode 100644
index 0000000..65e7c9e
--- /dev/null
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -0,0 +1,74 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import kafka.common.TopicAndPartition
+import kafka.server.QuotaType._
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+object QuotaType  {
+  case object Fetch extends QuotaType
+  case object Produce extends QuotaType
+  case object LeaderReplication extends QuotaType
+  case object FollowerReplication extends QuotaType
+}
+sealed trait QuotaType
+
+object QuotaFactory {
+
+  object UnboundedQuota extends ReplicaQuota {
+    override def isThrottled(topicAndPartition: TopicAndPartition): Boolean = false
+    override def isQuotaExceeded(): Boolean = false
+  }
+
+  case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
+    def shutdown() {
+      fetch.shutdown
+      produce.shutdown
+    }
+  }
+
+  def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time): QuotaManagers = {
+    QuotaManagers(
+      new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time),
+      new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time),
+      new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
+      new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time)
+    )
+  }
+
+  def clientProduceConfig(cfg: KafkaConfig): ClientQuotaManagerConfig =
+    ClientQuotaManagerConfig(
+      quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault,
+      numQuotaSamples = cfg.numQuotaSamples,
+      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+    )
+
+  def clientFetchConfig(cfg: KafkaConfig): ClientQuotaManagerConfig =
+    ClientQuotaManagerConfig(
+      quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault,
+      numQuotaSamples = cfg.numQuotaSamples,
+      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+    )
+
+  def replicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig =
+    ReplicationQuotaManagerConfig(
+      numQuotaSamples = cfg.numReplicationQuotaSamples,
+      quotaWindowSizeSeconds = cfg.replicationQuotaWindowSizeSeconds
+    )
+}
\ No newline at end of file
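
For orientation, a minimal wiring sketch for the new factory (not part of the patch; config is assumed to be the broker's KafkaConfig, and the Metrics/Time instances its existing ones):

import kafka.server.QuotaFactory
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.SystemTime

// Build the four quota managers once at broker startup.
val metrics = new Metrics()
val quotas = QuotaFactory.instantiate(config, metrics, new SystemTime)
// quotas.fetch and quotas.produce back the existing client quotas;
// quotas.leader and quotas.follower are the new replication quotas.
// On broker shutdown:
quotas.shutdown()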

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 96c2a38..3894d9b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -21,9 +21,9 @@ import kafka.cluster.BrokerEndPoint
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 
-class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics: Metrics, time: Time, threadNamePrefix: Option[String] = None)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
-                                       "Replica", brokerConfig.numReplicaFetchers) {
+class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics: Metrics, time: Time, threadNamePrefix: Option[String] = None, quotaManager: ReplicationQuotaManager)
+      extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
+        "Replica", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
     val threadName = threadNamePrefix match {
@@ -33,12 +33,12 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManage
         "%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id)
     }
     new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig,
-      replicaMgr, metrics, time)
+      replicaMgr, metrics, time, quotaManager)
   }
 
   def shutdown() {
     info("shutting down")
     closeAllFetchers()
     info("shutdown completed")
-  }  
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ef602e4..b0bd070 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -26,11 +26,10 @@ import kafka.message.ByteBufferMessageSet
 import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_9_0}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 import ReplicaFetcherThread._
+
 import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse}
 import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode}
-import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest}
-import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
-import org.apache.kafka.common.security.ssl.SslFactory
+import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest, _}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{Errors, ApiKeys}
@@ -45,7 +44,9 @@ class ReplicaFetcherThread(name: String,
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager,
                            metrics: Metrics,
-                           time: Time)
+                           time: Time,
+                           quota: ReplicationQuotaManager
+                          )
   extends AbstractFetcherThread(name = name,
                                 clientId = name,
                                 sourceBroker = sourceBroker,
@@ -134,6 +135,8 @@ class ReplicaFetcherThread(name: String,
       if (logger.isTraceEnabled)
         trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
           .format(replica.brokerId, topic, partitionId, followerHighWatermark))
+      if (quota.isThrottled(topicAndPartition))
+        quota.record(messageSet.sizeInBytes)
     } catch {
       case e: KafkaStorageException =>
         fatal(s"Disk error while replicating data for $topicAndPartition", e)
@@ -249,7 +252,6 @@ class ReplicaFetcherThread(name: String,
         networkClient.close(sourceBroker.id.toString)
         throw e
     }
-
   }
 
   private def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
@@ -269,15 +271,13 @@ class ReplicaFetcherThread(name: String,
 
   protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): FetchRequest = {
     val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData]
-
-    partitionMap.foreach { case ((TopicAndPartition(topic, partition), partitionFetchState)) =>
-      if (partitionFetchState.isActive)
-        requestMap(new TopicPartition(topic, partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
+    val quotaExceeded = quota.isQuotaExceeded
+    partitionMap.foreach { case ((partition, partitionFetchState)) =>
+      if (partitionFetchState.isActive && !(quota.isThrottled(partition) && quotaExceeded))
+        requestMap(new TopicPartition(partition.topic, partition.partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
     }
-
     new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava))
   }
-
 }
 
 object ReplicaFetcherThread {
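
The follower-side change is concentrated in buildFetchRequest above: an active partition is left out of the fetch request for this round when it is marked as throttled and the follower's replication quota is already exhausted. The same gating rule as a standalone sketch (hypothetical helper, using the ReplicaQuota and PartitionFetchState types from this patch):

def shouldFetch(partition: TopicAndPartition,
                state: PartitionFetchState,
                quota: ReplicaQuota): Boolean =
  // Throttled partitions are only skipped while the quota is exceeded; they are
  // picked up again on a later fetch round once the rate drops back under the limit.
  state.isActive && !(quota.isThrottled(partition) && quota.isQuotaExceeded)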

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f1cc694..f72df9a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,6 +28,7 @@ import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogManager}
 import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet}
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils._
 import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException,
                                         InvalidTopicException, NotLeaderForPartitionException, OffsetOutOfRangeException,
@@ -108,7 +109,9 @@ class ReplicaManager(val config: KafkaConfig,
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
-                     threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+                     quotaManager: ReplicationQuotaManager,
+                     threadNamePrefix: Option[String] = None
+                    ) extends Logging with KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
@@ -116,7 +119,7 @@ class ReplicaManager(val config: KafkaConfig,
     new Partition(t, p, time, this)
   })
   private val replicaStateChangeLock = new Object
-  val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime, threadNamePrefix)
+  val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime, threadNamePrefix, quotaManager)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false
@@ -458,13 +461,14 @@ class ReplicaManager(val config: KafkaConfig,
                     replicaId: Int,
                     fetchMinBytes: Int,
                     fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo],
+                    quota: ReplicaQuota = UnboundedQuota,
                     responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
     val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
 
     // read from local logs
-    val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)
+    val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo, quota)
 
     // if the fetch comes from the follower,
     // update its corresponding log end offset
@@ -489,8 +493,9 @@ class ReplicaManager(val config: KafkaConfig,
       val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
         (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo.get(topicAndPartition).get))
       }
+
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
-      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
+      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
       val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionOperationKey(_)).toSeq
@@ -507,8 +512,8 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def readFromLocalLog(fetchOnlyFromLeader: Boolean,
                        readOnlyCommitted: Boolean,
-                       readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = {
-
+                       readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo],
+                       quota: ReplicaQuota): Map[TopicAndPartition, LogReadResult] = {
     readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
       BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark()
       BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
@@ -536,10 +541,18 @@ class ReplicaManager(val config: KafkaConfig,
            * This can cause a replica to always be out of sync.
            */
           val initialLogEndOffset = localReplica.logEndOffset
+
           val logReadInfo = localReplica.log match {
             case Some(log) =>
               val adjustedFetchSize = if (Topic.isInternal(topic) && !readOnlyCommitted) Math.max(fetchSize, log.config.maxMessageSize) else fetchSize
-              log.read(offset, adjustedFetchSize, maxOffsetOpt)
+
+              //Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
+              var fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt)
+
+              //If the partition is marked as throttled, and we are over-quota then exclude it
+              if (quota.isThrottled(TopicAndPartition(topic, partition)) && quota.isQuotaExceeded)
+                fetch = FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
+              fetch
             case None =>
               error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
               FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
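
On the leader side, readFromLocalLog performs the read first and then, if the partition is throttled and the leader quota is exhausted, replaces the result with an empty message set at the same offset, so the follower simply makes no progress for that partition this round. The check in isolation (hypothetical helper built from the types used in this patch):

def maybeEmptyThrottledRead(tp: TopicAndPartition,
                            read: FetchDataInfo,
                            quota: ReplicaQuota): FetchDataInfo =
  if (quota.isThrottled(tp) && quota.isQuotaExceeded)
    FetchDataInfo(read.fetchOffsetMetadata, MessageSet.Empty) // keep the offset metadata, drop the data
  else
    read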

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
new file mode 100644
index 0000000..ba2fb92
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -0,0 +1,202 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import kafka.common.TopicAndPartition
+import kafka.server.Constants._
+import kafka.server.ReplicationQuotaManagerConfig._
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+import org.apache.kafka.common.metrics._
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.apache.kafka.common.metrics.stats.SimpleRate
+import org.apache.kafka.common.utils.Time
+
+/**
+  * Configuration settings for quota management
+  *
+  * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to internal replication
+  * @param numQuotaSamples            The number of samples to retain in memory
+  * @param quotaWindowSizeSeconds     The time span of each sample
+  *
+  */
+case class ReplicationQuotaManagerConfig(quotaBytesPerSecondDefault: Long = QuotaBytesPerSecondDefault,
+                                         numQuotaSamples: Int = DefaultNumQuotaSamples,
+                                         quotaWindowSizeSeconds: Int = DefaultQuotaWindowSizeSeconds)
+
+object ReplicationQuotaManagerConfig {
+  val QuotaBytesPerSecondDefault = Long.MaxValue
+  // Always have 10 whole windows + 1 current window
+  val DefaultNumQuotaSamples = 11
+  val DefaultQuotaWindowSizeSeconds = 1
+  // Purge sensors after 1 hour of inactivity
+  val InactiveSensorExpirationTimeSeconds = 3600
+}
+
+trait ReplicaQuota {
+  def isThrottled(topicAndPartition: TopicAndPartition): Boolean
+  def isQuotaExceeded(): Boolean
+}
+
+object Constants {
+  val AllReplicas = Seq[Int](-1)
+}
+
+/**
+  * Tracks replication metrics and compares them to any quotas for throttled partitions.
+  *
+  * @param config          The quota configs
+  * @param metrics         The Metrics instance
+  * @param replicationType The name / key for this quota manager, typically leader or follower
+  * @param time            Time object to use
+  */
+class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
+                              private val metrics: Metrics,
+                              private val replicationType: QuotaType,
+                              private val time: Time) extends Logging with ReplicaQuota {
+  private val lock = new ReentrantReadWriteLock()
+  private val throttledPartitions = new ConcurrentHashMap[String, Seq[Int]]()
+  private var quota: Quota = null
+  private val sensorAccess = new SensorAccess
+  private val rateMetricName = metrics.metricName("byte-rate", replicationType.toString, s"Tracking byte-rate for ${replicationType}")
+
+  /**
+    * Update the quota
+    *
+    * @param quota
+    */
+  def updateQuota(quota: Quota) {
+    inWriteLock(lock) {
+      this.quota = quota
+      //The metric could be expired by another thread, so use a local variable and null check.
+      val metric = metrics.metrics.get(rateMetricName)
+      if (metric != null) {
+        metric.config(getQuotaMetricConfig(quota))
+      }
+    }
+  }
+
+  /**
+    * Check if the quota is currently exceeded
+    *
+    * @return
+    */
+  override def isQuotaExceeded(): Boolean = {
+    try {
+      sensor().checkQuotas()
+    } catch {
+      case qve: QuotaViolationException =>
+        trace("%s: Quota violated for sensor (%s), metric: (%s), metric-value: (%f), bound: (%f)".format(replicationType, sensor().name(), qve.metricName, qve.value, qve.bound))
+        return true
+    }
+    false
+  }
+
+  /**
+    * Is the passed partition throttled by this ReplicationQuotaManager
+    *
+    * @param topicPartition the partition to check
+    * @return
+    */
+  override def isThrottled(topicPartition: TopicAndPartition): Boolean = {
+    val partitions = throttledPartitions.get(topicPartition.topic)
+    if (partitions != null)
+      partitions.contains(topicPartition.partition) || (partitions eq AllReplicas)
+    else false
+  }
+
+  /**
+    * Add the passed value to the throttled rate. This method ignores the quota:
+    * the value is added to the rate even if the quota is exceeded.
+    *
+    * @param value
+    */
+  def record(value: Long) {
+    try {
+      sensor().record(value)
+    } catch {
+      case qve: QuotaViolationException =>
+        trace(s"Record: Quota violated, but ignored, for sensor (${sensor.name}), metric: (${qve.metricName}), value : (${qve.value}), bound: (${qve.bound}), recordedValue ($value)")
+    }
+  }
+
+  /**
+    * Update the set of throttled partitions for this QuotaManager. The partitions passed, for
+    * any single topic, will replace any previous
+    *
+    * @param topic
+    * @param partitions the set of throttled partitions
+    * @return
+    */
+  def markThrottled(topic: String, partitions: Seq[Int]) {
+    throttledPartitions.put(topic, partitions)
+  }
+
+  /**
+    * Mark all replicas for this topic as throttled
+    *
+    * @param topic
+    * @return
+    */
+  def markThrottled(topic: String) {
+    markThrottled(topic, AllReplicas)
+  }
+
+  /**
+    * Remove list of throttled replicas for a certain topic
+    *
+    * @param topic
+    * @return
+    */
+  def removeThrottle(topic: String) {
+    throttledPartitions.remove(topic)
+  }
+
+  /**
+    * Returns the bound of the configured quota
+    *
+    * @return
+    */
+  def upperBound(): Long = {
+    inReadLock(lock) {
+      if (quota != null)
+        quota.bound().toLong
+      else
+        Long.MaxValue
+    }
+  }
+
+  private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+    new MetricConfig()
+      .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
+      .samples(config.numQuotaSamples)
+      .quota(quota)
+  }
+
+  private def sensor(): Sensor = {
+    sensorAccess.getOrCreate(
+      replicationType.toString,
+      InactiveSensorExpirationTimeSeconds,
+      lock,
+      metrics,
+      () => rateMetricName,
+      () => getQuotaMetricConfig(quota),
+      () => new SimpleRate()
+    )
+  }
+}
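
A short usage sketch for the new manager (illustrative only; the metrics and time instances, the topic name, the byte count and the 1 MB/s bound are assumptions, and in the broker the instance comes from QuotaFactory.instantiate):

import kafka.common.TopicAndPartition
import org.apache.kafka.common.metrics.Quota

val leaderQuota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), metrics, QuotaType.LeaderReplication, time)
leaderQuota.updateQuota(Quota.upperBound(1000000))    // limit throttled leader replication to ~1 MB/s
leaderQuota.markThrottled("my-topic", Seq(0, 1))      // throttle partitions 0 and 1 of my-topic
if (leaderQuota.isThrottled(TopicAndPartition("my-topic", 0)))
  leaderQuota.record(bytesSent)                       // recorded even when the quota is exceeded
val overLimit = leaderQuota.isQuotaExceeded           // true once the sampled rate passes the bound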

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/server/SensorAccess.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/SensorAccess.scala b/core/src/main/scala/kafka/server/SensorAccess.scala
new file mode 100644
index 0000000..3a0130b
--- /dev/null
+++ b/core/src/main/scala/kafka/server/SensorAccess.scala
@@ -0,0 +1,76 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.{Metrics, Sensor, MeasurableStat, MetricConfig}
+
+/**
+  * Class which centralises the logic for creating/accessing sensors.
+  * The quota can be updated by wrapping it in the passed MetricConfig
+  *
+  * The later arguments are passed as functions, as they are only evaluated when the sensor is instantiated.
+  */
+class SensorAccess {
+
+  def getOrCreate(sensorName: String, expirationTime: Long, lock: ReentrantReadWriteLock, metrics: Metrics, metricName: () => MetricName, config: () => MetricConfig, measure: () => MeasurableStat): Sensor = {
+    var sensor: Sensor = null
+
+    /* Acquire the read lock to fetch the sensor. It is safe to call getSensor from multiple threads.
+     * The read lock allows a thread to create a sensor in isolation. The thread creating the sensor
+     * will acquire the write lock and prevent the sensors from being read while they are being created.
+     * It should be sufficient to simply check if the sensor is null without acquiring a read lock but the
+     * sensor being present doesn't mean that it is fully initialized i.e. all the Metrics may not have been added.
+     * This read lock waits until the writer thread has released its lock i.e. fully initialized the sensor
+     * at which point it is safe to read
+     */
+    lock.readLock().lock()
+    try {
+      sensor = metrics.getSensor(sensorName)
+    }
+    finally {
+      lock.readLock().unlock()
+    }
+
+    /* If the sensor is null, try to create it else return the existing sensor
+     * The sensor can be null, hence the null checks
+     */
+    if (sensor == null) {
+      /* Acquire a write lock because the sensor may not have been created and we only want one thread to create it.
+       * Note that multiple threads may acquire the write lock if they all see a null sensor initially
+       * In this case, the writer checks the sensor after acquiring the lock again.
+       * This is safe from Double Checked Locking because the references are read
+       * after acquiring read locks and hence they cannot see a partially published reference
+       */
+      lock.writeLock().lock()
+      try {
+        // Set the var for both sensors in case another thread has won the race to acquire the write lock. This will
+        // ensure that we initialise `ClientSensors` with non-null parameters.
+        sensor = metrics.getSensor(sensorName)
+        if (sensor == null) {
+          sensor = metrics.sensor(sensorName, config(), expirationTime)
+          sensor.add(metricName(), measure())
+        }
+      } finally {
+        lock.writeLock().unlock()
+      }
+    }
+    sensor
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 21658d3..b4209e3 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -58,6 +58,7 @@ object CoreUtils extends Logging {
 
   /**
     * Create a thread
+    *
     * @param name The name of the thread
     * @param daemon Whether the thread should block JVM shutdown
     * @param fun The function to execute in the thread

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/main/scala/kafka/utils/Time.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Time.scala b/core/src/main/scala/kafka/utils/Time.scala
index f562ef7..d578a6a 100644
--- a/core/src/main/scala/kafka/utils/Time.scala
+++ b/core/src/main/scala/kafka/utils/Time.scala
@@ -40,7 +40,7 @@ object Time {
 /**
  * A mockable interface for time functions
  */
-trait Time {
+trait Time extends org.apache.kafka.common.utils.Time {
   
   def milliseconds: Long
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/integration/kafka/api/ClientQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientQuotasTest.scala b/core/src/test/scala/integration/kafka/api/ClientQuotasTest.scala
new file mode 100644
index 0000000..366507d
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ClientQuotasTest.scala
@@ -0,0 +1,206 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.consumer.SimpleConsumer
+import kafka.integration.KafkaServerTestHarness
+import kafka.server._
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.{Quota, KafkaMetric}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.{After, Before, Test}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+import scala.collection.Map
+import scala.collection.mutable
+
+class ClientQuotasTest extends KafkaServerTestHarness {
+  private val producerBufferSize = 300000
+  private val producerId1 = "QuotasTestProducer-1"
+  private val producerId2 = "QuotasTestProducer-2"
+  private val consumerId1 = "QuotasTestConsumer-1"
+  private val consumerId2 = "QuotasTestConsumer-2"
+
+  val numServers = 2
+  val overridingProps = new Properties()
+
+  // Low enough quota that a producer sending a small payload in a tight loop should get throttled
+  overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000")
+  overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500")
+
+  override def generateConfigs() = {
+    FixedPortTestUtils.createBrokerConfigs(numServers,
+                                           zkConnect,
+                                           enableControlledShutdown = false)
+            .map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  var producers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  var consumers = mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  var replicaConsumers = mutable.Buffer[SimpleConsumer]()
+
+  var leaderNode: KafkaServer = null
+  var followerNode: KafkaServer = null
+  private val topic1 = "topic-1"
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.ACKS_CONFIG, "0")
+    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
+    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId2)
+    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+    val numPartitions = 1
+    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers)
+    leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1)
+    followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1)
+    assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
+
+    // Create consumers
+    val consumerProps = new Properties
+    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
+    consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1)
+    consumers += new KafkaConsumer(consumerProps)
+    // Create replica consumers with the same clientId as the high level consumer. These requests should never be throttled
+    replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId1)
+
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2)
+    consumers += new KafkaConsumer(consumerProps)
+    replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2)
+  }
+
+  @After
+  override def tearDown() {
+    producers.foreach( _.close )
+    consumers.foreach( _.close )
+    replicaConsumers.foreach( _.close )
+    super.tearDown()
+  }
+
+  @Test
+  def testThrottledProducerConsumer() {
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
+
+    val numRecords = 1000
+    produce(producers.head, numRecords)
+
+    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.PRODUCE.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", producerId1)
+    assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
+
+    // Consumer should read in a bursty manner and get throttled immediately
+    consume(consumers.head, numRecords)
+    // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
+    val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
+    replicaConsumers.head.fetch(request)
+    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.FETCH.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", consumerId1)
+    assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
+  }
+
+  @Test
+  def testProducerConsumerOverrideUnthrottled() {
+    // Give effectively unlimited quota for producerId2 and consumerId2
+    val props = new Properties()
+    props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString)
+    props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString)
+
+    AdminUtils.changeClientIdConfig(zkUtils, producerId2, props)
+    AdminUtils.changeClientIdConfig(zkUtils, consumerId2, props)
+
+    TestUtils.retry(10000) {
+      val overrideProducerQuota = leaderNode.apis.quotas.produce.quota(producerId2)
+      val overrideConsumerQuota = leaderNode.apis.quotas.fetch.quota(consumerId2)
+
+      assertEquals(s"ClientId $producerId2 must have unlimited producer quota", Quota.upperBound(Long.MaxValue), overrideProducerQuota)
+      assertEquals(s"ClientId $consumerId2 must have unlimited consumer quota", Quota.upperBound(Long.MaxValue), overrideConsumerQuota)
+    }
+
+
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
+    val numRecords = 1000
+    produce(producers(1), numRecords)
+    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.PRODUCE.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", producerId2)
+    assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
+
+    // The "client" consumer does not get throttled.
+    consume(consumers(1), numRecords)
+    // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
+    val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
+    replicaConsumers(1).fetch(request)
+    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.FETCH.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", consumerId2)
+    assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)
+  }
+
+  def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {
+    var numBytesProduced = 0
+    for (i <- 0 to count) {
+      val payload = i.toString.getBytes
+      numBytesProduced += payload.length
+      p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload),
+             new ErrorLoggingCallback(topic1, null, null, true)).get()
+      Thread.sleep(1)
+    }
+    numBytesProduced
+  }
+
+  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) {
+    consumer.subscribe(List(topic1))
+    var numConsumed = 0
+    while (numConsumed < numRecords) {
+      for (cr <- consumer.poll(100)) {
+        numConsumed += 1
+      }
+    }
+  }
+}
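
ClientQuotasTest appears to be a straight rename of QuotasTest (deleted below); the functional difference is how the per-client quota managers are reached, now through the QuotaFactory.QuotaManagers holder rather than a map keyed by API id:

// before this patch (see the deleted QuotasTest and the old AdminTest assertions):
val oldQuota = leaderNode.apis.quotaManagers(ApiKeys.PRODUCE.id).quota(producerId2)
// after this patch:
val newQuota = leaderNode.apis.quotas.produce.quota(producerId2)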

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
deleted file mode 100644
index b6a0ae5..0000000
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package kafka.api
-
-import java.util.Properties
-
-import kafka.admin.AdminUtils
-import kafka.consumer.SimpleConsumer
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.{ClientQuotaManager, ClientConfigOverride, KafkaConfig, KafkaServer}
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer._
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{Quota, KafkaMetric}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertTrue
-import org.junit.{After, Before, Test}
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-import scala.collection.Map
-import scala.collection.mutable
-
-class QuotasTest extends KafkaServerTestHarness {
-  private val producerBufferSize = 300000
-  private val producerId1 = "QuotasTestProducer-1"
-  private val producerId2 = "QuotasTestProducer-2"
-  private val consumerId1 = "QuotasTestConsumer-1"
-  private val consumerId2 = "QuotasTestConsumer-2"
-
-  val numServers = 2
-  val overridingProps = new Properties()
-
-  // Low enough quota that a producer sending a small payload in a tight loop should get throttled
-  overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000")
-  overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500")
-
-  override def generateConfigs() = {
-    FixedPortTestUtils.createBrokerConfigs(numServers,
-                                           zkConnect,
-                                           enableControlledShutdown = false)
-            .map(KafkaConfig.fromProps(_, overridingProps))
-  }
-
-  var producers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
-  var consumers = mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-  var replicaConsumers = mutable.Buffer[SimpleConsumer]()
-
-  var leaderNode: KafkaServer = null
-  var followerNode: KafkaServer = null
-  private val topic1 = "topic-1"
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.ACKS_CONFIG, "0")
-    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
-    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1)
-    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                      classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                      classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
-
-    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId2)
-    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
-
-    val numPartitions = 1
-    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers)
-    leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1)
-    followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1)
-    assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
-
-    // Create consumers
-    val consumerProps = new Properties
-    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
-    consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
-    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                      classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                      classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-
-    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1)
-    consumers += new KafkaConsumer(consumerProps)
-    // Create replica consumers with the same clientId as the high level consumer. These requests should never be throttled
-    replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId1)
-
-    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2)
-    consumers += new KafkaConsumer(consumerProps)
-    replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2)
-  }
-
-  @After
-  override def tearDown() {
-    producers.foreach( _.close )
-    consumers.foreach( _.close )
-    replicaConsumers.foreach( _.close )
-    super.tearDown()
-  }
-
-  @Test
-  def testThrottledProducerConsumer() {
-    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
-
-    val numRecords = 1000
-    produce(producers.head, numRecords)
-
-    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
-                                                           ApiKeys.PRODUCE.name,
-                                                           "Tracking throttle-time per client",
-                                                           "client-id", producerId1)
-    assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
-
-    // Consumer should read in a bursty manner and get throttled immediately
-    consume(consumers.head, numRecords)
-    // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
-    val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
-    replicaConsumers.head.fetch(request)
-    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
-                                                           ApiKeys.FETCH.name,
-                                                           "Tracking throttle-time per client",
-                                                           "client-id", consumerId1)
-    assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
-  }
-
-  @Test
-  def testProducerConsumerOverrideUnthrottled() {
-    // Give effectively unlimited quota for producerId2 and consumerId2
-    val props = new Properties()
-    props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString)
-    props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString)
-
-    AdminUtils.changeClientIdConfig(zkUtils, producerId2, props)
-    AdminUtils.changeClientIdConfig(zkUtils, consumerId2, props)
-
-    TestUtils.retry(10000) {
-      val quotaManagers: Map[Short, ClientQuotaManager] = leaderNode.apis.quotaManagers
-      val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(producerId2)
-      val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(consumerId2)
-
-      assertEquals(s"ClientId $producerId2 must have unlimited producer quota", Quota.upperBound(Long.MaxValue), overrideProducerQuota)
-      assertEquals(s"ClientId $consumerId2 must have unlimited consumer quota", Quota.upperBound(Long.MaxValue), overrideConsumerQuota)
-    }
-
-
-    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
-    val numRecords = 1000
-    produce(producers(1), numRecords)
-    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
-                                                           ApiKeys.PRODUCE.name,
-                                                           "Tracking throttle-time per client",
-                                                           "client-id", producerId2)
-    assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
-
-    // The "client" consumer does not get throttled.
-    consume(consumers(1), numRecords)
-    // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
-    val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
-    replicaConsumers(1).fetch(request)
-    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
-                                                           ApiKeys.FETCH.name,
-                                                           "Tracking throttle-time per client",
-                                                           "client-id", consumerId2)
-    assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)
-  }
-
-  def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {
-    var numBytesProduced = 0
-    for (i <- 0 to count) {
-      val payload = i.toString.getBytes
-      numBytesProduced += payload.length
-      p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload),
-             new ErrorLoggingCallback(topic1, null, null, true)).get()
-      Thread.sleep(1)
-    }
-    numBytesProduced
-  }
-
-  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) {
-    consumer.subscribe(List(topic1))
-    var numConsumed = 0
-    while (numConsumed < numRecords) {
-      for (cr <- consumer.poll(100)) {
-        numConsumed += 1
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7cc25c8..5dca17f 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -16,9 +16,9 @@
  */
 package kafka.admin
 
+import kafka.server.KafkaConfig._
 import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.metrics.Quota
-import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert._
 import org.junit.Test
 import java.util.Properties
@@ -31,7 +31,7 @@ import kafka.common.TopicAndPartition
 import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
 import java.io.File
 
-import TestUtils._
+import kafka.utils.TestUtils._
 
 import scala.collection.{Map, immutable}
 
@@ -278,7 +278,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val partitionToBeReassigned = 0
     val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas))
-    reassignPartitionsCommand.reassignPartitions
+    reassignPartitionsCommand.reassignPartitions()
     // create brokers
     val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
 
@@ -385,20 +385,23 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val topic = "my-topic"
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
-    def makeConfig(messageSize: Int, retentionMs: Long) = {
-      var props = new Properties()
+    def makeConfig(messageSize: Int, retentionMs: Long, throttledReplicas: String) = {
+      val props = new Properties()
       props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
       props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
+      props.setProperty(LogConfig.ThrottledReplicasListProp, throttledReplicas)
       props
     }
 
-    def checkConfig(messageSize: Int, retentionMs: Long) {
+    def checkConfig(messageSize: Int, retentionMs: Long, throttledReplicas: String, quotaManagerIsThrottled: Boolean) {
       TestUtils.retry(10000) {
         for(part <- 0 until partitions) {
-          val logOpt = server.logManager.getLog(TopicAndPartition(topic, part))
-          assertTrue(logOpt.isDefined)
-          assertEquals(retentionMs, logOpt.get.config.retentionMs)
-          assertEquals(messageSize, logOpt.get.config.maxMessageSize)
+          val log = server.logManager.getLog(TopicAndPartition(topic, part))
+          assertTrue(log.isDefined)
+          assertEquals(retentionMs, log.get.config.retentionMs)
+          assertEquals(messageSize, log.get.config.maxMessageSize)
+          assertEquals(throttledReplicas, log.get.config.throttledReplicasList)
+          assertEquals(quotaManagerIsThrottled, server.quotaManagers.leader.isThrottled(new TopicAndPartition(topic, part)))
         }
       }
     }
@@ -406,24 +409,87 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     try {
       // create a topic with a few config overrides and check that they are applied
       val maxMessageSize = 1024
-      val retentionMs = 1000*1000
-      AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
-      checkConfig(maxMessageSize, retentionMs)
+      val retentionMs = 1000 * 1000
+      AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0"))
+
+      //TODO - uncommenting this line reveals a bug. The quota manager is not updated when properties are added on topic creation.
+      //      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", true)
 
       // now double the config values for the topic and check that it is applied
-      val newConfig: Properties = makeConfig(2*maxMessageSize, 2 * retentionMs)
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
-      checkConfig(2*maxMessageSize, 2 * retentionMs)
+      val newConfig: Properties = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*")
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*"))
+      checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", quotaManagerIsThrottled = true)
 
       // Verify that the same config can be read from ZK
       val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Topic, topic)
       assertEquals(newConfig, configInZk)
+
+      //Now delete the config
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties)
+      checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, Defaults.ThrottledReplicasList,  quotaManagerIsThrottled = false)
+
+      //Add config back
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0"))
+      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", quotaManagerIsThrottled = true)
+
+      //Now ensure updating to "" removes the throttled replica list also
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties(){put(LogConfig.ThrottledReplicasListProp, "")})
+      checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, Defaults.ThrottledReplicasList,  quotaManagerIsThrottled = false)
+
     } finally {
       server.shutdown()
       CoreUtils.delete(server.config.logDirs)
     }
   }
 
+  @Test
+  def shouldPropagateDynamicBrokerConfigs() {
+    val brokerIds = Seq(0, 1, 2)
+    val servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(TestUtils.createServer(_))
+
+    def wrap(limit: Long): Properties = {
+      val props = new Properties()
+      props.setProperty(KafkaConfig.ThrottledReplicationRateLimitProp, limit.toString)
+      props
+    }
+
+    def checkConfig(limit: Long) {
+      TestUtils.retry(10000) {
+        for (server <- servers) {
+          assertEquals("Leader Quota Manager was not updated", limit, server.quotaManagers.leader.upperBound)
+          assertEquals("Follower Quota Manager was not updated", limit, server.quotaManagers.follower.upperBound)
+        }
+      }
+    }
+
+    try {
+      val limit: Long = 42
+
+      // Set the limit & check it is applied to the log
+      AdminUtils.changeBrokerConfig(servers(0).zkUtils, brokerIds, wrap(limit))
+      checkConfig(limit)
+
+      // Now double the config values for the topic and check that it is applied
+      val newLimit = 2 * limit
+      AdminUtils.changeBrokerConfig(servers(0).zkUtils, brokerIds, wrap(newLimit))
+      checkConfig(newLimit)
+
+      // Verify that the same config can be read from ZK
+      for (brokerId <- brokerIds) {
+        val configInZk = AdminUtils.fetchEntityConfig(servers(brokerId).zkUtils, ConfigType.Broker, brokerId.toString)
+        assertEquals(newLimit, configInZk.getProperty(KafkaConfig.ThrottledReplicationRateLimitProp).toInt)
+      }
+
+      //Now delete the config
+      AdminUtils.changeBrokerConfig(servers(0).zkUtils, brokerIds, new Properties)
+      checkConfig(kafka.server.Defaults.ThrottledReplicationRateLimit)
+
+    } finally {
+      servers.foreach(_.shutdown())
+      servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    }
+  }
+
   /**
    * This test simulates a client config change in ZK whose notification has been purged.
    * Basically, it asserts that notifications are bootstrapped from ZK
@@ -447,8 +513,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // Test that the existing clientId overrides are read
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
     try {
-      assertEquals(new Quota(1000, true), server.apis.quotaManagers(ApiKeys.PRODUCE.id).quota(clientId))
-      assertEquals(new Quota(2000, true), server.apis.quotaManagers(ApiKeys.FETCH.id).quota(clientId))
+      assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota(clientId))
+      assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota(clientId))
     } finally {
       server.shutdown()
       CoreUtils.delete(server.config.logDirs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 7c71aed..be798ff 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -16,46 +16,63 @@
  */
 package kafka.admin
 
+import java.util.Properties
+
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import org.junit.Assert._
 import org.junit.Test
-import kafka.utils.Logging
+import kafka.utils.{ZkUtils, Logging}
 import kafka.zk.ZooKeeperTestHarness
 
 class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   @Test
-  def testArgumentParse() {
+  def shouldParseArgumentsForClientsEntityType() {
+    testArgumentParse("clients")
+  }
+
+  @Test
+  def shouldParseArgumentsForTopicsEntityType() {
+    testArgumentParse("topics")
+  }
+
+  @Test
+  def shouldParseArgumentsForBrokersEntityType() {
+    testArgumentParse("brokers")
+  }
+
+  def testArgumentParse(entityType: String) = {
     // Should parse correctly
     var createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-                                                     "--entity-name", "x",
-                                                     "--entity-type", "clients",
-                                                     "--describe"))
+      "--entity-name", "x",
+      "--entity-type", entityType,
+      "--describe"))
     createOpts.checkArgs()
 
     // For --alter and added config
     createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-                                                "--entity-name", "x",
-                                                "--entity-type", "clients",
-                                                "--alter",
-                                                "--add-config", "a=b,c=d"))
+      "--entity-name", "x",
+      "--entity-type", entityType,
+      "--alter",
+      "--add-config", "a=b,c=d"))
     createOpts.checkArgs()
 
     // For alter and deleted config
     createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-                                                "--entity-name", "x",
-                                                "--entity-type", "clients",
-                                                "--alter",
-                                                "--delete-config", "a,b,c"))
+      "--entity-name", "x",
+      "--entity-type", entityType,
+      "--alter",
+      "--delete-config", "a,b,c"))
     createOpts.checkArgs()
 
     // For alter and both added, deleted config
     createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-                                                "--entity-name", "x",
-                                                "--entity-type", "clients",
-                                                "--alter",
-                                                "--add-config", "a=b,c=d",
-                                                "--delete-config", "a"))
+      "--entity-name", "x",
+      "--entity-type", entityType,
+      "--alter",
+      "--add-config", "a=b,c=d",
+      "--delete-config", "a"))
     createOpts.checkArgs()
+
     val addedProps = ConfigCommand.parseConfigsToBeAdded(createOpts)
     assertEquals(2, addedProps.size())
     assertEquals("b", addedProps.getProperty("a"))
@@ -65,4 +82,139 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     assertEquals(1, deletedProps.size)
     assertEquals("a", deletedProps.head)
   }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldFailIfUnrecognisedEntityType(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"))
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+  }
+
+  @Test
+  def shouldAddClientConfig(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "my-client-id",
+      "--entity-type", "clients",
+      "--alter",
+      "--add-config", "a=b,c=d"))
+
+    val configChange = new TestAdminUtils {
+      override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configChange: Properties): Unit = {
+        assertEquals("my-client-id", clientId)
+        assertEquals("b", configChange.get("a"))
+        assertEquals("d", configChange.get("c"))
+      }
+    }
+    ConfigCommand.alterConfig(null, createOpts, configChange)
+  }
+
+  @Test
+  def shouldAddTopicConfig(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "my-topic",
+      "--entity-type", "topics",
+      "--alter",
+      "--add-config", "a=b,c=d"))
+
+    val configChange = new TestAdminUtils {
+      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties): Unit = {
+        assertEquals("my-topic", topic)
+        assertEquals("b", configChange.get("a"))
+        assertEquals("d", configChange.get("c"))
+      }
+    }
+    ConfigCommand.alterConfig(null, createOpts, configChange)
+  }
+
+  @Test
+  def shouldAddBrokerConfig(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "1",
+      "--entity-type", "brokers",
+      "--alter",
+      "--add-config", "a=b,c=d"))
+
+    val configChange = new TestAdminUtils {
+      override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+        assertEquals(Seq(1), brokerIds)
+        assertEquals("b", configChange.get("a"))
+        assertEquals("d", configChange.get("c"))
+      }
+    }
+    ConfigCommand.alterConfig(null, createOpts, configChange)
+  }
+
+  @Test
+  def shouldSupportCommaSeparatedValues(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "my-topic",
+      "--entity-type", "topics",
+      "--alter",
+      "--add-config", "a=b,c=[d,e ,f],g=[h,i]"))
+
+    val configChange = new TestAdminUtils {
+      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties): Unit = {
+        assertEquals("my-topic", topic)
+        assertEquals("b", configChange.get("a"))
+        assertEquals("d,e ,f", configChange.get("c"))
+        assertEquals("h,i", configChange.get("g"))
+      }
+    }
+    ConfigCommand.alterConfig(null, createOpts, configChange)
+  }
+
+  @Test (expected = classOf[IllegalArgumentException])
+  def shouldNotUpdateBrokerConfigIfMalformedEntityName(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "1,2,3", //Don't support multiple brokers currently
+      "--entity-type", "brokers",
+      "--alter",
+      "--add-config", "a=b"))
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+  }
+
+  @Test (expected = classOf[IllegalArgumentException])
+  def shouldNotUpdateBrokerConfigIfMalformedConfig(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "1",
+      "--entity-type", "brokers",
+      "--alter",
+      "--add-config", "a="))
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+  }
+
+  @Test (expected = classOf[IllegalArgumentException])
+  def shouldNotUpdateBrokerConfigIfMalformedBracketConfig(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "1",
+      "--entity-type", "brokers",
+      "--alter",
+      "--add-config", "a=[b,c,d=e"))
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+  }
+
+  @Test
+  def shouldDeleteBrokerConfig(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "1",
+      "--entity-type", "brokers",
+      "--alter",
+      "--delete-config", "a,c"))
+
+    val configChange = new TestAdminUtils {
+      override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+        val properties: Properties = new Properties
+        properties.put("a", "b")
+        properties.put("c", "d")
+        properties.put("e", "f")
+        properties
+      }
+
+      override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+        assertEquals("f", configChange.get("e"))
+        assertEquals(1, configChange.size())
+      }
+    }
+    ConfigCommand.alterConfig(null, createOpts, configChange)
+  }
 }
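
For readers skimming the new ConfigCommandTest cases above, here is a minimal sketch of driving the same code path against a live cluster rather than a stub. It assumes the first argument of ConfigCommand.alterConfig is a connected ZkUtils handle (the tests pass null), that the production AdminUtils object implements the new AdminUtilities trait, and the ZooKeeper connect string is a placeholder.

object ThrottleConfigSketch {
  import kafka.admin.{AdminUtils, ConfigCommand}
  import kafka.admin.ConfigCommand.ConfigCommandOptions
  import kafka.server.KafkaConfig
  import kafka.utils.ZkUtils

  // Sketch only: mirrors shouldAddBrokerConfig above, but sets the new replication
  // throttle rate on broker 1 instead of a dummy key/value pair. `zkUtils` is
  // assumed to be an already-connected handle (the tests above pass null).
  def throttleBroker(zkUtils: ZkUtils, bytesPerSec: Long): Unit = {
    val opts = new ConfigCommandOptions(Array(
      "--zookeeper", "localhost:2181", // placeholder connect string
      "--entity-type", "brokers",
      "--entity-name", "1",
      "--alter",
      "--add-config", s"${KafkaConfig.ThrottledReplicationRateLimitProp}=$bytesPerSec"))
    opts.checkArgs()
    // Assumes the production AdminUtils object implements the new AdminUtilities trait,
    // as the stub TestAdminUtils does in these tests.
    ConfigCommand.alterConfig(zkUtils, opts, AdminUtils)
  }
}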

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 791c4d2..38fca87 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -10,17 +10,17 @@
   * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
   * specific language governing permissions and limitations under the License.
   */
-package unit.kafka.admin
+package kafka.admin
 
-import kafka.admin.ReassignPartitionsCommand
+import kafka.common.TopicAndPartition
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.ZkUtils._
 import kafka.utils.{CoreUtils, Logging, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{After, Before, Test}
-import org.junit.Assert.assertEquals
-
+import kafka.admin.ReplicationQuotaUtils._
 import scala.collection.Seq
 
 
@@ -41,8 +41,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
   @After
   override def tearDown() {
-    servers.foreach(_.shutdown())
-    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    servers.par.foreach(_.shutdown())
+    servers.par.foreach(server => CoreUtils.delete(server.config.logDirs))
     super.tearDown()
   }
 
@@ -55,7 +55,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 
     //When we move the replica on 100 to broker 101
     ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}""")
-    waitForReasignmentToComplete()
+    waitForReassignmentToComplete()
 
     //Then the replica should be on 101
     assertEquals(zkUtils.getPartitionAssignmentForTopics(Seq(topicName)).get(topicName).get(partition), Seq(101))
@@ -75,7 +75,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     //When rebalancing
     val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
     ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
-    waitForReasignmentToComplete()
+    waitForReassignmentToComplete()
 
     //Then the replicas should span all three brokers
     val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
@@ -96,18 +96,146 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     //When rebalancing
     val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
     ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
-    waitForReasignmentToComplete()
+    waitForReassignmentToComplete()
 
     //Then replicas should only span the first two brokers
     val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
     assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(100, 101))
   }
 
-  def waitForReasignmentToComplete() {
+  @Test
+  def shouldExecuteThrottledReassignment() {
+    //Given partitions on 3 of 3 brokers
+    val brokers = Array(100, 101, 102)
+    startBrokers(brokers)
+    createTopic(zkUtils, topicName, Map(
+      0 -> Seq(100, 101)
+    ), servers = servers)
+
+    //Given throttle set so replication will take a certain number of secs
+    val initialThrottle: Long = 1000 * 1000
+    val expectedDurationSecs = 5
+    val numMessages: Int = 50
+    val msgSize: Int = 100 * 1000
+    produceMessages(servers, topicName, numMessages, acks = 0, msgSize)
+    assertEquals(expectedDurationSecs, numMessages * msgSize / initialThrottle)
+
+    //Start rebalance which will move replica on 100 -> replica on 102
+    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
+
+    val start = System.currentTimeMillis()
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
+
+    //Check throttle config. Should be throttling replica 0 on 100 and 102 only.
+    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:102")
+
+    //Await completion
+    waitForReassignmentToComplete()
+    val took = System.currentTimeMillis() - start
+
+    //Check move occurred
+    val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
+    assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(101, 102))
+
+    //Then the command should have taken at least as long as the throttled replication time
+    assertTrue(s"Expected replication to be > ${expectedDurationSecs * 0.9 * 1000} but was $took", took > expectedDurationSecs * 0.9 * 1000)
+    assertTrue(s"Expected replication to be < ${expectedDurationSecs * 2 * 1000} but was $took", took < expectedDurationSecs * 2 * 1000)
+  }
+
+
+  @Test
+  def shouldOnlyThrottleMovingReplicas() {
+    //Given 6 brokers, two topics
+    val brokers = Array(100, 101, 102, 103, 104, 105)
+    startBrokers(brokers)
+    createTopic(zkUtils, "topic1", Map(
+      0 -> Seq(100, 101),
+      1 -> Seq(100, 101),
+      2 -> Seq(103, 104) //will leave in place
+    ), servers = servers)
+
+    createTopic(zkUtils, "topic2", Map(
+      0 -> Seq(104, 105),
+      1 -> Seq(104, 105),
+      2 -> Seq(103, 104)//will leave in place
+    ), servers = servers)
+
+    //Given throttle set so replication will take a while
+    val throttle: Long = 100 * 1000
+    produceMessages(servers, "topic1", 10, acks = 0, 100 * 1000)
+    produceMessages(servers, "topic2", 10, acks = 0, 100 * 1000)
+
+    //Start rebalance
+    val newAssignment = Map(
+      TopicAndPartition("topic1", 0) -> Seq(100, 102),//moved 101=>102
+      TopicAndPartition("topic1", 1) -> Seq(100, 102),//moved 101=>102
+      TopicAndPartition("topic2", 0) -> Seq(103, 105),//moved 104=>103
+      TopicAndPartition("topic2", 1) -> Seq(103, 105),//moved 104=>103
+      TopicAndPartition("topic1", 2) -> Seq(103, 104), //didn't move
+      TopicAndPartition("topic2", 2) -> Seq(103, 104)  //didn't move
+    )
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), throttle)
+
+    //Check throttle config. Should be throttling specific replicas for each topic.
+    checkThrottleConfigAddedToZK(throttle, servers, "topic1", "1:101,1:102,0:101,0:102")
+    checkThrottleConfigAddedToZK(throttle, servers, "topic2", "1:103,1:104,0:103,0:104")
+  }
+
+  @Test
+  def shouldChangeThrottleOnRerunAndRemoveOnVerify() {
+    //Given partitions on 3 of 3 brokers
+    val brokers = Array(100, 101, 102)
+    startBrokers(brokers)
+    createTopic(zkUtils, topicName, Map(
+      0 -> Seq(100, 101)
+    ), servers = servers)
+
+    //Given throttle set so replication will take at least 20 sec (we won't wait this long)
+    val initialThrottle: Long = 1000 * 1000
+    produceMessages(servers, topicName, numMessages = 200, acks = 0, valueBytes = 100 * 1000)
+
+    //Start rebalance
+    val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(101, 102), json(topicName), true)._1
+
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
+
+    //Check throttle config
+    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:102")
+
+    //Ensure that running Verify while the reassignment is executing has no effect
+    ReassignPartitionsCommand.verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+
+    //Check throttle config again
+    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, "0:100,0:102")
+
+    //Now re-run the same assignment with a larger throttle, which should only act to increase the throttle and make progress
+    val newThrottle = initialThrottle * 1000
+
+    ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment), newThrottle)
+
+    //Check throttle was changed
+    checkThrottleConfigAddedToZK(newThrottle, servers, topicName, "0:100,0:102")
+
+    //Await completion
+    waitForReassignmentToComplete()
+
+    //Verify should remove the throttle
+    ReassignPartitionsCommand.verifyAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
+
+    //Check removed
+    checkThrottleConfigRemovedFromZK(topicName, servers)
+
+    //Check move occurred
+    val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
+    assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(101, 102))
+  }
+
+  def waitForReassignmentToComplete() {
     waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode ${ZkUtils.ReassignPartitionsPath} wasn't deleted")
   }
 
-  def json(topic: String): String = {
-    s"""{"topics": [{"topic": "$topic"}],"version":1}"""
+  def json(topic: String*): String = {
+    val topicStr = topic.map { t => "{\"topic\": \"" + t + "\"}" }.mkString(",")
+    s"""{"topics": [$topicStr],"version":1}"""
   }
 }
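
As a side note on the timing assertions in shouldExecuteThrottledReassignment above, the expected duration follows directly from the test's own constants; nothing below is new API, just the arithmetic the bounds rely on.

object ThrottleTimingSanityCheck extends App {
  // 50 messages of 100,000 bytes each = 5,000,000 bytes to move.
  // At an initialThrottle of 1,000,000 bytes/sec the reassignment cannot finish in
  // much under 5 seconds, hence the assertions that `took` falls between
  // expectedDurationSecs * 0.9 * 1000 and expectedDurationSecs * 2 * 1000 ms.
  val numMessages = 50
  val msgSize = 100 * 1000
  val initialThrottle = 1000 * 1000L
  val expectedDurationSecs = numMessages * msgSize / initialThrottle
  println(expectedDurationSecs) // prints 5
}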

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 0f71a19..91d5e20 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -16,9 +16,11 @@
  */
 package kafka.admin
 
+import kafka.common.TopicAndPartition
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.Test
+import org.junit.Assert.assertEquals
 
 class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
@@ -48,4 +50,90 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging wi
     checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
   }
 
+  @Test
+  def shouldFindMovingReplicas() {
+    val assigner = new ReassignPartitionsCommand(null, null)
+
+    //Given partition 0 moves from broker 100 -> 102
+    val existing = Map(TopicAndPartition("topic1",0) -> Seq(100, 101))
+    val proposed = Map(TopicAndPartition("topic1",0) -> Seq(101, 102))
+
+    //When
+    val moves = assigner.replicaMoves(existing, proposed)
+
+    //Then moving replicas should be throttled
+    assertEquals("0:100,0:102", moves.get("topic1").get)
+  }
+
+  @Test
+  def shouldFindMovingReplicasMultiplePartitions() {
+    val assigner = new ReassignPartitionsCommand(null, null)
+
+    //Given partitions 0 & 1 moves from broker 100 -> 102
+    val existing = Map(TopicAndPartition("topic1",0) -> Seq(100, 101), TopicAndPartition("topic1",1) -> Seq(100, 101))
+    val proposed = Map(TopicAndPartition("topic1",0) -> Seq(101, 102), TopicAndPartition("topic1",1) -> Seq(101, 102))
+
+    //When
+    val moves = assigner.replicaMoves(existing, proposed)
+
+    //Then moving replicas should be throttled
+    assertEquals("0:100,0:102,1:100,1:102", moves.get("topic1").get)
+  }
+
+  @Test
+  def shouldFindMovingReplicasMultipleTopics() {
+    val assigner = new ReassignPartitionsCommand(null, null)
+
+    //Given partition 0 on topics 1 & 2 move from broker 100 -> 102
+    val existing = Map(TopicAndPartition("topic1",0) -> Seq(100, 101), TopicAndPartition("topic2",0) -> Seq(100, 101))
+    val proposed = Map(TopicAndPartition("topic1",0) -> Seq(101, 102), TopicAndPartition("topic2",0) -> Seq(101, 102))
+
+    //When
+    val moves = assigner.replicaMoves(existing, proposed)
+
+    //Then moving replicas should be throttled
+    assertEquals("0:100,0:102", moves.get("topic1").get)
+    assertEquals("0:100,0:102", moves.get("topic2").get)
+  }
+
+  @Test
+  def shouldFindMovingReplicasMultipleTopicsAndPartitions() {
+    val assigner = new ReassignPartitionsCommand(null, null)
+
+    //Given
+    val existing = Map(
+      TopicAndPartition("topic1",0) -> Seq(100, 101),
+      TopicAndPartition("topic1",1) -> Seq(100, 101),
+      TopicAndPartition("topic2",0) -> Seq(100, 101),
+      TopicAndPartition("topic2",1) -> Seq(100, 101)
+    )
+    val proposed = Map(
+      TopicAndPartition("topic1",0) -> Seq(101, 102),
+      TopicAndPartition("topic1",1) -> Seq(101, 102),
+      TopicAndPartition("topic2",0) -> Seq(101, 102),
+      TopicAndPartition("topic2",1) -> Seq(101, 102)
+    )
+
+    //When
+    val moves = assigner.replicaMoves(existing, proposed)
+
+    //Then moving replicas should be throttled
+    assertEquals("0:100,0:102,1:100,1:102", moves.get("topic1").get)
+    assertEquals("0:100,0:102,1:100,1:102", moves.get("topic2").get)
+  }
+
+  @Test
+  def shouldFindTwoMovingReplicasInSamePartition() {
+    val assigner = new ReassignPartitionsCommand(null, null)
+
+    //Given partition 0 has 2 moves from broker 102 -> 104 & 103 -> 105
+    val existing = Map(TopicAndPartition("topic1",0) -> Seq(100, 101, 102, 103))
+    val proposed = Map(TopicAndPartition("topic1",0) -> Seq(100, 101, 104, 105))
+
+    //When
+    val moves = assigner.replicaMoves(existing, proposed)
+
+    //Then moving replicas should be throttled
+    assertEquals( "0:102,0:103,0:104,0:105", moves.get("topic1").get)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
new file mode 100644
index 0000000..c6a42e2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
@@ -0,0 +1,51 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.admin
+
+import kafka.log.LogConfig
+import kafka.server.{KafkaConfig, ConfigType, KafkaServer}
+import kafka.utils.TestUtils
+
+import scala.collection.Seq
+
+object ReplicationQuotaUtils {
+
+  def checkThrottleConfigRemovedFromZK(topic: String, servers: Seq[KafkaServer]): Boolean = {
+    TestUtils.waitUntilTrue(() => {
+      val brokerReset = servers.forall { server =>
+        val brokerConfig = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker, server.config.brokerId.toString)
+        !brokerConfig.containsKey(KafkaConfig.ThrottledReplicationRateLimitProp)
+      }
+      val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic, topic)
+      val topicReset = !topicConfig.containsKey(LogConfig.ThrottledReplicasListProp)
+      brokerReset && topicReset
+    }, "Throttle limit/replicas was not unset")
+  }
+
+  def checkThrottleConfigAddedToZK(expectedThrottleRate: Long, servers: Seq[KafkaServer], topic: String, throttledReplicas: String): Boolean = {
+    TestUtils.waitUntilTrue(() => {
+      //Check for limit in ZK
+      val brokerConfigAvailable = servers.forall { server =>
+        val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker, server.config.brokerId.toString)
+        val zkThrottleRate = configInZk.getProperty(KafkaConfig.ThrottledReplicationRateLimitProp)
+        zkThrottleRate != null && expectedThrottleRate == zkThrottleRate.toLong
+      }
+      //Check replicas assigned
+      val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, ConfigType.Topic, topic)
+      val property: String = topicConfig.getProperty(LogConfig.ThrottledReplicasListProp)
+      val topicConfigAvailable = property == throttledReplicas
+      brokerConfigAvailable && topicConfigAvailable
+    }, "throttle limit/replicas was not set")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala b/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
new file mode 100644
index 0000000..c7e101f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
@@ -0,0 +1,27 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.admin
+
+import java.util.Properties
+import kafka.utils.ZkUtils
+
+class TestAdminUtils extends AdminUtilities {
+  override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties): Unit = {}
+  override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {new Properties}
+  override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties): Unit = {}
+  override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 3dd0454..881837f 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -90,5 +90,4 @@ class FetcherTest extends KafkaServerTestHarness {
         return
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 71e40da..3f2e3ed 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -19,8 +19,7 @@ package kafka.log
 
 import java.util.Properties
 
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
+import kafka.server.{ThrottledReplicaValidator, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigException
 import org.junit.{Assert, Test}
@@ -65,6 +64,34 @@ class LogConfigTest {
     })
   }
 
+  @Test
+  def shouldValidateThrottledReplicasConfig() {
+    assertTrue(isValid("*"))
+    assertTrue(isValid("* "))
+    assertTrue(isValid(""))
+    assertTrue(isValid(" "))
+    assertTrue(isValid("100:10"))
+    assertTrue(isValid("100:10,12:10"))
+    assertTrue(isValid("100:10,12:10,15:1"))
+    assertTrue(isValid("100:10,12:10,15:1  "))
+
+    assertFalse(isValid("100"))
+    assertFalse(isValid("100:"))
+    assertFalse(isValid("100:0,"))
+    assertFalse(isValid("100:0,10"))
+    assertFalse(isValid("100:0,10:"))
+    assertFalse(isValid("100:0,10:   "))
+  }
+
+  private def isValid(configValue: String): Boolean = {
+    try {
+      ThrottledReplicaValidator.ensureValid("", configValue)
+      true
+    } catch {
+      case _: ConfigException => false
+    }
+  }
+
   private def assertPropertyInvalid(name: String, values: AnyRef*) {
     values.foreach((value) => {
       val props = new Properties

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index b95f2bf..9edd3c0 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -28,6 +28,7 @@ import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.collection.Map
 
 class AbstractFetcherThreadTest {
 

