kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7240: -total metrics in Streams are incorrect (#5467)
Date Fri, 24 Aug 2018 18:21:05 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f48978  KAFKA-7240: -total metrics in Streams are incorrect (#5467)
6f48978 is described below

commit 6f48978b629d7f6eaf3f82783789cf46c3566d9b
Author: Sam Lendle <sam.lendle@gmail.com>
AuthorDate: Fri Aug 24 11:20:57 2018 -0700

    KAFKA-7240: -total metrics in Streams are incorrect (#5467)
    
    Changes:
    
    1. Add org.apache.kafka.streams.processor.internals.metrics.CumulativeCount, analogous to Count but not a SampledStat
    2. Use CumulativeCount for -total metrics in streams instead of Count
    
    Testing strategy:
    
    Add a test in StreamsMetricsImplTest which fails on old, incorrect behavior
    
    The contribution is my original work and I license the work to the project under the project's open source license.
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    |  7 ++--
 .../streams/processor/internals/StreamThread.java  | 13 ++++---
 .../internals/metrics/CumulativeCount.java         | 38 +++++++++++++++++++
 .../internals/metrics/StreamsMetricsImpl.java      |  2 +-
 .../internals/StreamsMetricsImplTest.java          | 44 ++++++++++++++++++++++
 5 files changed, 94 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 79df5d1..7f3d31f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -109,7 +110,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             );
             parent.add(
                 new MetricName("commit-total", group, "The total number of occurrence of
commit operations.", allTagMap),
-                new Count()
+                new CumulativeCount()
             );
 
             // add the operation metrics with additional tags
@@ -129,7 +130,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             );
             taskCommitTimeSensor.add(
                 new MetricName("commit-total", group, "The total number of occurrence of
commit operations.", tagMap),
-                new Count()
+                new CumulativeCount()
             );
 
             // add the metrics for enforced processing
@@ -140,7 +141,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             );
             taskEnforcedProcessSensor.add(
                     new MetricName("enforced-process-total", group, "The total number of
occurrence of enforced-process operations.", tagMap),
-                    new Count()
+                    new CumulativeCount()
             );
 
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index efd94ea..28cedbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
@@ -437,7 +438,7 @@ public class StreamThread extends Thread {
                 cache,
                 time,
                 () -> createProducer(taskId),
-                streamsMetrics.tasksClosedSensor);
+                streamsMetrics.taskClosedSensor);
         }
 
         private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -518,7 +519,7 @@ public class StreamThread extends Thread {
         private final Sensor processTimeSensor;
         private final Sensor punctuateTimeSensor;
         private final Sensor taskCreatedSensor;
-        private final Sensor tasksClosedSensor;
+        private final Sensor taskClosedSensor;
 
         StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
             super(metrics, threadName);
@@ -532,7 +533,7 @@ public class StreamThread extends Thread {
             addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll");
             // can't use addInvocationRateAndCount due to non-standard description string
             pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second
number of record-poll calls", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
-            pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number
of record-poll calls", tagMap()), new Count());
+            pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number
of record-poll calls", tagMap()), new CumulativeCount());
 
             processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
             addAvgMaxLatency(processTimeSensor, group, tagMap(), "process");
@@ -546,9 +547,9 @@ public class StreamThread extends Thread {
             taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics",
"The average per-second number of newly created tasks", tagMap()), new Rate(TimeUnit.SECONDS,
new Count()));
             taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics",
"The total number of newly created tasks", tagMap()), new Total());
 
-            tasksClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
-            tasksClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average
per-second number of closed tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
-            tasksClosedSensor.add(metrics.metricName("task-closed-total", group, "The total
number of closed tasks", tagMap()), new Total());
+            taskClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
+            taskClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average
per-second number of closed tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
+            taskClosedSensor.add(metrics.metricName("task-closed-total", group, "The total
number of closed tasks", tagMap()), new Total());
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
new file mode 100644
index 0000000..2c12c2b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.metrics;
+
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/**
+ * A non-SampledStat version of Count for measuring -total metrics in streams
+ */
+public class CumulativeCount implements MeasurableStat {
+
+    private double count = 0.0;
+
+    @Override
+    public void record(final MetricConfig config, final double value, final long timeMs)
{
+        count += 1;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long now) {
+        return count;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 56166a4..1703112 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -398,7 +398,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                 "The total number of occurrence of " + operation + " operations.",
                 tags
             ),
-            new Count()
+            new CumulativeCount()
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index b065e2c..7ce27b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -17,11 +17,17 @@
 package org.apache.kafka.streams.processor.internals;
 
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
 
 public class StreamsMetricsImplTest {
@@ -96,4 +102,42 @@ public class StreamsMetricsImplTest {
         streamsMetrics.removeSensor(sensor1);
         assertEquals(defaultMetrics, streamsMetrics.metrics().size());
     }
+
+    @Test
+    public void testTotalMetricDoesntDecrease() {
+        final MockTime time = new MockTime(1);
+        final MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS);
+        final Metrics metrics = new Metrics(config, time);
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "");
+
+        final String scope = "scope";
+        final String entity = "entity";
+        final String operation = "op";
+
+        final Sensor sensor = streamsMetrics.addLatencyAndThroughputSensor(
+                scope,
+                entity,
+                operation,
+                Sensor.RecordingLevel.INFO
+        );
+
+        final double latency = 100.0;
+        final MetricName totalMetricName = metrics.metricName(
+                "op-total",
+                "stream-scope-metrics",
+                "",
+                "client-id",
+                "",
+                "scope-id",
+                "entity"
+        );
+
+        final KafkaMetric totalMetric = metrics.metric(totalMetricName);
+
+        for (int i = 0; i < 10; i++) {
+            assertEquals(i, Math.round(totalMetric.measurable().measure(config, time.milliseconds())));
+            sensor.record(latency, time.milliseconds());
+        }
+
+    }
 }


Mime
View raw message