kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-2443 KAFKA-2567; Expose windowSize on Rate; - Throttle time should not return NaN
Date Mon, 12 Oct 2015 19:22:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fe4818e09 -> 9fde92f9f


KAFKA-2443 KAFKA-2567; Expose windowSize on Rate; - Throttle time should not return NaN

This is a followup ticket from KAFKA-2084 to improve the windowSize calculation in Quotas.
I've made the following changes:

1. Added a windowSize function on Rate
2. Calling Rate.windowSize in ClientQuotaManager to return the exact window size to use when
computing the delay time.
3. Changed the window size calculation subtly. The previous calculation had a bug: it measured
the elapsed time from the "lastWindowMs" of the most recent Sample object. However, lastWindowMs
is the time at which the sample was created, which implies that the elapsed time of the current
window is always "0" when the sample is created. This is incorrect, as demonstrated by a test case
I added in MetricsTest. I've fixed the calculation to count the elapsed time from the "oldest"
sample in the set, since that gives us an accurate value of the exact amount of time elapsed.

Author: Aditya Auradkar <aauradkar@linkedin.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Joel Koshy <jjkoshy.w@gmail.com>

Closes #213 from auradkar/K-2443


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fde92f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fde92f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fde92f9

Branch: refs/heads/trunk
Commit: 9fde92f9f1f1e40ceb6b3d5ff9676ba590385825
Parents: fe4818e
Author: Aditya Auradkar <aauradkar@linkedin.com>
Authored: Mon Oct 12 12:22:39 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Mon Oct 12 12:22:39 2015 -0700

----------------------------------------------------------------------
 .../kafka/common/metrics/KafkaMetric.java       |  4 +++
 .../apache/kafka/common/metrics/stats/Rate.java | 32 +++++++++++++++++---
 .../kafka/common/metrics/MetricsTest.java       | 28 +++++++++++++++++
 .../scala/kafka/server/ClientQuotaManager.scala | 23 ++++++++++----
 .../integration/kafka/api/QuotasTest.scala      |  4 +--
 .../kafka/server/ClientQuotaManagerTest.scala   | 11 ++++---
 6 files changed, 86 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9fde92f9/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index 89df1a4..e4d3ae8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -53,6 +53,10 @@ public final class KafkaMetric implements Metric {
         }
     }
 
+    public Measurable measurable() {
+        return this.measurable;
+    }
+
     double value(long timeMs) {
         return this.measurable.measure(config, timeMs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fde92f9/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index fe43940..9dfc457 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -59,10 +59,34 @@ public class Rate implements MeasurableStat {
     @Override
     public double measure(MetricConfig config, long now) {
         double value = stat.measure(config, now);
-        // the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete
-        long elapsedCurrentWindowMs = now - stat.current(now).lastWindowMs;
-        long elapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1);
-        return value / convert(elapsedCurrentWindowMs + elapsedPriorWindowsMs);
+        return value / convert(windowSize(config, now));
+    }
+
+    public long windowSize(MetricConfig config, long now) {
+        // purge old samples before we compute the window size
+        stat.purgeObsoleteSamples(config, now);
+
+        /*
+         * Here we check the total amount of time elapsed since the oldest non-obsolete window.
+         * This give the total windowSize of the batch which is the time used for Rate computation.
+         * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
+         * window, the measured rate will be very high.
+         * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
+         *
+         * Note that we could simply count the amount of time elapsed in the current window and add n-1 windows to get the total time,
+         * but this approach does not account for sleeps. SampledStat only creates samples whenever record is called,
+         * if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results.
+         */
+        long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
+        // Check how many full windows of data we have currently retained
+        int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
+        int minFullWindows = config.samples() - 1;
+
+        // If the available windows are less than the minimum required, add the difference to the totalElapsedTime
+        if (numFullWindows < minFullWindows)
+            totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();
+
+        return totalElapsedTimeMs;
     }
 
     private double convert(long timeMs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fde92f9/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 8d3e33d..9058387 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -384,6 +384,34 @@ public class MetricsTest {
         assertEquals(0.0, p75.value(), 1.0);
     }
 
+    @Test
+    public void testRateWindowing() throws Exception {
+        // Use the default time window. Set 3 samples
+        MetricConfig cfg = new MetricConfig().samples(3);
+        Sensor s = metrics.sensor("test.sensor", cfg);
+        s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
+
+        int sum = 0;
+        int count = cfg.samples() - 1;
+        // Advance 1 window after every record
+        for (int i = 0; i < count; i++) {
+            s.record(100);
+            sum += 100;
+            time.sleep(cfg.timeWindowMs());
+        }
+
+        // Sleep for half the window.
+        time.sleep(cfg.timeWindowMs() / 2);
+
+        // prior to any time passing
+        double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
+
+        KafkaMetric km = metrics.metrics().get(new MetricName("test.rate", "grp1"));
+        assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS);
+        assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
+                ((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);
+    }
+
     public static class ConstantMeasurable implements Measurable {
         public double value = 0.0;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fde92f9/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index b21785f..24f294d 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -123,13 +123,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       case qve: QuotaViolationException =>
         // Compute the delay
         val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
-        throttleTimeMs = throttleTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId)))
+        throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
         delayQueueSensor.record()
-        clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
         logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
     }
