kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject kafka git commit: KAFKA-4956: Verify client-side throttle time metrics in quota test
Date Thu, 01 Jun 2017 14:46:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 13d189df6 -> 44e036702


KAFKA-4956: Verify client-side throttle time metrics in quota test

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3190 from rajinisivaram/KAFKA-4956-unittest

(cherry picked from commit 640082776b429067613922c576a57ec716b1dbe9)
Signed-off-by: Rajini Sivaram <rajinisivaram@googlemail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44e03670
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44e03670
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44e03670

Branch: refs/heads/0.11.0
Commit: 44e036702e5ffa346fcb493f601f07083aef7913
Parents: 13d189d
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Thu Jun 1 15:45:30 2017 +0100
Committer: Rajini Sivaram <rajinisivaram@googlemail.com>
Committed: Thu Jun 1 15:46:04 2017 +0100

----------------------------------------------------------------------
 .../integration/kafka/api/BaseQuotaTest.scala   | 38 ++++++++++++++++----
 1 file changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/44e03670/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 918bb55..32f19e2 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -14,19 +14,17 @@
 
 package kafka.api
 
-import java.util.{Collections, Properties}
+import java.util.{Collections, HashMap, Properties}
 
-import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer, QuotaId}
+import kafka.server.{ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, KafkaServer, QuotaId, QuotaType}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.metrics.{KafkaMetric, Quota}
 import org.junit.Assert._
 import org.junit.{Before, Test}
-import kafka.server.QuotaType
-import org.apache.kafka.common.metrics.KafkaMetric
 
 abstract class BaseQuotaTest extends IntegrationTestHarness {
 
@@ -83,12 +81,16 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   def testThrottledProducerConsumer() {
 
     val numRecords = 1000
-    val produced = produceUntilThrottled(producers.head, numRecords)
+    val producer = producers.head
+    val produced = produceUntilThrottled(producer, numRecords)
     assertTrue("Should have been throttled", producerThrottleMetric.value > 0)
+    verifyProducerThrottleTimeMetric(producer)
 
     // Consumer should read in a bursty manner and get throttled immediately
-    consumeUntilThrottled(consumers.head, produced)
+    val consumer = consumers.head
+    consumeUntilThrottled(consumer, produced)
     assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
+    verifyConsumerThrottleTimeMetric(consumer)
   }
 
   @Test
@@ -152,6 +154,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     }
 
     assertTrue("Should have been throttled", throttled)
+    verifyConsumerThrottleTimeMetric(consumer, Some(ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds * 1000.0))
 
     assertNotNull("Exempt requests not recorded", exemptRequestMetric)
     assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0)
@@ -205,6 +208,27 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     }
   }
 
+  private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) {
+    val tags = new HashMap[String, String]
+    tags.put("client-id", producerClientId)
+    val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags))
+    val maxMetric = producer.metrics.get(new MetricName("produce-throttle-time-max", "producer-metrics", "", tags))
+
+    TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0,
+        s"Producer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}")
+  }
+
+  private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], maxThrottleTime: Option[Double] = None) {
+    val tags = new HashMap[String, String]
+    tags.put("client-id", consumerClientId)
+    val avgMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags))
+    val maxMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", tags))
+
+    TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 0.0,
+        s"Consumer throttle metric not updated: avg=${avgMetric.value} max=${maxMetric.value}")
+    maxThrottleTime.foreach(max => assertTrue(s"Maximum consumer throttle too high: ${maxMetric.value}", maxMetric.value <= max))
+  }
+
   private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): MetricName = {
     leaderNode.metrics.metricName("throttle-time",
                                   quotaType.toString,


Mime
View raw message