kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5368: Fix skipped record metrics to use rate of sum instead of rate of count
Date Fri, 02 Jun 2017 19:21:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d7d1196a0 -> 0a8b10e27


KAFKA-5368: Fix skipped record metrics to use rate of sum instead of rate of count

This resolved the issue with Kafka Streams skipped records sensor reporting wrong values.

Jira ticket: https://issues.apache.org/jira/browse/KAFKA-5368

The contribution is my original work and I license the work to the project under the project's
open source license.

Author: Hamidreza Afzali <hrafzali@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3206 from hrafzali/KAFKA-5368_skipped-records-sensor-bug


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a8b10e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a8b10e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a8b10e2

Branch: refs/heads/trunk
Commit: 0a8b10e27f573612e1d5d6787b2096d3a8528b94
Parents: d7d1196
Author: Hamidreza Afzali <hrafzali@gmail.com>
Authored: Fri Jun 2 12:20:57 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jun 2 12:20:57 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/metrics/stats/Sum.java  | 45 ++++++++++++++++++++
 .../processor/internals/StreamThread.java       |  3 +-
 2 files changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0a8b10e2/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
new file mode 100644
index 0000000..b40e9cd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.List;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that maintains the sum of what it has seen.
+ */
+public class Sum extends SampledStat {
+
+    public Sum() {
+        super(0);
+    }
+
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value += value;
+    }
+
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double total = 0.0;
+        for (Sample sample : samples)
+            total += sample.value;
+        return total;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a8b10e2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 44cd1b1..624a15e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
@@ -359,7 +360,7 @@ public class StreamThread extends Thread {
             tasksClosedSensor.add(metrics.metricName("task-closed-rate", this.groupName, "The average per-second number of closed tasks", this.tags), new Rate(new Count()));
 
             skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
-            skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), new Rate(new Count()));
+            skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), new Rate(new Sum()));
 
         }
 


Mime
View raw message