kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7528: Standardize on Min/Avg/Max Kafka metrics' default value - NaN (#5908)
Date Tue, 20 Nov 2018 23:54:32 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 068ab9c  KAFKA-7528: Standardize on Min/Avg/Max Kafka metrics' default value - NaN (#5908)
068ab9c is described below

commit 068ab9cefae301f3187ea885d645c425955e77d2
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Tue Nov 20 23:54:24 2018 +0000

    KAFKA-7528: Standardize on Min/Avg/Max Kafka metrics' default value - NaN (#5908)
    
    While it makes sense for metrics like Min, Avg and Max to use Double.MAX_VALUE, 0.0 and
    Double.NEGATIVE_INFINITY, respectively, as default values to ease the computation logic,
    exposing those values to readers is misleading. For instance, how would you tell whether
    an -avg metric reports 0 because it was fed samples of 0 or because no samples were
    recorded at all?
    
    It makes sense to standardize the output of these metrics on a value that clearly denotes
    that no values have been recorded: NaN.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/common/metrics/stats/Avg.java |  3 +-
 .../org/apache/kafka/common/metrics/stats/Max.java |  7 +++-
 .../org/apache/kafka/common/metrics/stats/Min.java |  7 +++-
 .../clients/consumer/internals/FetcherTest.java    | 12 +++---
 .../kafka/common/metrics/JmxReporterTest.java      |  2 +-
 .../apache/kafka/common/metrics/MetricsTest.java   | 44 ++++++++++++++--------
 .../apache/kafka/common/network/NioEchoServer.java | 14 ++++---
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |  6 +--
 .../connect/runtime/WorkerSourceTaskTest.java      |  2 +-
 .../runtime/distributed/DistributedHerderTest.java |  4 +-
 .../integration/kafka/api/BaseQuotaTest.scala      |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala | 12 +++---
 12 files changed, 69 insertions(+), 46 deletions(-)
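
To make the rationale above concrete, here is a minimal reader-side sketch (not part of this
patch; the class and method names are illustrative) of how a metrics consumer can now
distinguish an idle sensor from one that genuinely measured zero, using the same
Metrics#metrics() map the tests below rely on:

    import java.util.Map;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.Metrics;

    public class MetricReaderSketch {
        // Returns null when the metric is missing or has recorded no samples yet (NaN),
        // so "no data" is no longer conflated with a real measurement of 0.0.
        static Double readOrNull(Metrics metrics, MetricName name) {
            Map<MetricName, KafkaMetric> all = metrics.metrics();
            KafkaMetric metric = all.get(name);
            if (metric == null)
                return null;
            double value = (Double) metric.metricValue();
            return Double.isNaN(value) ? null : value;
        }
    }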

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
index a09ca5a..4e6c337 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java
@@ -42,7 +42,8 @@ public class Avg extends SampledStat {
             total += s.value;
             count += s.eventCount;
         }
-        return count == 0 ? 0 : total / count;
+        return count == 0 ? Double.NaN : total / count;
     }
 
 }
+
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
index 6d75454..d91bf40 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java
@@ -37,9 +37,12 @@ public final class Max extends SampledStat {
     @Override
     public double combine(List<Sample> samples, MetricConfig config, long now) {
         double max = Double.NEGATIVE_INFINITY;
-        for (Sample sample : samples)
+        long count = 0;
+        for (Sample sample : samples) {
             max = Math.max(max, sample.value);
-        return max;
+            count += sample.eventCount;
+        }
+        return count == 0 ? Double.NaN : max;
     }
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
index 7a18a2d..3b9925a 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java
@@ -37,9 +37,12 @@ public class Min extends SampledStat {
     @Override
     public double combine(List<Sample> samples, MetricConfig config, long now) {
         double min = Double.MAX_VALUE;
-        for (Sample sample : samples)
+        long count = 0;
+        for (Sample sample : samples) {
             min = Math.min(min, sample.value);
-        return min;
+            count += sample.eventCount;
+        }
+        return count == 0 ? Double.NaN : min;
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index f4439ce..52b78e3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1600,8 +1600,8 @@ public class FetcherTest {
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
 
-        // recordsFetchLagMax should be initialized to negative infinity
-        assertEquals(Double.NEGATIVE_INFINITY, (Double) recordsFetchLagMax.metricValue(), EPSILON);
+        // recordsFetchLagMax should be initialized to NaN
+        assertEquals(Double.NaN, (Double) recordsFetchLagMax.metricValue(), EPSILON);
 
         // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse
         fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
@@ -1638,8 +1638,8 @@ public class FetcherTest {
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLeadMin = allMetrics.get(minLeadMetric);
 
-        // recordsFetchLeadMin should be initialized to MAX_VALUE
-        assertEquals(Double.MAX_VALUE, (Double) recordsFetchLeadMin.metricValue(), EPSILON);
+        // recordsFetchLeadMin should be initialized to NaN
+        assertEquals(Double.NaN, (Double) recordsFetchLeadMin.metricValue(), EPSILON);
 
         // recordsFetchLeadMin should be position - logStartOffset after receiving an empty FetchResponse
         fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
@@ -1682,8 +1682,8 @@ public class FetcherTest {
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
 
-        // recordsFetchLagMax should be initialized to negative infinity
-        assertEquals(Double.NEGATIVE_INFINITY, (Double) recordsFetchLagMax.metricValue(), EPSILON);
+        // recordsFetchLagMax should be initialized to NaN
+        assertEquals(Double.NaN, (Double) recordsFetchLagMax.metricValue(), EPSILON);
 
         // recordsFetchLagMax should be lso - fetchOffset after receiving an empty FetchResponse
         fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index 28179f3..c6e112a 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -46,7 +46,7 @@ public class JmxReporterTest {
             sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new Total());
 
             assertTrue(server.isRegistered(new ObjectName(":type=grp1")));
-            assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.avg"));
+            assertEquals(Double.NaN, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.avg"));
             assertTrue(server.isRegistered(new ObjectName(":type=grp2")));
             assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp2"), "pack.bean2.total"));
 
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index e70be27..3184aeb 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -347,37 +347,51 @@ public class MetricsTest {
         MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
         max.record(config, 50, time.milliseconds());
         time.sleep(samples * windowMs);
-        assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS);
+        assertEquals(Double.NaN, max.measure(config, time.milliseconds()), EPS);
     }
 
+    /**
+     * Some implementations of SampledStat make sense to return NaN
+     * when there are no values set rather than the initial value
+     */
     @Test
-    public void testSampledStatInitialValue() {
-        // initialValue from each SampledStat is set as the initialValue on its Sample.
-        // The only way to test the initialValue is to infer it by having a SampledStat
-        // with expired Stats, because their values are reset to the initial values.
-        // Most implementations of combine on SampledStat end up returning the default
-        // value, so we can use this. This doesn't work for Percentiles though.
-        // This test looks a lot like testOldDataHasNoEffect because it's the same
-        // flow that leads to this state.
+    public void testSampledStatReturnsNaNWhenNoValuesExist() {
+        // This is tested by having a SampledStat with expired Stats,
+        // because their values get reset to the initial values.
         Max max = new Max();
         Min min = new Min();
         Avg avg = new Avg();
-        Count count = new Count();
-        Rate.SampledTotal sampledTotal = new Rate.SampledTotal();
-
         long windowMs = 100;
         int samples = 2;
         MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
         max.record(config, 50, time.milliseconds());
         min.record(config, 50, time.milliseconds());
         avg.record(config, 50, time.milliseconds());
+
+        time.sleep(samples * windowMs);
+
+        assertEquals(Double.NaN, max.measure(config, time.milliseconds()), EPS);
+        assertEquals(Double.NaN, min.measure(config, time.milliseconds()), EPS);
+        assertEquals(Double.NaN, avg.measure(config, time.milliseconds()), EPS);
+    }
+
+    /**
+     * Some implementations of SampledStat make sense to return the initial value
+     * when there are no values set
+     */
+    @Test
+    public void testSampledStatReturnsInitialValueWhenNoValuesExist() {
+        Count count = new Count();
+        Rate.SampledTotal sampledTotal = new Rate.SampledTotal();
+        long windowMs = 100;
+        int samples = 2;
+        MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
+
         count.record(config, 50, time.milliseconds());
         sampledTotal.record(config, 50, time.milliseconds());
+
         time.sleep(samples * windowMs);
 
-        assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS);
-        assertEquals(Double.MAX_VALUE, min.measure(config, time.milliseconds()), EPS);
-        assertEquals(0.0, avg.measure(config, time.milliseconds()), EPS);
         assertEquals(0, count.measure(config, time.milliseconds()), EPS);
         assertEquals(0.0, sampledTotal.measure(config, time.milliseconds()), EPS);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index fc02fe6..c6ef227 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -179,12 +179,14 @@ public class NioEchoServer extends Thread {
             long currentElapsedMs = time.milliseconds() - startMs;
             long thisMaxWaitMs = maxAggregateWaitMs - currentElapsedMs;
             String metricName = namePrefix + metricType.metricNameSuffix();
-            if (expectedValue == 0.0)
-                assertEquals(
-                        "Metric not updated " + metricName + " expected:<" + expectedValue
+ "> but was:<"
-                                + metricValue(metricName) + ">",
-                        metricType == MetricType.MAX ? Double.NEGATIVE_INFINITY : 0d, metricValue(metricName),
EPS);
-            else if (metricType == MetricType.TOTAL)
+            if (expectedValue == 0.0) {
+                Double expected = expectedValue;
+                if (metricType == MetricType.MAX || metricType == MetricType.AVG)
+                    expected = Double.NaN;
+
+                assertEquals("Metric not updated " + metricName + " expected:<" + expectedValue
+ "> but was:<"
+                    + metricValue(metricName) + ">", expected, metricValue(metricName),
EPS);
+            } else if (metricType == MetricType.TOTAL)
                 TestUtils.waitForCondition(() -> Math.abs(metricValue(metricName) - expectedValue) <= EPS,
                         thisMaxWaitMs, () -> "Metric not updated " + metricName + " expected:<" + expectedValue
                                 + "> but was:<" + metricValue(metricName) + ">");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index fad4445..7223c3b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -198,7 +198,7 @@ public class WorkerSinkTaskTest {
         assertTaskMetricValue("status", "paused");
         assertTaskMetricValue("running-ratio", 0.0);
         assertTaskMetricValue("pause-ratio", 1.0);
-        assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
+        assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
 
         PowerMock.verifyAll();
     }
@@ -272,7 +272,7 @@ public class WorkerSinkTaskTest {
         assertTaskMetricValue("pause-ratio", 0.0);
         assertTaskMetricValue("batch-size-max", 1.0);
         assertTaskMetricValue("batch-size-avg", 0.5);
-        assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
+        assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
         assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
         assertTaskMetricValue("offset-commit-success-percentage", 0.0);
 
@@ -350,7 +350,7 @@ public class WorkerSinkTaskTest {
         assertTaskMetricValue("pause-ratio", 0.0);
         assertTaskMetricValue("batch-size-max", 0.0);
         assertTaskMetricValue("batch-size-avg", 0.0);
-        assertTaskMetricValue("offset-commit-max-time-ms", Double.NEGATIVE_INFINITY);
+        assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
         assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
         assertTaskMetricValue("offset-commit-success-percentage", 0.0);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index db73a8e..95762f3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -861,7 +861,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         if (minimumPollCountExpected > 0) {
             assertTrue(pollBatchTimeMax >= 0.0d);
         }
-        assertTrue(pollBatchTimeAvg >= 0.0d);
+        assertTrue(Double.isNaN(pollBatchTimeAvg) || pollBatchTimeAvg > 0.0d);
         double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count");
         double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max");
         assertEquals(0, activeCount, 0.000001d);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 1217ef9..10ebb0e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1506,8 +1506,8 @@ public class DistributedHerderTest {
         assertEquals(isRebalancing ? 1.0d : 0.0d, rebalancing, 0.0001d);
         assertEquals(millisSinceLastRebalance, rebalanceTimeSinceLast, 0.0001d);
         if (rebalanceTime <= 0L) {
-            assertEquals(Double.NEGATIVE_INFINITY, rebalanceTimeMax, 0.0001d);
-            assertEquals(0.0d, rebalanceTimeAvg, 0.0001d);
+            assertEquals(Double.NaN, rebalanceTimeMax, 0.0001d);
+            assertEquals(Double.NaN, rebalanceTimeAvg, 0.0001d);
         } else {
             assertEquals(rebalanceTime, rebalanceTimeMax, 0.0001d);
             assertEquals(rebalanceTime, rebalanceTimeAvg, 0.0001d);
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 7ac3c66..9b02d80 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -228,7 +228,7 @@ abstract class QuotaTestClients(topic: String,
     if (expectThrottle) {
       assertTrue(s"Client with id=$clientId should have been throttled", throttleMetricValue
> 0)
     } else {
-      assertEquals(s"Client with id=$clientId should not have been throttled", 0.0, throttleMetricValue,
0.0)
+      assertTrue(s"Client with id=$clientId should not have been throttled", throttleMetricValue.isNaN)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 7b2fde3..da4ad67 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -121,14 +121,14 @@ class RequestQuotaTest extends BaseRequestTest {
 
   @Test
   def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated() {
-    val apiKey = ApiKeys.PRODUCE;
+    val apiKey = ApiKeys.PRODUCE
     submitTest(apiKey, () => checkSmallQuotaProducerRequestThrottleTime(apiKey))
     waitAndCheckResults()
   }
 
   @Test
   def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated() {
-    val apiKey = ApiKeys.FETCH;
+    val apiKey = ApiKeys.FETCH
     submitTest(apiKey, () => checkSmallQuotaConsumerRequestThrottleTime(apiKey))
     waitAndCheckResults()
   }
@@ -475,7 +475,7 @@ class RequestQuotaTest extends BaseRequestTest {
     assertTrue(s"Throttle time metrics for produce quota not updated: $smallQuotaProducerClient",
       throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId, QuotaType.Produce) > 0)
     assertTrue(s"Throttle time metrics for request quota updated: $smallQuotaProducerClient",
-      throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId, QuotaType.Request) == 0)
+      throttleTimeMetricValueForQuotaType(smallQuotaProducerClientId, QuotaType.Request).isNaN)
   }
 
   private def checkSmallQuotaConsumerRequestThrottleTime(apiKey: ApiKeys) {
@@ -488,7 +488,7 @@ class RequestQuotaTest extends BaseRequestTest {
     assertTrue(s"Throttle time metrics for consumer quota not updated: $smallQuotaConsumerClientId",
       throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, QuotaType.Fetch) > 0)
     assertTrue(s"Throttle time metrics for request quota updated: $smallQuotaConsumerClient",
-      throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, QuotaType.Request) == 0)
+      throttleTimeMetricValueForQuotaType(smallQuotaConsumerClientId, QuotaType.Request).isNaN)
   }
 
   private def checkUnthrottledClient(apiKey: ApiKeys) {
@@ -497,7 +497,7 @@ class RequestQuotaTest extends BaseRequestTest {
     val unthrottledClient = Client(unthrottledClientId, apiKey)
     unthrottledClient.runUntil(response => responseThrottleTime(apiKey, response) <= 0.0)
     assertEquals(1, unthrottledClient.correlationId)
-    assertTrue(s"Client should not have been throttled: $unthrottledClient", throttleTimeMetricValue(unthrottledClientId)
<= 0.0)
+    assertTrue(s"Client should not have been throttled: $unthrottledClient", throttleTimeMetricValue(unthrottledClientId).isNaN)
   }
 
   private def checkExemptRequestMetric(apiKey: ApiKeys) {
@@ -507,7 +507,7 @@ class RequestQuotaTest extends BaseRequestTest {
     val updated = client.runUntil(response => exemptRequestMetricValue > exemptTarget)
 
     assertTrue(s"Exempt-request-time metric not updated: $client", updated)
-    assertTrue(s"Client should not have been throttled: $client", throttleTimeMetricValue(clientId)
<= 0.0)
+    assertTrue(s"Client should not have been throttled: $client", throttleTimeMetricValue(clientId).isNaN)
   }
 
   private def checkUnauthorizedRequestThrottle(apiKey: ApiKeys) {

