kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4209; Reduce run time for quota integration tests
Date Thu, 29 Sep 2016 00:18:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 16f85e84c -> 71036527e


KAFKA-4209; Reduce run time for quota integration tests

Run quota tests which expect throttling only until the first produce/consume request is throttled.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1902 from rajinisivaram/KAFKA-4209


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71036527
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71036527
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71036527

Branch: refs/heads/trunk
Commit: 71036527e949c5c165e866163c55df9678327044
Parents: 16f85e8
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Thu Sep 29 01:03:44 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Sep 29 01:03:44 2016 +0100

----------------------------------------------------------------------
 .../integration/kafka/api/BaseQuotaTest.scala   | 98 ++++++++++----------
 1 file changed, 51 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/71036527/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 16ee636..aa1717a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -14,24 +14,19 @@
 
 package kafka.api
 
-import java.util.Properties
+import java.util.{Collections, Properties}
 
 import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer, QuotaId}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{Quota, KafkaMetric}
+import org.apache.kafka.common.{MetricName, TopicPartition}
+import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.Map
-import scala.collection.mutable
-
 abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   def userPrincipal : String
@@ -87,18 +82,14 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   @Test
   def testThrottledProducerConsumer() {
-    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
 
     val numRecords = 1000
-    produce(producers.head, numRecords)
-
-    val producerMetricName = throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)
-    assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
+    val produced = produceUntilThrottled(producers.head, numRecords)
+    assertTrue("Should have been throttled", producerThrottleMetric.value > 0)
 
     // Consumer should read in a bursty manner and get throttled immediately
-    consume(consumers.head, numRecords)
-    val consumerMetricName = throttleMetricName(ApiKeys.FETCH, consumerQuotaId)
-    assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
+    consumeUntilThrottled(consumers.head, produced)
+    assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
   }
 
   @Test
@@ -111,60 +102,71 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     overrideQuotas(Long.MaxValue, Long.MaxValue)
     waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
 
-    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
     val numRecords = 1000
-    produce(producers.head, numRecords)
-    val producerMetricName = throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)
-    assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
+    assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
+    assertEquals("Should not have been throttled", 0.0, producerThrottleMetric.value, 0.0)
 
     // The "client" consumer does not get throttled.
-    consume(consumers.head, numRecords)
-    val consumerMetricName = throttleMetricName(ApiKeys.FETCH, consumerQuotaId)
-    assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)
+    assertEquals(numRecords, consumeUntilThrottled(consumers.head, numRecords))
+    assertEquals("Should not have been throttled", 0.0, consumerThrottleMetric.value, 0.0)
   }
 
   @Test
   def testQuotaOverrideDelete() {
     // Override producer and consumer quotas to unlimited
     overrideQuotas(Long.MaxValue, Long.MaxValue)
+    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
 
-    val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
     val numRecords = 1000
-    produce(producers.head, numRecords)
-    assertTrue("Should not have been throttled", allMetrics(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)).value() == 0)
-    consume(consumers.head, numRecords)
-    assertTrue("Should not have been throttled", allMetrics(throttleMetricName(ApiKeys.FETCH, consumerQuotaId)).value() == 0)
+    assertEquals(numRecords, produceUntilThrottled(producers.head, numRecords))
+    assertEquals("Should not have been throttled", 0.0, producerThrottleMetric.value, 0.0)
+    assertEquals(numRecords, consumeUntilThrottled(consumers.head, numRecords))
+    assertEquals("Should not have been throttled", 0.0, consumerThrottleMetric.value, 0.0)
 
     // Delete producer and consumer quota overrides. Consumer and producer should now be
     // throttled since broker defaults are very small
     removeQuotaOverrides()
-    produce(producers.head, numRecords)
-
-    assertTrue("Should have been throttled", allMetrics(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)).value() > 0)
-    consume(consumers.head, numRecords)
-    assertTrue("Should have been throttled", allMetrics(throttleMetricName(ApiKeys.FETCH, consumerQuotaId)).value() > 0)
+    val produced = produceUntilThrottled(producers.head, numRecords)
+    assertTrue("Should have been throttled", producerThrottleMetric.value > 0)
+
+    // Since producer may have been throttled after producing a couple of records,
+    // consume from beginning till throttled
+    consumers.head.seekToBeginning(Collections.singleton(new TopicPartition(topic1, 0)))
+    consumeUntilThrottled(consumers.head, numRecords + produced)
+    assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
   }
 
-  def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {
-    var numBytesProduced = 0
-    for (i <- 0 to count) {
-      val payload = i.toString.getBytes
-      numBytesProduced += payload.length
+  def produceUntilThrottled(p: KafkaProducer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
+    var numProduced = 0
+    var throttled = false
+    do {
+      val payload = numProduced.toString.getBytes
       p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload),
              new ErrorLoggingCallback(topic1, null, null, true)).get()
-      Thread.sleep(1)
-    }
-    numBytesProduced
+      numProduced += 1
+      val throttleMetric = producerThrottleMetric
+      throttled = throttleMetric != null && throttleMetric.value > 0
+    } while (numProduced < maxRecords && !throttled)
+    numProduced
   }
 
-  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) {
-    consumer.subscribe(List(topic1))
+  def consumeUntilThrottled(consumer: KafkaConsumer[Array[Byte], Array[Byte]], maxRecords: Int): Int = {
+    consumer.subscribe(Collections.singleton(topic1))
     var numConsumed = 0
-    while (numConsumed < numRecords) {
-      for (cr <- consumer.poll(100)) {
-        numConsumed += 1
-      }
+    var throttled = false
+    do {
+      numConsumed += consumer.poll(100).count
+      val throttleMetric = consumerThrottleMetric
+      throttled = throttleMetric != null && throttleMetric.value > 0
+    }  while (numConsumed < maxRecords && !throttled)
+
+    // If throttled, wait for the records from the last fetch to be received
+    if (throttled && numConsumed < maxRecords) {
+      val minRecords = numConsumed + 1
+      while (numConsumed < minRecords)
+          numConsumed += consumer.poll(100).count
     }
+    numConsumed
   }
 
   def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long) {
@@ -185,6 +187,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
                                   "user", quotaId.sanitizedUser.getOrElse(""),
                                   "client-id", quotaId.clientId.getOrElse(""))
   }
+  private def producerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId))
+  private def consumerThrottleMetric = leaderNode.metrics.metrics.get(throttleMetricName(ApiKeys.FETCH, consumerQuotaId))
 
   def quotaProperties(producerQuota: Long, consumerQuota: Long): Properties = {
     val props = new Properties()


Mime
View raw message