kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject kafka git commit: KAFKA-5402; Avoid creating quota related metrics if quotas not enabled
Date Thu, 29 Jun 2017 12:01:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a05a00e50 -> dc95456f1


KAFKA-5402; Avoid creating quota related metrics if quotas not enabled

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3303 from rajinisivaram/KAFKA-5402


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc95456f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc95456f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc95456f

Branch: refs/heads/trunk
Commit: dc95456f1df1a6ba451944d515fcee81c29262f5
Parents: a05a00e
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Thu Jun 29 08:01:12 2017 -0400
Committer: Rajini Sivaram <rajinisivaram@googlemail.com>
Committed: Thu Jun 29 08:01:12 2017 -0400

----------------------------------------------------------------------
 .../scala/kafka/server/ClientQuotaManager.scala | 26 ++++++++++---
 .../server/ClientRequestQuotaManager.scala      | 26 ++++++++++++-
 .../src/main/scala/kafka/server/KafkaApis.scala | 27 ++++---------
 .../kafka/api/BaseConsumerTest.scala            |  4 ++
 .../integration/kafka/api/BaseQuotaTest.scala   |  2 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 36 ++++++++++++++++++
 .../kafka/server/ClientQuotaManagerTest.scala   | 40 ++++++++++----------
 7 files changed, 114 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc95456f/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 04f5239..5c85eef 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -138,7 +138,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val time: Time) extends Logging {
   private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
   private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
-  private var quotaTypesEnabled = if (config.quotaBytesPerSecondDefault == Long.MaxValue)
QuotaTypes.NoQuotas else QuotaTypes.ClientIdQuotaEnabled
+  @volatile private var quotaTypesEnabled = if (config.quotaBytesPerSecondDefault == Long.MaxValue)
QuotaTypes.NoQuotas else QuotaTypes.ClientIdQuotaEnabled
   private val lock = new ReentrantReadWriteLock()
   private val delayQueue = new DelayQueue[ThrottledResponse]()
   private val sensorAccessor = new SensorAccess
@@ -172,6 +172,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   /**
+   * Returns true if any quotas are enabled for this quota manager. This is used
+   * to determine if quota related metrics should be created.
+   * Note: If any quotas (static defaults, dynamic defaults or quota overrides) have
+   * been configured for this broker at any time for this quota type, quotasEnabled will
+   * return true until the next broker restart, even if all quotas are subsequently deleted.
+   */
+  def quotasEnabled: Boolean = quotaTypesEnabled != QuotaTypes.NoQuotas
+
+  /**
    * Records that a user/clientId changed some metric being throttled (produced/consumed
bytes, request processing time etc.)
    * If quota has been violated, callback is invoked after a delay, otherwise the callback
is invoked immediately.
    * Throttle time calculation may be overridden by sub-classes.
@@ -183,9 +192,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * @return Number of milliseconds to delay the response in case of Quota violation.
    *         Zero otherwise
    */
-  def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Double, callback:
Int => Unit): Int = {
-    val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
-    recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
+  def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, value: Double, callback:
Int => Unit): Int = {
+    if (quotasEnabled) {
+      val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
+      recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
+    } else {
+      // Don't record any metrics if quotas are not enabled at any level
+      val throttleTimeMs = 0
+      callback(throttleTimeMs)
+      throttleTimeMs
+    }
   }
 
   def recordAndThrottleOnQuotaViolation(clientSensors: ClientSensors, value: Double, callback:
Int => Unit): Int = {
@@ -405,7 +421,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
             .quota(quota)
   }
 