+    // If the request is not throttled, a throttleTime of 0 ms is recorded
+    clientSensors.throttleTimeSensor.record(throttleTimeMs)
     throttleTimeMs
   }
 
@@ -141,11 +142,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * we need to add a delay of X to W such that O * W / (W + X) = T.
    * Solving for X, we get X = (O - T)/T * W.
    */
-  private def throttleTime(metricValue: Double, config: MetricConfig): Int = {
+  private def throttleTime(clientMetric: KafkaMetric, config: MetricConfig): Int = {
+    val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
     val quota = config.quota()
-    val difference = metricValue - quota.bound
-    val time = difference / quota.bound * config.timeWindowMs() * config.samples()
-    time.round.toInt
+    val difference = clientMetric.value() - quota.bound
+    // Use the precise window used by the rate calculation
+    val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
+    throttleTimeMs.round.toInt
+  }
+
+  // Casting to Rate because we only use Rate in Quota computation
+  private def measurableAsRate(name: MetricName, measurable: Measurable): Rate = {
+    measurable match {
+      case r: Rate => r
+      case _ => throw new IllegalArgumentException(s"Metric $name is not a Rate metric, value $measurable")
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fde92f9/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index 3343c53..38b3dbd 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -156,7 +156,7 @@ class QuotasTest extends KafkaServerTestHarness {
                                             RequestKeys.nameForKey(RequestKeys.ProduceKey),
                                             "Tracking throttle-time per client",
                                             "client-id", producerId2)
-    Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(producerMetricName).value())
+    Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value())
 
     // The "client" consumer does not get throttled.
     consume(consumers(1), numRecords)
@@ -167,7 +167,7 @@ class QuotasTest extends KafkaServerTestHarness {
                                             RequestKeys.nameForKey(RequestKeys.FetchKey),
                                             "Tracking throttle-time per client",
                                             "client-id", consumerId2)
-    Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(consumerMetricName).value())
+    Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value())
   }
 
   def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fde92f9/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 997928c..75e856a 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -71,10 +71,13 @@ class ClientQuotaManagerTest {
       Assert.assertEquals(0, queueSizeMetric.value().toInt)
 
       // Create a spike.
-      // 400*10 + 2000 = 6000/10 = 600 bytes per second.
-      // (600 - quota)/quota*window-size = (600-500)/500*11 seconds = 2200
-      val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2000, callback)
-      Assert.assertEquals("Should be throttled", 2200, sleepTime)
+      // 400*10 + 2000 + 300 = 6300/10.5 = 600 bytes per second.
+      // (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100
+      // 10.5 seconds because the last window is half complete
+      time.sleep(500)
+      val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2300, callback)
+
+      Assert.assertEquals("Should be throttled", 2100, sleepTime)
       Assert.assertEquals(1, queueSizeMetric.value().toInt)
       // After a request is delayed, the callback cannot be triggered immediately
       clientMetrics.throttledRequestReaper.doWork()


Mime
View raw message