kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (#8290)
Date Sat, 14 Mar 2020 20:46:18 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78554e7  KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (#8290)
78554e7 is described below

commit 78554e73f66e0cdf7d289e64b4f8b13b68c48acf
Author: Anna Povzner <anna@confluent.io>
AuthorDate: Sat Mar 14 13:45:50 2020 -0700

    KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (#8290)
    
    Since quota communication changed with KIP-219, fetch requests are throttled by returning an
empty response with the delay in throttle_time_ms, and the Kafka consumer retries after the
delay. With default configs, the maximum fetch size could be as big as 50MB (or 10MB per partition).
The default broker config (1-second window, 10 full windows of tracked bandwidth/thread
utilization) means that a consumer quota of < 5MB/s (per broker) may block consumers from being able
to fetch any data.
    
    This PR ensures that consumers cannot get blocked by quota by capping fetchMaxBytes in
KafkaApis.handleFetchRequest() to quota window * consume bandwidth quota. In the example of
default configs (10-second quota window) and 1MB/s consumer bandwidth quota, fetchMaxBytes
would be capped to 10MB.
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../scala/kafka/server/ClientQuotaManager.scala    | 16 +++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 10 +++++++-
 .../integration/kafka/api/BaseQuotaTest.scala      | 28 +++++++++++++++++++---
 .../kafka/api/CustomQuotaCallbackTest.scala        |  6 +++--
 .../unit/kafka/server/ClientQuotaManagerTest.scala | 24 +++++++++++++++++++
 5 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 4526528..8316d0c 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -233,6 +233,22 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Returns the maximum value (produced/consumed bytes or request processing time) that could
be recorded without guaranteed throttling.
+   * Recording any larger value will always be throttled, even if no other values were recorded
in the quota window.
+   * This is used for deciding the maximum bytes that can be fetched at once
+   */
+  def getMaxValueInQuotaWindow(session: Session, clientId: String): Double = {
+    if (quotasEnabled) {
+      val clientSensors = getOrCreateQuotaSensors(session, clientId)
+      Option(quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags.asJava))
+        .map(_.toDouble * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds)
+        .getOrElse(Double.MaxValue)
+    } else {
+      Double.MaxValue
+    }
+  }
+
   def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs:
Long): Int = {
     var throttleTimeMs = 0
     val clientSensors = getOrCreateQuotaSensors(session, clientId)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2bc746f..d092cdd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -801,7 +801,15 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val fetchMaxBytes = Math.min(fetchRequest.maxBytes, config.fetchMaxBytes)
+    // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched
without being throttled given
+    // no bytes were recorded in the recent quota window
+    // trying to fetch more bytes would result in a guaranteed throttling potentially blocking
consumer progress
+    val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
+      Int.MaxValue
+    else
+      quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt
+
+    val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
     val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
     if (interesting.isEmpty)
       processResponseCallback(Seq.empty)
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 651eef6..4d9d23e 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -15,6 +15,7 @@
 package kafka.api
 
 import java.time.Duration
+import java.util.concurrent.TimeUnit
 import java.util.{Collections, HashMap, Properties}
 
 import kafka.api.QuotaTestClients._
@@ -83,7 +84,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     quotaTestClients.verifyProduceThrottle(expectThrottle = true)
 
     // Consumer should read in a bursty manner and get throttled immediately
-    quotaTestClients.consumeUntilThrottled(produced)
+    assertTrue("Should have consumed at least one record", quotaTestClients.consumeUntilThrottled(produced)
> 0)
     quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
@@ -107,6 +108,23 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   }
 
   @Test
+  def testProducerConsumerOverrideLowerQuota(): Unit = {
+    // consumer quota is set such that consumer quota * default quota window (10 seconds)
is less than
+    // MAX_PARTITION_FETCH_BYTES_CONFIG, so that we can test consumer ability to fetch in
this case
+    // In this case, 250 * 10 < 4096
+    quotaTestClients.overrideQuotas(2000, 250, Int.MaxValue)
+    quotaTestClients.waitForQuotaUpdate(2000, 250, Int.MaxValue)
+
+    val numRecords = 1000
+    val produced = quotaTestClients.produceUntilThrottled(numRecords)
+    quotaTestClients.verifyProduceThrottle(expectThrottle = true)
+
+    // Consumer should be able to consume at least one record, even when throttled
+    assertTrue("Should have consumed at least one record", quotaTestClients.consumeUntilThrottled(produced)
> 0)
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
+  }
+
+  @Test
   def testQuotaOverrideDelete(): Unit = {
     // Override producer and consumer quotas to unlimited
     quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
@@ -194,19 +212,23 @@ abstract class QuotaTestClients(topic: String,
   }
 
   def consumeUntilThrottled(maxRecords: Int, waitForRequestCompletion: Boolean = true): Int
= {
+    val timeoutMs = TimeUnit.MINUTES.toMillis(1)
+
     consumer.subscribe(Collections.singleton(topic))
     var numConsumed = 0
     var throttled = false
+    val startMs = System.currentTimeMillis
     do {
       numConsumed += consumer.poll(Duration.ofMillis(100L)).count
       val metric = throttleMetric(QuotaType.Fetch, consumerClientId)
       throttled = metric != null && metricValue(metric) > 0
-    }  while (numConsumed < maxRecords && !throttled)
+    }  while (numConsumed < maxRecords && !throttled && System.currentTimeMillis
< startMs + timeoutMs)
 
     // If throttled, wait for the records from the last fetch to be received
     if (throttled && numConsumed < maxRecords && waitForRequestCompletion)
{
       val minRecords = numConsumed + 1
-      while (numConsumed < minRecords)
+      val startMs = System.currentTimeMillis
+      while (numConsumed < minRecords && System.currentTimeMillis < startMs
+ timeoutMs)
         numConsumed += consumer.poll(Duration.ofMillis(100L)).count
     }
     numConsumed
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index e442889..091a178 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -100,9 +100,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup
{
     quotaLimitCalls.values.foreach(_.set(0))
     user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false)
 
-    // ClientQuotaCallback#quotaLimit is invoked by each quota manager once for each new
client
+    // ClientQuotaCallback#quotaLimit is invoked by each quota manager once per throttled
produce request for each client
     assertEquals(1, quotaLimitCalls(ClientQuotaType.PRODUCE).get)
-    assertEquals(1, quotaLimitCalls(ClientQuotaType.FETCH).get)
+    // ClientQuotaCallback#quotaLimit is invoked once for each unthrottled and twice for each
throttled request
+    // since we don't know the total number of requests, we verify it was called at least
twice (at least one throttled request)
+    assertTrue("quotaLimit must be called at least twice", quotaLimitCalls(ClientQuotaType.FETCH).get
> 2)
     assertTrue(s"Too many quotaLimit calls $quotaLimitCalls", quotaLimitCalls(ClientQuotaType.REQUEST).get
<= 10) // sanity check
     // Large quota updated to small quota, should throttle
     user.configureAndWaitForQuota(9000, 3000)
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index e568268..a704163 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -194,6 +194,11 @@ class ClientQuotaManagerTest {
 
   private def checkQuota(quotaManager: ClientQuotaManager, user: String, clientId: String,
expectedBound: Long, value: Int, expectThrottle: Boolean): Unit = {
     assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
+    val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost)
+    val expectedMaxValueInQuotaWindow =
+      if (expectedBound < Long.MaxValue) config.quotaWindowSizeSeconds * (config.numQuotaSamples
- 1) * expectedBound else Double.MaxValue
+    assertEquals(expectedMaxValueInQuotaWindow, quotaManager.getMaxValueInQuotaWindow(session,
clientId), 0.01)
+
     val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * config.numQuotaSamples)
     if (expectThrottle)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs
> 0)
@@ -202,6 +207,25 @@ class ClientQuotaManagerTest {
   }
 
   @Test
+  def testGetMaxValueInQuotaWindowWithNonDefaultQuotaWindow(): Unit = {
+    val numFullQuotaWindows = 3   // 3 seconds window (vs. 10 seconds default)
+    val nonDefaultConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = Long.MaxValue,
numQuotaSamples = numFullQuotaWindows + 1)
+    val quotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, Fetch, time, "")
+    val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost)
+
+    try {
+      // no quota set
+      assertEquals(Double.MaxValue, quotaManager.getMaxValueInQuotaWindow(userSession, "client1"),
0.01)
+
+      // Set default <user> quota config
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, Some(new Quota(10,
true)))
+      assertEquals(10 * numFullQuotaWindows, quotaManager.getMaxValueInQuotaWindow(userSession,
"client1"), 0.01)
+    } finally {
+      quotaManager.shutdown()
+    }
+  }
+
+  @Test
   def testSetAndRemoveDefaultUserQuota(): Unit = {
     // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
     val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault
= Long.MaxValue),


Mime
View raw message