-  protected def createSensor(sensorName: String, metricName: MetricName): Sensor = {
+  protected def getOrCreateSensor(sensorName: String, metricName: MetricName): Sensor = {
     sensorAccessor.getOrCreate(
         sensorName,
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc95456f/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index 7e80be6..fbc1881 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -27,12 +27,34 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val time: Time) extends ClientQuotaManager(config, metrics,
QuotaType.Request, time) {
   val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
-  val exemptSensor = createSensor(exemptSensorName, exemptMetricName)
+  def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName)
 
   def recordExempt(value: Double) {
     exemptSensor.record(value)
   }
 
+  def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, requestThreadTimeNanos:
Long,
+      sendResponseCallback: Int => Unit, recordNetworkThreadTimeCallback: (Long =>
Unit) => Unit): Unit = {
+    if (quotasEnabled) {
+      val quotaSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
+      recordNetworkThreadTimeCallback(timeNanos => recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos)))
+
+      recordAndThrottleOnQuotaViolation(
+          quotaSensors,
+          nanosToPercentage(requestThreadTimeNanos),
+          sendResponseCallback)
+    } else {
+      sendResponseCallback(0)
+    }
+  }
+
+  def maybeRecordExempt(requestThreadTimeNanos: Long, recordNetworkThreadTimeCallback: (Long
=> Unit) => Unit): Unit = {
+    if (quotasEnabled) {
+      recordNetworkThreadTimeCallback(timeNanos => recordExempt(nanosToPercentage(timeNanos)))
+      recordExempt(nanosToPercentage(requestThreadTimeNanos))
+    }
+  }
+
   override protected def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Long
= {
     math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
   }
@@ -51,4 +73,6 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
 
   private def exemptSensorName: String = "exempt-" + QuotaType.Request
 
+  private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc95456f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a4fd30c..68c34d7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -446,7 +446,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // When this callback is triggered, the remote API call has completed
       request.apiRemoteCompleteTimeNanos = time.nanoseconds
 
-      quotas.produce.recordAndMaybeThrottle(
+      quotas.produce.maybeRecordAndThrottle(
         request.session.sanitizedUser,
         request.header.clientId,
         numBytesAppended,
@@ -594,7 +594,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         // result in data being loaded into memory, it is better to do this after throttling
to avoid OOM.
         val response = new FetchResponse(fetchedPartitionData, 0)
         val responseStruct = response.toStruct(versionId)
-        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
+        quotas.fetch.maybeRecordAndThrottle(request.session.sanitizedUser, clientId, responseStruct.sizeOf,
           fetchResponseCallback)
       }
     }
@@ -2003,16 +2003,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       // When this callback is triggered, the remote API call has completed
       request.apiRemoteCompleteTimeNanos = time.nanoseconds
     }
