kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/3] kafka git commit: KAFKA-1464; Add a throttling option to the Kafka replication
Date Fri, 16 Sep 2016 05:28:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d43666102 -> 143a33bc5


http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 69e83c0..b14464f 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -40,7 +40,7 @@ class ClientQuotaManagerTest {
 
   @Test
   def testQuotaParsing() {
-    val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time)
+    val clientMetrics = new ClientQuotaManager(config, newMetrics, QuotaType.Produce, time)
 
     // Case 1: Update the quota. Assert that the new quota value is returned
     clientMetrics.updateQuota("p1", new Quota(2000, true))
@@ -77,8 +77,8 @@ class ClientQuotaManagerTest {
   @Test
   def testQuotaViolation() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
-    val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "producer",
""))
+    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
+    val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce",
""))
     try {
       /* We have 10 second windows. Make sure that there is no quota violation
        * if we produce under the quota
@@ -125,16 +125,16 @@ class ClientQuotaManagerTest {
   @Test
   def testExpireThrottleTimeSensor() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
+    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
     try {
       clientMetrics.recordAndMaybeThrottle("client1", 100, callback)
       // remove the throttle time sensor
-      metrics.removeSensor("producerThrottleTime-client1")
+      metrics.removeSensor("ProduceThrottleTime-client1")
       // should not throw an exception even if the throttle time sensor does not exist.
       val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback)
       assertTrue("Should be throttled", throttleTime > 0)
       // the sensor should get recreated
-      val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1")
+      val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-client1")
       assertTrue("Throttle time sensor should exist", throttleTimeSensor != null)
     } finally {
       clientMetrics.shutdown()
@@ -144,21 +144,21 @@ class ClientQuotaManagerTest {
   @Test
   def testExpireQuotaSensors() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
+    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
     try {
       clientMetrics.recordAndMaybeThrottle("client1", 100, callback)
       // remove all the sensors
-      metrics.removeSensor("producerThrottleTime-client1")
-      metrics.removeSensor("producer-client1")
+      metrics.removeSensor("ProduceThrottleTime-client1")
+      metrics.removeSensor("Produce-client1")
       // should not throw an exception
       val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback)
       assertTrue("Should be throttled", throttleTime > 0)
 
       // all the sensors should get recreated
-      val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1")
+      val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-client1")
       assertTrue("Throttle time sensor should exist", throttleTimeSensor != null)
 
-      val byteRateSensor = metrics.getSensor("producer-client1")
+      val byteRateSensor = metrics.getSensor("Produce-client1")
       assertTrue("Byte rate sensor should exist", byteRateSensor != null)
     } finally {
       clientMetrics.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index a9df929..bf11332 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -1,32 +1,32 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 package kafka.server
 
 import java.util.Properties
 
-import org.apache.kafka.common.protocol.ApiKeys
+import kafka.log.LogConfig._
+import kafka.server.Constants._
 import org.junit.Assert._
 import org.apache.kafka.common.metrics.Quota
-import org.easymock.{Capture, EasyMock}
+import org.easymock.EasyMock
 import org.junit.Test
 import kafka.integration.KafkaServerTestHarness
 import kafka.utils._
 import kafka.common._
-import kafka.log.LogConfig
 import kafka.admin.{AdminOperationException, AdminUtils}
 
 import scala.collection.Map
@@ -37,19 +37,19 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
   @Test
   def testConfigChange() {
     assertTrue("Should contain a ConfigHandler for topics",
-               this.servers.head.dynamicConfigHandlers.contains(ConfigType.Topic))
+      this.servers.head.dynamicConfigHandlers.contains(ConfigType.Topic))
     val oldVal: java.lang.Long = 100000L
     val newVal: java.lang.Long = 200000L
     val tp = TopicAndPartition("test", 0)
     val logProps = new Properties()
-    logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
+    logProps.put(FlushMessagesProp, oldVal.toString)
     AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)
     TestUtils.retry(10000) {
       val logOpt = this.servers.head.logManager.getLog(tp)
       assertTrue(logOpt.isDefined)
       assertEquals(oldVal, logOpt.get.config.flushInterval)
     }
-    logProps.put(LogConfig.FlushMessagesProp, newVal.toString)
+    logProps.put(FlushMessagesProp, newVal.toString)
     AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps)
     TestUtils.retry(10000) {
       assertEquals(newVal, this.servers.head.logManager.getLog(tp).get.config.flushInterval)
@@ -59,21 +59,21 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
   @Test
   def testClientQuotaConfigChange() {
     assertTrue("Should contain a ConfigHandler for topics",
-               this.servers.head.dynamicConfigHandlers.contains(ConfigType.Client))
+      this.servers.head.dynamicConfigHandlers.contains(ConfigType.Client))
     val clientId = "testClient"
     val props = new Properties()
     props.put(ClientConfigOverride.ProducerOverride, "1000")
     props.put(ClientConfigOverride.ConsumerOverride, "2000")
     AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
-    val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
+    val quotaManagers = servers.head.apis.quotas
 
     TestUtils.retry(10000) {
-      val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
-      val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
+      val overrideProducerQuota = quotaManagers.produce.quota(clientId)
+      val overrideConsumerQuota = quotaManagers.fetch.quota(clientId)
 
       assertEquals(s"ClientId $clientId must have overridden producer quota of 1000",
         Quota.upperBound(1000), overrideProducerQuota)
-        assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000",
+      assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000",
         Quota.upperBound(2000), overrideConsumerQuota)
     }
 
@@ -84,8 +84,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     AdminUtils.changeClientIdConfig(zkUtils, clientId, new Properties())
 
     TestUtils.retry(10000) {
-      val producerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
-      val consumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
+      val producerQuota = quotaManagers.produce.quota(clientId)
+      val consumerQuota = quotaManagers.fetch.quota(clientId)
 
       assertEquals(s"ClientId $clientId must have reset producer quota to " + defaultProducerQuota,
         Quota.upperBound(defaultProducerQuota), producerQuota)
@@ -99,7 +99,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     val topic = TestUtils.tempTopic
     try {
       val logProps = new Properties()
-      logProps.put(LogConfig.FlushMessagesProp, 10000: java.lang.Integer)
+      logProps.put(FlushMessagesProp, 10000: java.lang.Integer)
       AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
       fail("Should fail with AdminOperationException for topic doesn't exist")
     } catch {
@@ -162,4 +162,47 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Verify that processConfigChanges was only called once
     EasyMock.verify(handler)
   }
-}
+
+  @Test
+  def shouldParseReplicationQuotaProperties {
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+    val props: Properties = new Properties()
+
+    //Given
+    props.put(ThrottledReplicasListProp, "0:101,0:102,1:101,1:102")
+
+    //When/Then
+    assertEquals(Seq(0,1), configHandler.parseThrottledPartitions(props, 102))
+    assertEquals(Seq(), configHandler.parseThrottledPartitions(props, 103))
+  }
+
+  @Test
+  def shouldParseWildcardReplicationQuotaProperties {
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+    val props: Properties = new Properties()
+
+    //Given
+    props.put(ThrottledReplicasListProp, "*")
+
+    //When
+    val result = configHandler.parseThrottledPartitions(props, 102)
+
+    //Then
+    assertEquals(AllReplicas, result)
+  }
+
+  @Test
+  def shouldParseReplicationQuotaReset {
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+    val props: Properties = new Properties()
+
+    //Given
+    props.put(ThrottledReplicasListProp, "")
+
+    //When
+    val result = configHandler.parseThrottledPartitions(props, 102)
+
+    //Then
+    assertEquals(Seq(), result)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index f5b515b..e7e1554 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -18,14 +18,15 @@ package kafka.server
 
 import kafka.log._
 import java.io.File
+import kafka.utils.SystemTime
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.{Utils, MockTime => JMockTime}
+import org.apache.kafka.common.utils.{MockTime => JMockTime, Utils}
 import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._
 import kafka.common._
 import kafka.cluster.Replica
-import kafka.utils.{KafkaScheduler, MockTime, SystemTime, TestUtils, ZkUtils}
+import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
 import java.util.concurrent.atomic.AtomicBoolean
 
 class HighwatermarkPersistenceTest {
@@ -56,7 +57,7 @@ class HighwatermarkPersistenceTest {
     val metrics = new Metrics
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime, new JMockTime,
zkUtils, scheduler,
-      logManagers.head, new AtomicBoolean(false))
+      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head,
metrics, SystemTime).follower)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -99,7 +100,7 @@ class HighwatermarkPersistenceTest {
     val metrics = new Metrics
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime(), new JMockTime,
zkUtils,
-      scheduler, logManagers.head, new AtomicBoolean(false))
+      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head,
metrics, SystemTime).follower)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index c34e4f0..540a665 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -52,7 +52,7 @@ class IsrExpirationTest {
 
   @Before
   def setUp() {
-    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null,
new AtomicBoolean(false))
+    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null,
new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
new file mode 100644
index 0000000..69365aa
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -0,0 +1,154 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.api._
+import kafka.cluster.Replica
+import kafka.common.TopicAndPartition
+import kafka.log.Log
+import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.utils._
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.{MockTime => JMockTime}
+import org.easymock.EasyMock
+import org.easymock.EasyMock._
+import org.junit.Assert._
+import org.junit.{After, Test}
+
+
+class ReplicaManagerQuotasTest {
+  val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_,
new Properties()))
+  val time = new MockTime
+  val jTime = new JMockTime
+  val metrics = new Metrics
+  val message = new Message("some-data-in-a-message".getBytes())
+  val topicAndPartition1 = TopicAndPartition("test-topic", 1)
+  val topicAndPartition2 = TopicAndPartition("test-topic", 2)
+  val fetchInfo = Map(topicAndPartition1 -> PartitionFetchInfo(0, 100), topicAndPartition2
-> PartitionFetchInfo(0, 100))
+  var replicaManager: ReplicaManager = null
+
+  @Test
+  def shouldExcludeSubsequentThrottledPartitions(): Unit = {
+    setUpMocks(fetchInfo)
+
+    val quota = mockQuota(1000000)
+    expect(quota.isQuotaExceeded()).andReturn(false).once()
+    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    replay(quota)
+
+    val fetch = replicaManager.readFromLocalLog(true, true, fetchInfo, quota)
+    assertEquals("Given two partitions, with only one throttled, we should get the first",
1,
+      fetch.get(topicAndPartition1).get.info.messageSet.size)
+
+    assertEquals("But we shouldn't get the second", 0,
+      fetch.get(topicAndPartition2).get.info.messageSet.size)
+  }
+
+  @Test
+  def shouldGetNoMessagesIfQuotasExceededOnSubsequentPartitions(): Unit = {
+    setUpMocks(fetchInfo)
+
+    val quota = mockQuota(1000000)
+    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    expect(quota.isQuotaExceeded()).andReturn(true).once()
+    replay(quota)
+
+    val fetch = replicaManager.readFromLocalLog(true, true, fetchInfo, quota)
+    assertEquals("Given two partitions, with both throttled, we should get no messages",
0,
+      fetch.get(topicAndPartition1).get.info.messageSet.size)
+    assertEquals("Given two partitions, with both throttled, we should get no messages",
0,
+      fetch.get(topicAndPartition2).get.info.messageSet.size)
+  }
+
+  @Test
+  def shouldGetBothMessagesIfQuotasAllow(): Unit = {
+    setUpMocks(fetchInfo)
+
+    val quota = mockQuota(1000000)
+    expect(quota.isQuotaExceeded()).andReturn(false).once()
+    expect(quota.isQuotaExceeded()).andReturn(false).once()
+    replay(quota)
+
+    val fetch = replicaManager.readFromLocalLog(true, true, fetchInfo, quota)
+    assertEquals("Given two partitions, with both non-throttled, we should get both messages",
1,
+      fetch.get(topicAndPartition1).get.info.messageSet.size)
+    assertEquals("Given two partitions, with both non-throttled, we should get both messages",
1,
+      fetch.get(topicAndPartition2).get.info.messageSet.size)
+  }
+
+  def setUpMocks(fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], message: Message
= this.message) {
+    val zkUtils = createNiceMock(classOf[ZkUtils])
+    val scheduler = createNiceMock(classOf[KafkaScheduler])
+
+    //Create log which handles both a regular read and a 0 bytes read
+    val log = createMock(classOf[Log])
+    expect(log.logEndOffset).andReturn(20L).anyTimes()
+    expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
+
+    //if we ask for len 1 return a message
+    expect(log.read(anyObject(), geq(1), anyObject())).andReturn(
+      new FetchDataInfo(
+        new LogOffsetMetadata(0L, 0L, 0),
+        new ByteBufferMessageSet(message)
+      )).anyTimes()
+
+    //if we ask for len = 0, return 0 messages
+    expect(log.read(anyObject(), EasyMock.eq(0), anyObject())).andReturn(
+      new FetchDataInfo(
+        new LogOffsetMetadata(0L, 0L, 0),
+        new ByteBufferMessageSet()
+      )).anyTimes()
+    replay(log)
+
+    //Create log manager
+    val logManager = createMock(classOf[kafka.log.LogManager])
+
+    //Return the same log for each partition as it doesn't matter
+    expect(logManager.getLog(anyObject())).andReturn(Some(log)).anyTimes()
+    replay(logManager)
+
+    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkUtils, scheduler,
logManager,
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+
+    //create the two replicas
+    for (p <- fetchInfo.keySet) {
+      val partition = replicaManager.getOrCreatePartition(p.topic, p.partition)
+      val replica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
+      replica.highWatermark = new LogOffsetMetadata(5)
+      partition.leaderReplicaIdOpt = Some(replica.brokerId)
+      val allReplicas = List(replica)
+      allReplicas.foreach(partition.addReplicaIfNotExists(_))
+      partition.inSyncReplicas = allReplicas.toSet
+    }
+  }
+
+  @After
+  def tearDown() {
+    replicaManager.shutdown(false)
+    metrics.close()
+  }
+
+  def mockQuota(bound: Long): ReplicaQuota = {
+    val quota = createMock(classOf[ReplicaQuota])
+    expect(quota.isThrottled(anyObject())).andReturn(true).anyTimes()
+    quota
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index bfb66b9..47e5461 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -66,7 +66,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time),
mockLogMgr,
-      new AtomicBoolean(false))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
     try {
       val partition = rm.getOrCreatePartition(topic, 1)
       partition.getOrCreateReplica(1)
@@ -84,7 +84,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time),
mockLogMgr,
-      new AtomicBoolean(false))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
     try {
       val partition = rm.getOrCreatePartition(topic, 1)
       partition.getOrCreateReplica(1)
@@ -101,7 +101,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time),
mockLogMgr,
-      new AtomicBoolean(false), Option(this.getClass.getName))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower,
Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
         assert(responseStatus.values.head.errorCode == Errors.INVALID_REQUIRED_ACKS.code)
@@ -126,7 +126,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time),
mockLogMgr,
-      new AtomicBoolean(false))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
 
     try {
       var produceCallbackFired = false
@@ -195,7 +195,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time),
mockLogMgr,
-      new AtomicBoolean(false), Option(this.getClass.getName))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower,
Option(this.getClass.getName))
     try {
       val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1), new Broker(1,
"host2", 2))
       val metadataCache = EasyMock.createMock(classOf[MetadataCache])

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
new file mode 100644
index 0000000..5c41372
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -0,0 +1,123 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package unit.kafka.server
+
+import java.util.Collections
+
+import kafka.common.TopicAndPartition
+import kafka.server.QuotaType._
+import kafka.server.{QuotaType, ReplicationQuotaManager, ReplicationQuotaManagerConfig}
+import org.apache.kafka.common.metrics.{Quota, MetricConfig, Metrics}
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Assert.{assertFalse, assertTrue, assertEquals}
+import org.junit.Test
+import scala.collection.JavaConverters._
+
+class ReplicationQuotaManagerTest {
+  private val time = new MockTime
+
+  @Test
+  def shouldThrottleOnlyDefinedReplicas() {
+    val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), newMetrics,
QuotaType.Fetch, time)
+    quota.markThrottled("topic1", Seq(1, 2, 3))
+
+    assertTrue(quota.isThrottled(tp1(1)))
+    assertTrue(quota.isThrottled(tp1(2)))
+    assertTrue(quota.isThrottled(tp1(3)))
+    assertFalse(quota.isThrottled(tp1(4)))
+  }
+
+  @Test
+  def shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses(): Unit = {
+    val metrics = newMetrics()
+    val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(numQuotaSamples
= 10, quotaWindowSizeSeconds = 1), metrics, LeaderReplication, time)
+
+    //Given
+    quota.updateQuota(new Quota(100, true))
+
+    //Quota should not be broken when we start
+    assertFalse(quota.isQuotaExceeded())
+
+    //First window is fixed, so we'll skip it
+    time.sleep(1000)
+
+    //When we record up to the quota value after half a window
+    time.sleep(500)
+    quota.record(1)
+
+    //Then it should not break the quota
+    assertFalse(quota.isQuotaExceeded())
+
+    //When we record half the quota (half way through the window), we still should not break
+    quota.record(149) //150B, 1.5s
+    assertFalse(quota.isQuotaExceeded())
+
+    //Add a byte to push over quota
+    quota.record(1) //151B, 1.5s
+
+    //Then it should break the quota
+    assertEquals(151 / 1.5, rate(metrics), 0) //151B, 1.5s
+    assertTrue(quota.isQuotaExceeded())
+
+    //When we sleep for the remaining half the window
+    time.sleep(500) //151B, 2s
+
+    //Then our rate should have halved (i.e. back down below the quota)
+    assertFalse(quota.isQuotaExceeded())
+    assertEquals(151d / 2, rate(metrics), 0.1) //151B, 2s
+
+    //When we sleep for another half a window (now half way through second window)
+    time.sleep(500)
+    quota.record(99) //250B, 2.5s
+
+    //Then the rate should be exceeded again
+    assertEquals(250 / 2.5, rate(metrics), 0) //250B, 2.5s
+    assertFalse(quota.isQuotaExceeded())
+    quota.record(1)
+    assertTrue(quota.isQuotaExceeded())
+    assertEquals(251 / 2.5, rate(metrics), 0)
+
+    //Sleep for 2 more windows
+    time.sleep(2 * 1000) //so now at 3.5s
+    assertFalse(quota.isQuotaExceeded())
+    assertEquals(251d / 4.5, rate(metrics), 0)
+  }
+
+  def rate(metrics: Metrics): Double = {
+    val metricName = metrics.metricName("byte-rate", LeaderReplication.toString, "Tracking
byte-rate for " + LeaderReplication)
+    val leaderThrottledRate = metrics.metrics.asScala(metricName).value()
+    leaderThrottledRate
+  }
+
+  @Test
+  def shouldSupportWildcardThrottledReplicas(): Unit = {
+    val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), newMetrics,
LeaderReplication, time)
+
+    //When
+    quota.markThrottled("MyTopic")
+
+    //Then
+    assertTrue(quota.isThrottled(TopicAndPartition("MyTopic", 0)))
+    assertFalse(quota.isThrottled(TopicAndPartition("MyOtherTopic", 0)))
+  }
+
+  private def tp1(id: Int): TopicAndPartition = new TopicAndPartition("topic1", id)
+
+  private def newMetrics(): Metrics = {
+    new Metrics(new MetricConfig(), Collections.emptyList(), time)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
new file mode 100644
index 0000000..af7c4c8
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -0,0 +1,242 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package unit.kafka.server
+
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.admin.AdminUtils._
+import kafka.common._
+import kafka.log.LogConfig._
+import kafka.server.KafkaConfig.fromProps
+import kafka.server.QuotaType._
+import kafka.server._
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import scala.collection.JavaConverters._
+
+/**
+  * This is the main test which ensures Replication Quotas work correctly.
+  *
+  * The test will fail if the quota is < 1MB/s as 1MB is the default for replica.fetch.max.bytes.
+  * So with a throttle of 100KB/s, 1 fetch of 1 partition would fill 10s of quota. In turn
causing
+  * the throttled broker to pause for > 10s
+  *
+  * Anything over 100MB/s tends to fail as this is the non-throttled replication rate
+  */
+
+class ReplicationQuotasTest extends ZooKeeperTestHarness {
+  def percentError(percent: Int, value: Long): Long = Math.round(value * percent / 100)
+
+  val msg100KB = new Array[Byte](100000)
+  var brokers: Seq[KafkaServer] = null
+  val topic = "topic1"
+  var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
+
+  @Before
+  override def setUp() {
+    super.setUp()
+  }
+
+  @After
+  override def tearDown() {
+    brokers.par.foreach(_.shutdown())
+    producer.close()
+    super.tearDown()
+  }
+
+  @Test
+  def shouldBootstrapTwoBrokersWithLeaderThrottle(): Unit = {
+    shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(true)
+  }
+
+  @Test
+  def shouldBootstrapTwoBrokersWithFollowerThrottle(): Unit = {
+    shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(false)
+  }
+
+  def shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(leaderThrottle: Boolean): Unit
= {
+    /**
+      * In short we have 8 brokers, 2 are not-started. We assign replicas for the two non-started
+      * brokers, so when we start them we can monitor replication from the 6 to the 2.
+      *
+      * We also have two non-throttled partitions on two of the 6 brokers, just to make sure
+      * regular replication works as expected.
+      */
+
+    brokers = (100 to 105).map { id => TestUtils.createServer(fromProps(createBrokerConfig(id,
zkConnect))) }
+
+    //Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on nodes 6,7 (not
started yet)
+    //And two extra partitions 6,7, which we don't intend on throttling
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+      0 -> Seq(100, 106), //Throttled
+      1 -> Seq(101, 106), //Throttled
+      2 -> Seq(102, 106), //Throttled
+      3 -> Seq(103, 107), //Throttled
+      4 -> Seq(104, 107), //Throttled
+      5 -> Seq(105, 107), //Throttled
+      6 -> Seq(100, 106), //Not Throttled
+      7 -> Seq(101, 107) //Not Throttled
+    ))
+
+    val msg = msg100KB
+    val msgCount: Int = 1000
+    val expectedDuration = 10 //Keep the test to N seconds
+    var throttle: Long = msgCount * msg.length / expectedDuration
+    if (!leaderThrottle) throttle = throttle * 3 //Follower throttle needs to replicate 3x
as fast to get the same duration as there are three replicas to replicate for each of the
two follower brokers
+
+    //Set the throttle limit on all 8 brokers, but only assign throttled replicas to the
six leaders, or two followers
+    (100 to 107).foreach { brokerId =>
+      changeBrokerConfig(zkUtils, Seq(brokerId), property(KafkaConfig.ThrottledReplicationRateLimitProp,
throttle.toString))
+    }
+    if (leaderThrottle)
+      changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, "0:100,1:101,2:102,3:103,4:104,5:105"))
//partition-broker:... throttle the 6 leaders
+    else
+      changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
//partition-broker:... throttle the two followers
+
+    //Add data equally to each partition
+    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers),
retries = 5, acks = 0)
+    (0 until msgCount).foreach { x =>
+      (0 to 7).foreach { partition =>
+        producer.send(new ProducerRecord(topic, partition, null, msg)).get
+      }
+    }
+
+    //Ensure data is fully written: broker 1 has partition 1, broker 2 has partition 2 etc
+    (0 to 5).foreach { id => waitForOffsetsToMatch(msgCount, id, 100 + id) }
+    //Check the non-throttled partitions too
+    waitForOffsetsToMatch(msgCount, 6, 100)
+    waitForOffsetsToMatch(msgCount, 7, 101)
+
+    val start = System.currentTimeMillis()
+
+    //When we create the 2 new, empty brokers
+    brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(106, zkConnect)))
+    brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(107, zkConnect)))
+
+    //Check that throttled config correctly migrated to the new brokers
+    (106 to 107).foreach { brokerId =>
+      assertEquals(throttle, brokerFor(brokerId).quotaManagers.follower.upperBound())
+    }
+    if (!leaderThrottle) {
+      (0 to 2).foreach { partition =>
+        assertTrue(brokerFor(106).quotaManagers.follower.isThrottled(new TopicAndPartition(topic,
partition)))
+      }
+      (3 to 5).foreach { partition =>
+        assertTrue(brokerFor(107).quotaManagers.follower.isThrottled(new TopicAndPartition(topic,
partition)))
+      }
+    }
+
+    //Wait for non-throttled partitions to replicate first
+    (6 to 7).foreach { id => waitForOffsetsToMatch(msgCount, id, 100 + id) }
+    val unthrottledTook = System.currentTimeMillis() - start
+
+    //Wait for replicas 0,1,2,3,4,5 to fully replicated to broker 106,107
+    (0 to 2).foreach { id => waitForOffsetsToMatch(msgCount, id, 106) }
+    (3 to 5).foreach { id => waitForOffsetsToMatch(msgCount, id, 107) }
+
+    val throttledTook = System.currentTimeMillis() - start
+
+    //Check the recorded throttled rate is what we expect
+    if (leaderThrottle) {
+      (100 to 105).map(brokerFor(_)).foreach { broker =>
+        val metricName = broker.metrics.metricName("byte-rate", LeaderReplication.toString,
"Tracking byte-rate for" + LeaderReplication)
+        val measuredRate = broker.metrics.metrics.asScala(metricName).value()
+        info(s"Broker:${broker.config.brokerId} Expected:$throttle, Recorded Rate was:$measuredRate")
+        assertEquals(throttle, measuredRate, percentError(25, throttle))
+      }
+    } else {
+      (106 to 107).map(brokerFor(_)).foreach { broker =>
+        val metricName = broker.metrics.metricName("byte-rate", FollowerReplication.toString,
"Tracking byte-rate for" + FollowerReplication)
+        val measuredRate = broker.metrics.metrics.asScala(metricName).value()
+        info(s"Broker:${broker.config.brokerId} Expected:$throttle, Recorded Rate was:$measuredRate")
+        assertEquals(throttle, measuredRate, percentError(25, throttle))
+      }
+    }
+
+    //Check the times for throttled/unthrottled are each side of what we expect
+    info(s"Unthrottled took: $unthrottledTook, Throttled took: $throttledTook, for expected
$expectedDuration secs")
+    assertTrue(s"Unthrottled replication of ${unthrottledTook}ms should be < ${expectedDuration
* 1000}ms",
+      unthrottledTook < expectedDuration * 1000)
+    assertTrue((s"Throttled replication of ${throttledTook}ms should be > ${expectedDuration
* 1000}ms"),
+      throttledTook > expectedDuration * 1000)
+    assertTrue((s"Throttled replication of ${throttledTook}ms should be < ${expectedDuration
* 1500}ms"),
+      throttledTook < expectedDuration * 1000 * 1.5)
+  }
+
+  @Test
+  def shouldThrottleOldSegments(): Unit = {
+    /**
+      * Simple test which ensures throttled replication works when the dataset spans many
segments
+      */
+
+    //2 brokers with 1MB Segment Size & 1 partition
+    val config: Properties = createBrokerConfig(100, zkConnect)
+    config.put("log.segment.bytes", (1024 * 1024).toString)
+    brokers = Seq(TestUtils.createServer(fromProps(config)))
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 ->
Seq(100, 101)))
+
+    //Write 20MBs and throttle at 5MB/s
+    val msg = msg100KB
+    val msgCount: Int = 200
+    val expectedDuration = 4
+    val throttle: Long = msg.length * msgCount / expectedDuration
+
+    //Set the throttle limit leader
+    changeBrokerConfig(zkUtils, Seq(100), property(KafkaConfig.ThrottledReplicationRateLimitProp,
throttle.toString))
+    changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, "0:100"))
+
+    //Add data
+    addData(msgCount, msg)
+
+    val start = System.currentTimeMillis()
+
+    //Start the new broker (and hence start replicating)
+    brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(101, zkConnect)))
+    waitForOffsetsToMatch(msgCount, 0, 101)
+
+    val throttledTook = System.currentTimeMillis() - start
+
+    assertTrue((s"Throttled replication of ${throttledTook}ms should be > ${expectedDuration
* 1000 * 0.9}ms"),
+      throttledTook > expectedDuration * 1000 * 0.9)
+    assertTrue((s"Throttled replication of ${throttledTook}ms should be < ${expectedDuration
* 1500}ms"),
+      throttledTook < expectedDuration * 1000 * 1.5)
+  }
+
+  def addData(msgCount: Int, msg: Array[Byte]): Boolean = {
+    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers),
retries = 5, acks = 0)
+    (0 until msgCount).foreach { x => producer.send(new ProducerRecord(topic, msg)).get
}
+    waitForOffsetsToMatch(msgCount, 0, 100)
+  }
+
+  private def waitForOffsetsToMatch(offset: Int, partitionId: Int, brokerId: Int): Boolean
= {
+    waitUntilTrue(() => {
+      offset == brokerFor(brokerId).getLogManager.getLog(TopicAndPartition(topic, partitionId)).map(_.logEndOffset).getOrElse(0)
+    }, s"Offsets did not match for partition $partitionId on broker $brokerId", 60000)
+  }
+
+  private def property(key: String, value: String) = {
+    new Properties() { put(key, value) }
+  }
+
+  private def brokerFor(id: Int): KafkaServer = brokers.filter(_.config.brokerId == id)(0)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 7741698..1052be5 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -16,23 +16,23 @@
  */
 package kafka.server
 
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.{Collections, Properties}
+
 import kafka.api._