-    val quotaSensors = quotas.request.getOrCreateQuotaSensors(request.session.sanitizedUser,
clientId)
-    def recordNetworkThreadTimeNanos(timeNanos: Long) {
-      quotas.request.recordNoThrottle(quotaSensors, nanosToPercentage(timeNanos))
-    }
-    request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
-
-    quotas.request.recordAndThrottleOnQuotaViolation(
-        quotaSensors,
-        nanosToPercentage(request.requestThreadTimeNanos),
-        sendResponseCallback)
+    quotas.request.maybeRecordAndThrottle(request.session.sanitizedUser, clientId,
+        request.requestThreadTimeNanos, sendResponseCallback,
+        callback => request.recordNetworkThreadTimeCallback = Some(callback))
   }
 
   private def sendResponseExemptThrottle(response: RequestChannel.Response) {
@@ -2020,18 +2013,12 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def sendResponseExemptThrottle(request: RequestChannel.Request, sendResponseCallback:
() => Unit) {
-    def recordNetworkThreadTimeNanos(timeNanos: Long) {
-      quotas.request.recordExempt(nanosToPercentage(timeNanos))
-    }
-    request.recordNetworkThreadTimeCallback = Some(recordNetworkThreadTimeNanos)
-
-    quotas.request.recordExempt(nanosToPercentage(request.requestThreadTimeNanos))
+    quotas.request.maybeRecordExempt(request.requestThreadTimeNanos,
+        callback => request.recordNetworkThreadTimeCallback = Some(callback))
     sendResponseCallback()
   }
 
   private def sendResponse(request: RequestChannel.Request, response: AbstractResponse) {
     requestChannel.sendResponse(RequestChannel.Response(request, response))
   }
-
-  private def nanosToPercentage(nanos: Long): Double = nanos * ClientQuotaManagerConfig.NanosToPercentagePerSecond
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc95456f/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 992e74a..3c36bb0 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -45,6 +45,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
   val tp = new TopicPartition(topic, part)
   val part2 = 1
   val tp2 = new TopicPartition(topic, part2)
+  val producerClientId = "ConsumerTestProducer"
+  val consumerClientId = "ConsumerTestConsumer"
 
   // configure the servers and clients
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed
up shutdown
@@ -54,6 +56,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
   this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
   this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
+  this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc95456f/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 32f19e2..c69c9a4 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -147,7 +147,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     consumer.subscribe(Collections.singleton(topic1))
     val endTimeMs = System.currentTimeMillis + 10000
     var throttled = false
-    while (!throttled && System.currentTimeMillis < endTimeMs) {
+    while ((!throttled || exemptRequestMetric == null) && System.currentTimeMillis
< endTimeMs) {
       consumer.poll(100)
       val throttleMetric = consumerRequestThrottleMetric
       throttled = throttleMetric != null && throttleMetric.value > 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc95456f/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index e565ce2..df49811 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -33,6 +33,8 @@ import org.junit.Test
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
+import kafka.server.QuotaType
+import kafka.server.KafkaServer
 
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build
time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
@@ -1472,6 +1474,40 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testQuotaMetricsNotCreatedIfNoQuotasConfigured() {
+    val numRecords = 1000
+    sendRecords(numRecords)
+
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, startingOffset
= 0)
+
+    def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId:
String) {
+        val metricName = broker.metrics.metricName("throttle-time",
+                                  quotaType.toString,
+                                  "",
+                                  "user", "",
+                                  "client-id", clientId)
+        assertNull("Metric should not hanve been created " + metricName, broker.metrics.metric(metricName))
+    }
+    servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId))
+    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId))
+    servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId))
+    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId))
+
+    servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId))
+    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId))
+    servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId))
+    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId))
+
+    def assertNoExemptRequestMetric(broker: KafkaServer) {
+        val metricName = broker.metrics.metricName("exempt-request-time", QuotaType.Request.toString,
"")
+        assertNull("Metric should not hanve been created " + metricName, broker.metrics.metric(metricName))
+    }
+    servers.foreach(assertNoExemptRequestMetric(_))
+  }
+
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
     // use consumers defined in this class plus one additional consumer
     // Use topic defined in this class + one additional topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc95456f/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 183ba0b..f4a55ab 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -52,7 +52,7 @@ class ClientQuotaManagerTest {
       assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota(client2.user,
client2.clientId))
 
       // p1 should be throttled using the overridden quota
-      var throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId,
2500 * config.numQuotaSamples, this.callback)
+      var throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId,
2500 * config.numQuotaSamples, this.callback)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs
> 0)
 
       // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well
since the sensor was created.
@@ -60,14 +60,14 @@ class ClientQuotaManagerTest {
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(3000,
true)))
       assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true),
clientMetrics.quota(client1.user, client1.clientId))
 
-      throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId,
0, this.callback)
+      throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId,
0, this.callback)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
 
       // Case 3: Change quota back to default. Should be throttled again
       clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(500,
true)))
       assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota(client1.user,
client1.clientId))
 
-      throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId,
0, this.callback)
+      throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId,
0, this.callback)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs
> 0)
 
       // Case 4: Set high default quota, remove p1 quota. p1 should no longer be throttled
@@ -75,7 +75,7 @@ class ClientQuotaManagerTest {
       clientMetrics.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId,
Some(new Quota(4000, true)))
       assertEquals("Should return the newly overridden value (4000)", new Quota(4000, true),
clientMetrics.quota(client1.user, client1.clientId))
 
-      throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId,
1000 * config.numQuotaSamples, this.callback)
+      throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId,
1000 * config.numQuotaSamples, this.callback)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
 
     } finally {
@@ -154,7 +154,7 @@ class ClientQuotaManagerTest {
 
     def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle:
Boolean) {
       assertEquals(new Quota(expectedBound, true), quotaManager.quota(user, clientId))
-      val throttleTimeMs = quotaManager.recordAndMaybeThrottle(user, clientId, value * config.numQuotaSamples,
this.callback)
+      val throttleTimeMs = quotaManager.maybeRecordAndThrottle(user, clientId, value * config.numQuotaSamples,
this.callback)
       if (expectThrottle)
         assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs
> 0)
       else
@@ -230,7 +230,7 @@ class ClientQuotaManagerTest {
        * if we produce under the quota
        */
       for (_ <- 0 until 10) {
-        clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
+        clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 400, callback)
         time.sleep(1000)
       }
       assertEquals(10, numCallbacks)
@@ -241,7 +241,7 @@ class ClientQuotaManagerTest {
       // (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100
       // 10.5 seconds because the last window is half complete
       time.sleep(500)
-      val sleepTime = clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 2300,
callback)
+      val sleepTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 2300,
callback)
 
       assertEquals("Should be throttled", 2100, sleepTime)
       assertEquals(1, queueSizeMetric.value().toInt)
@@ -257,12 +257,12 @@ class ClientQuotaManagerTest {
 
       // Could continue to see delays until the bursty sample disappears
       for (_ <- 0 until 10) {
-        clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
+        clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 400, callback)
         time.sleep(1000)
       }
 
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 0, callback))
+                   0, clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "unknown", 0, callback))
     } finally {
       clientMetrics.shutdown()
     }
@@ -280,7 +280,7 @@ class ClientQuotaManagerTest {
        * if we are under the quota
        */
       for (_ <- 0 until 10) {
-        quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(4),
callback)
+        quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(4),
callback)
         time.sleep(1000)
       }
       assertEquals(10, numCallbacks)
@@ -292,7 +292,7 @@ class ClientQuotaManagerTest {
       // (10.2 - quota)/quota*window-size = (10.2-10)/10*10.5 seconds = 210ms
       // 10.5 seconds interval because the last window is half complete
       time.sleep(500)
-      val throttleTime = quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client",
millisToPercent(67.1), callback)
+      val throttleTime = quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client",
millisToPercent(67.1), callback)
 
       assertEquals("Should be throttled", 210, throttleTime)
       assertEquals(1, queueSizeMetric.value().toInt)
@@ -308,22 +308,22 @@ class ClientQuotaManagerTest {
 
       // Could continue to see delays until the bursty sample disappears
       for (_ <- 0 until 11) {
-        quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", millisToPercent(4),
callback)
+        quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", millisToPercent(4),
callback)
         time.sleep(1000)
       }
 
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0,
callback))
+                   0, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0,
callback))
 
       // Create a very large spike which requires > one quota window to bring within quota
-      assertEquals(1000, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client",
millisToPercent(500), callback))
+      assertEquals(1000, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client",
millisToPercent(500), callback))
       for (_ <- 0 until 10) {
         time.sleep(1000)
-        assertEquals(1000, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client",
0, callback))
+        assertEquals(1000, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client",
0, callback))
       }
       time.sleep(1000)
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, quotaManager.recordAndMaybeThrottle("ANONYMOUS", "test-client", 0,
callback))
+                   0, quotaManager.maybeRecordAndThrottle("ANONYMOUS", "test-client", 0,
callback))
 
     } finally {
       quotaManager.shutdown()
@@ -335,11 +335,11 @@ class ClientQuotaManagerTest {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
     try {
-      clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 100, callback)
+      clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
       // remove the throttle time sensor
       metrics.removeSensor("ProduceThrottleTime-:client1")
       // should not throw an exception even if the throttle time sensor does not exist.
-      val throttleTime = clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 10000,
callback)
+      val throttleTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 10000,
callback)
       assertTrue("Should be throttled", throttleTime > 0)
       // the sensor should get recreated
       val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:client1")
@@ -354,12 +354,12 @@ class ClientQuotaManagerTest {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
     try {
-      clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 100, callback)
+      clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
       // remove all the sensors
       metrics.removeSensor("ProduceThrottleTime-:client1")
       metrics.removeSensor("Produce-ANONYMOUS:client1")
       // should not throw an exception
-      val throttleTime = clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 10000,
callback)
+      val throttleTime = clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 10000,
callback)
       assertTrue("Should be throttled", throttleTime > 0)
 
       // all the sensors should get recreated


Mime
View raw message