-import kafka.utils._
 import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
-import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
+import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
+import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.utils._
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
-import org.junit.{Test, After, Before}
-
-import java.util.{Properties, Collections}
-import java.util.concurrent.atomic.AtomicBoolean
-import collection.JavaConversions._
-
 import org.easymock.EasyMock
-import org.I0Itec.zkclient.ZkClient
 import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConversions._
 
 class SimpleFetchTest {
 
@@ -99,7 +99,7 @@ class SimpleFetchTest {
 
     // create the replica manager
     replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkUtils, scheduler,
logManager,
-      new AtomicBoolean(false))
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
 
     // add the partition with two replicas, both in ISR
     val partition = replicaManager.getOrCreatePartition(topic, partitionId)
@@ -148,9 +148,9 @@ class SimpleFetchTest {
     val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
 
     assertEquals("Reading committed data should return messages only up to high watermark",
messagesToHW,
-      replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(true, true, fetchInfo, UnboundedQuota).get(topicAndPartition).get.info.messageSet.head.message)
     assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
-      replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(true, false, fetchInfo, UnboundedQuota).get(topicAndPartition).get.info.messageSet.head.message)
 
     assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
     assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
index 778f3f8..aae66d8 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -43,7 +43,7 @@ class ThrottledResponseExpirationTest {
 
   @Test
   def testExpire() {
-    val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, "producer",
time)
+    val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, QuotaType.Produce,
time)
 
     val delayQueue = new DelayQueue[ThrottledResponse]()
     val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue)

http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 131a24a..dadc8a3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -933,18 +933,24 @@ object TestUtils extends Logging {
 
   def produceMessages(servers: Seq[KafkaServer],
                       topic: String,
-                      numMessages: Int): Seq[String] = {
+                      numMessages: Int,
+                      acks: Int = 0,
+                      valueBytes: Int = -1): Seq[Array[Byte]] = {
 
     val producer = createNewProducer(
       TestUtils.getBrokerListStrFromServers(servers),
       retries = 5,
-      requestTimeoutMs = 2000
+      requestTimeoutMs = 2000,
+      acks = acks
     )
 
-    val values = (0 until numMessages).map(x => s"test-$x")
+    val values = (0 until numMessages).map(x => valueBytes match {
+      case -1 => s"test-$x".getBytes
+      case _ => new Array[Byte](valueBytes)
+    })
     
     val futures = values.map { value =>
-      producer.send(new ProducerRecord(topic, null, null, value.getBytes))
+      producer.send(new ProducerRecord(topic, value))
     }
     futures.foreach(_.get)
     producer.close()


Mime
View raw message