kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8696: clean up Sum/Count/Total metrics (#7057)
Date Tue, 23 Jul 2019 23:54:43 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a8aedc8  KAFKA-8696: clean up Sum/Count/Total metrics (#7057)
a8aedc8 is described below

commit a8aedc85ebfadcf1472acafe2e0311a73d3040be
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Tue Jul 23 18:54:20 2019 -0500

    KAFKA-8696: clean up Sum/Count/Total metrics (#7057)
    
    * Clean up one redundant and one misplaced metric
    * Clarify the relationship among these metrics to avoid future confusion
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java    |  4 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  4 +-
 .../kafka/common/metrics/MeasurableStat.java       |  2 +-
 .../apache/kafka/common/metrics/stats/Count.java   | 30 +++-------
 .../common/metrics/stats}/CumulativeCount.java     | 22 +++-----
 .../stats/{Total.java => CumulativeSum.java}       | 19 ++++---
 .../apache/kafka/common/metrics/stats/Meter.java   | 24 ++++----
 .../apache/kafka/common/metrics/stats/Rate.java    | 27 ++-------
 .../org/apache/kafka/common/metrics/stats/Sum.java | 30 +++-------
 .../apache/kafka/common/metrics/stats/Total.java   | 36 +++---------
 .../stats/{Count.java => WindowedCount.java}       | 26 +++------
 .../metrics/stats/{Sum.java => WindowedSum.java}   | 11 ++--
 .../org/apache/kafka/common/network/Selector.java  | 18 +++---
 .../kafka/common/metrics/JmxReporterTest.java      | 14 ++---
 .../kafka/common/metrics/KafkaMbeanTest.java       |  8 +--
 .../apache/kafka/common/metrics/MetricsTest.java   | 64 +++++++++++-----------
 .../apache/kafka/common/metrics/SensorTest.java    |  4 +-
 .../kafka/common/metrics/stats/MeterTest.java      |  2 +-
 .../java/org/apache/kafka/test/MetricsBench.java   |  4 +-
 .../org/apache/kafka/connect/runtime/Worker.java   | 14 ++---
 .../kafka/connect/runtime/WorkerSinkTask.java      | 10 ++--
 .../kafka/connect/runtime/WorkerSourceTask.java    |  6 +-
 .../runtime/distributed/DistributedHerder.java     |  4 +-
 .../runtime/errors/ErrorHandlingMetrics.java       | 16 +++---
 .../main/scala/kafka/network/SocketServer.scala    |  5 +-
 .../scala/kafka/server/ClientQuotaManager.scala    |  4 +-
 .../streams/kstream/internals/metrics/Sensors.java |  8 +--
 .../streams/processor/internals/StreamTask.java    |  8 +--
 .../internals/metrics/StreamsMetricsImpl.java      |  5 +-
 .../processor/internals/RecordCollectorTest.java   |  4 +-
 .../processor/internals/StandbyTaskTest.java       |  4 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  8 +--
 32 files changed, 185 insertions(+), 260 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d5faa6e..b92a4a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -42,9 +42,9 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
@@ -961,7 +961,7 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     protected Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) {
-        return new Meter(new Count(),
+        return new Meter(new WindowedCount(),
                 metrics.metricName(baseName + "-rate", groupName,
                         String.format("The number of %s per second", descriptiveName)),
                 metrics.metricName(baseName + "-total", groupName,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 08ab9fb..d4d028d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -47,10 +47,10 @@ import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -1657,7 +1657,7 @@ public class Fetcher<K, V> implements Closeable {
             this.fetchLatency = metrics.sensor("fetch-latency");
             this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), new Avg());
             this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), new Max());
-            this.fetchLatency.add(new Meter(new Count(), metrics.metricInstance(metricsRegistry.fetchRequestRate),
+            this.fetchLatency.add(new Meter(new WindowedCount(), metrics.metricInstance(metricsRegistry.fetchRequestRate),
                     metrics.metricInstance(metricsRegistry.fetchRequestTotal)));
 
             this.recordsFetchLag = metrics.sensor("records-lag");
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
index aedac9a..035449e 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
@@ -19,7 +19,7 @@ package org.apache.kafka.common.metrics;
 /**
  * A MeasurableStat is a {@link Stat} that is also {@link Measurable} (i.e. can produce a single floating point value).
  * This is the interface used for most of the simple statistics such as {@link org.apache.kafka.common.metrics.stats.Avg},
- * {@link org.apache.kafka.common.metrics.stats.Max}, {@link org.apache.kafka.common.metrics.stats.Count}, etc.
+ * {@link org.apache.kafka.common.metrics.stats.Max}, {@link org.apache.kafka.common.metrics.stats.CumulativeCount}, etc.
  */
 public interface MeasurableStat extends Stat, Measurable {
 
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
index 3da91c4..2bef7cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
@@ -16,30 +16,14 @@
  */
 package org.apache.kafka.common.metrics.stats;
 
-import java.util.List;
-
-import org.apache.kafka.common.metrics.MetricConfig;
-
 /**
  * A {@link SampledStat} that maintains a simple count of what it has seen.
+ * This is a special kind of {@link WindowedSum} that always records a value of {@code 1} instead of the provided value.
+ *
+ * See also {@link CumulativeCount} for a non-sampled version of this metric.
+ *
+ * @deprecated since 2.4. Use {@link WindowedCount} instead
  */
-public class Count extends SampledStat {
-
-    public Count() {
-        super(0);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value += 1.0;
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double total = 0.0;
-        for (Sample sample : samples)
-            total += sample.value;
-        return total;
-    }
-
+@Deprecated
+public class Count extends WindowedCount {
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeCount.java
similarity index 66%
rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
rename to clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeCount.java
index 2c12c2b..85591b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeCount.java
@@ -14,25 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals.metrics;
+package org.apache.kafka.common.metrics.stats;
 
-import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.MetricConfig;
 
 /**
- * A non-SampledStat version of Count for measuring -total metrics in streams
+ * A non-sampled version of {@link WindowedCount} maintained over all time.
+ *
+ * This is a special kind of {@link CumulativeSum} that always records {@code 1} instead of the provided value.
+ * In other words, it counts the number of
+ * {@link CumulativeCount#record(MetricConfig, double, long)} invocations,
+ * instead of summing the recorded values.
  */
-public class CumulativeCount implements MeasurableStat {
-
-    private double count = 0.0;
-
+public class CumulativeCount extends CumulativeSum {
     @Override
     public void record(final MetricConfig config, final double value, final long timeMs) {
-        count += 1;
-    }
-
-    @Override
-    public double measure(final MetricConfig config, final long now) {
-        return count;
+        super.record(config, 1, timeMs);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeSum.java
similarity index 73%
copy from clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
copy to clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeSum.java
index b8a83f5..13f12a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeSum.java
@@ -20,28 +20,31 @@ import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.MetricConfig;
 
 /**
- * An un-windowed cumulative total maintained over all time.
+ * A non-sampled cumulative total maintained over all time.
+ * This is a non-sampled version of {@link WindowedSum}.
+ *
+ * See also {@link CumulativeCount} if you just want to increment the value by 1 on each recording.
  */
-public class Total implements MeasurableStat {
+public class CumulativeSum implements MeasurableStat {
 
     private double total;
 
-    public Total() {
-        this.total = 0.0;
+    public CumulativeSum() {
+        total = 0.0;
     }
 
-    public Total(double value) {
-        this.total = value;
+    public CumulativeSum(double value) {
+        total = value;
     }
 
     @Override
     public void record(MetricConfig config, double value, long now) {
-        this.total += value;
+        total += value;
     }
 
     @Override
     public double measure(MetricConfig config, long now) {
-        return this.total;
+        return total;
     }
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
index 91d4461..a6bdc9f 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
@@ -23,48 +23,46 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.CompoundStat;
 import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.stats.Rate.SampledTotal;
 
 
 /**
  * A compound stat that includes a rate metric and a cumulative total metric.
  */
 public class Meter implements CompoundStat {
-
     private final MetricName rateMetricName;
     private final MetricName totalMetricName;
     private final Rate rate;
-    private final Total total;
+    private final CumulativeSum total;
 
     /**
-     * Construct a Meter with seconds as time unit and {@link SampledTotal} stats for Rate
+     * Construct a Meter with seconds as time unit
      */
     public Meter(MetricName rateMetricName, MetricName totalMetricName) {
-        this(TimeUnit.SECONDS, new SampledTotal(), rateMetricName, totalMetricName);
+        this(TimeUnit.SECONDS, new WindowedSum(), rateMetricName, totalMetricName);
     }
 
     /**
-     * Construct a Meter with provided time unit and {@link SampledTotal} stats for Rate
+     * Construct a Meter with provided time unit
      */
     public Meter(TimeUnit unit, MetricName rateMetricName, MetricName totalMetricName) {
-        this(unit, new SampledTotal(), rateMetricName, totalMetricName);
+        this(unit, new WindowedSum(), rateMetricName, totalMetricName);
     }
 
     /**
-     * Construct a Meter with seconds as time unit and provided {@link SampledStat} stats for Rate
+     * Construct a Meter with seconds as time unit
      */
     public Meter(SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) {
         this(TimeUnit.SECONDS, rateStat, rateMetricName, totalMetricName);
     }
 
     /**
-     * Construct a Meter with provided time unit and provided {@link SampledStat} stats for Rate
+     * Construct a Meter with provided time unit
      */
     public Meter(TimeUnit unit, SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) {
-        if (!(rateStat instanceof SampledTotal) && !(rateStat instanceof Count)) {
-            throw new IllegalArgumentException("Meter is supported only for SampledTotal and Count");
+        if (!(rateStat instanceof WindowedSum)) {
+            throw new IllegalArgumentException("Meter is supported only for WindowedCount or WindowedSum.");
         }
-        this.total = new Total();
+        this.total = new CumulativeSum();
         this.rate = new Rate(unit, rateStat);
         this.rateMetricName = rateMetricName;
         this.totalMetricName = totalMetricName;
@@ -81,7 +79,7 @@ public class Meter implements CompoundStat {
     public void record(MetricConfig config, double value, long timeMs) {
         rate.record(config, value, timeMs);
         // Total metrics with Count stat should record 1.0 (as recorded in the count)
-        double totalValue = (rate.stat instanceof Count) ? 1.0 : value;
+        double totalValue = (rate.stat instanceof WindowedCount) ? 1.0 : value;
         total.record(config, totalValue, timeMs);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index a56734c..604f860 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.metrics.stats;
 
-import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 
@@ -40,7 +39,7 @@ public class Rate implements MeasurableStat {
     }
 
     public Rate(TimeUnit unit) {
-        this(unit, new SampledTotal());
+        this(unit, new WindowedSum());
     }
 
     public Rate(SampledStat stat) {
@@ -115,24 +114,10 @@ public class Rate implements MeasurableStat {
         }
     }
 
-    public static class SampledTotal extends SampledStat {
-
-        public SampledTotal() {
-            super(0.0d);
-        }
-
-        @Override
-        protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
-            sample.value += value;
-        }
-
-        @Override
-        public double combine(List<Sample> samples, MetricConfig config, long now) {
-            double total = 0.0;
-            for (Sample sample : samples)
-                total += sample.value;
-            return total;
-        }
-
+    /**
+     * @deprecated since 2.4. Use {@link WindowedSum} instead.
+     */
+    @Deprecated
+    public static class SampledTotal extends WindowedSum {
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
index b40e9cd..17188b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
@@ -16,30 +16,14 @@
  */
 package org.apache.kafka.common.metrics.stats;
 
-import java.util.List;
-
-import org.apache.kafka.common.metrics.MetricConfig;
-
 /**
  * A {@link SampledStat} that maintains the sum of what it has seen.
+ * This is a sampled version of {@link CumulativeSum}.
+ *
+ * See also {@link WindowedCount} if you want to increment the value by 1 on each recording.
+ *
+ * @deprecated since 2.4. Use {@link WindowedSum} instead
  */
-public class Sum extends SampledStat {
-
-    public Sum() {
-        super(0);
-    }
-
-    @Override
-    protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value += value;
-    }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double total = 0.0;
-        for (Sample sample : samples)
-            total += sample.value;
-        return total;
-    }
-
+@Deprecated
+public class Sum extends WindowedSum {
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
index b8a83f5..23f7d04 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
@@ -16,32 +16,14 @@
  */
 package org.apache.kafka.common.metrics.stats;
 
-import org.apache.kafka.common.metrics.MeasurableStat;
-import org.apache.kafka.common.metrics.MetricConfig;
-
 /**
- * An un-windowed cumulative total maintained over all time.
+ * A non-sampled cumulative total maintained over all time.
+ * This is a non-sampled version of {@link WindowedSum}.
+ *
+ * See also {@link CumulativeCount} if you just want to increment the value by 1 on each recording.
+ *
+ * @deprecated since 2.4. Use {@link CumulativeSum} instead.
  */
-public class Total implements MeasurableStat {
-
-    private double total;
-
-    public Total() {
-        this.total = 0.0;
-    }
-
-    public Total(double value) {
-        this.total = value;
-    }
-
-    @Override
-    public void record(MetricConfig config, double value, long now) {
-        this.total += value;
-    }
-
-    @Override
-    public double measure(MetricConfig config, long now) {
-        return this.total;
-    }
-
-}
+@Deprecated
+public class Total extends CumulativeSum {
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedCount.java
similarity index 70%
copy from clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
copy to clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedCount.java
index 3da91c4..825f404 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedCount.java
@@ -16,30 +16,20 @@
  */
 package org.apache.kafka.common.metrics.stats;
 
-import java.util.List;
-
 import org.apache.kafka.common.metrics.MetricConfig;
 
 /**
  * A {@link SampledStat} that maintains a simple count of what it has seen.
+ * This is a special kind of {@link WindowedSum} that always records a value of {@code 1} instead of the provided value.
+ * In other words, it counts the number of
+ * {@link WindowedCount#record(MetricConfig, double, long)} invocations,
+ * instead of summing the recorded values.
+ *
+ * See also {@link CumulativeCount} for a non-sampled version of this metric.
  */
-public class Count extends SampledStat {
-
-    public Count() {
-        super(0);
-    }
-
+public class WindowedCount extends WindowedSum {
     @Override
     protected void update(Sample sample, MetricConfig config, double value, long now) {
-        sample.value += 1.0;
+        super.update(sample, config, 1.0, now);
     }
-
-    @Override
-    public double combine(List<Sample> samples, MetricConfig config, long now) {
-        double total = 0.0;
-        for (Sample sample : samples)
-            total += sample.value;
-        return total;
-    }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedSum.java
similarity index 86%
copy from clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
copy to clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedSum.java
index b40e9cd..14aa562 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedSum.java
@@ -16,16 +16,19 @@
  */
 package org.apache.kafka.common.metrics.stats;
 
-import java.util.List;
-
 import org.apache.kafka.common.metrics.MetricConfig;
 
+import java.util.List;
+
 /**
  * A {@link SampledStat} that maintains the sum of what it has seen.
+ * This is a sampled version of {@link CumulativeSum}.
+ *
+ * See also {@link WindowedCount} if you want to increment the value by 1 on each recording.
  */
-public class Sum extends SampledStat {
+public class WindowedSum extends SampledStat {
 
-    public Sum() {
+    public WindowedSum() {
         super(0);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 20e24b7..2652906 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -23,11 +23,11 @@ import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.metrics.stats.SampledStat;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -1084,7 +1084,7 @@ public class Selector implements Selectable, AutoCloseable {
                     "successful-authentication-no-reauth-total", metricGrpName,
                     "The total number of connections with successful authentication where the client does not support re-authentication",
                     metricTags);
-            this.successfulAuthenticationNoReauth.add(successfulAuthenticationNoReauthMetricName, new Total());
+            this.successfulAuthenticationNoReauth.add(successfulAuthenticationNoReauthMetricName, new CumulativeSum());
 
             this.failedAuthentication = sensor("failed-authentication:" + tagsSuffix);
             this.failedAuthentication.add(createMeter(metrics, metricGrpName, metricTags,
@@ -1105,13 +1105,13 @@ public class Selector implements Selectable, AutoCloseable {
             this.reauthenticationLatency.add(reauthenticationLatencyAvgMetricName, new Avg());
 
             this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix);
-            bytesTransferred.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
+            bytesTransferred.add(createMeter(metrics, metricGrpName, metricTags, new WindowedCount(),
                     "network-io", "network operations (reads or writes) on all connections"));
 
             this.bytesSent = sensor("bytes-sent:" + tagsSuffix, bytesTransferred);
             this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags,
                     "outgoing-byte", "outgoing bytes sent to all servers"));
-            this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
+            this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags, new WindowedCount(),
                     "request", "requests sent"));
             MetricName metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of requests sent.", metricTags);
             this.bytesSent.add(metricName, new Avg());
@@ -1122,11 +1122,11 @@ public class Selector implements Selectable, AutoCloseable {
             this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
                     "incoming-byte", "bytes read off all sockets"));
             this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
-                    new Count(), "response", "responses received"));
+                    new WindowedCount(), "response", "responses received"));
 
             this.selectTime = sensor("select-time:" + tagsSuffix);
             this.selectTime.add(createMeter(metrics, metricGrpName, metricTags,
-                    new Count(), "select", "times the I/O layer checked for new I/O to perform"));
+                    new WindowedCount(), "select", "times the I/O layer checked for new I/O to perform"));
             metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
             this.selectTime.add(metricName, new Avg());
             this.selectTime.add(createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io-wait", "waiting"));
@@ -1187,7 +1187,7 @@ public class Selector implements Selectable, AutoCloseable {
 
                     nodeRequest = sensor(nodeRequestName);
                     nodeRequest.add(createMeter(metrics, metricGrpName, tags, "outgoing-byte", "outgoing bytes"));
-                    nodeRequest.add(createMeter(metrics, metricGrpName, tags, new Count(), "request", "requests sent"));
+                    nodeRequest.add(createMeter(metrics, metricGrpName, tags, new WindowedCount(), "request", "requests sent"));
                     MetricName metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of requests sent.", tags);
                     nodeRequest.add(metricName, new Avg());
                     metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent.", tags);
@@ -1196,7 +1196,7 @@ public class Selector implements Selectable, AutoCloseable {
                     String nodeResponseName = "node-" + connectionId + ".bytes-received";
                     Sensor nodeResponse = sensor(nodeResponseName);
                     nodeResponse.add(createMeter(metrics, metricGrpName, tags, "incoming-byte", "incoming bytes"));
-                    nodeResponse.add(createMeter(metrics, metricGrpName, tags, new Count(), "response", "responses received"));
+                    nodeResponse.add(createMeter(metrics, metricGrpName, tags, new WindowedCount(), "response", "responses received"));
 
                     String nodeTimeName = "node-" + connectionId + ".latency";
                     Sensor nodeRequestTime = sensor(nodeTimeName);
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index c6e112a..37d1182 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.common.metrics;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.junit.Test;
 
 import javax.management.MBeanServer;
@@ -43,7 +43,7 @@ public class JmxReporterTest {
 
             Sensor sensor = metrics.sensor("kafka.requests");
             sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg());
-            sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new Total());
+            sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new CumulativeSum());
 
             assertTrue(server.isRegistered(new ObjectName(":type=grp1")));
             assertEquals(Double.NaN, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.avg"));
@@ -79,11 +79,11 @@ public class JmxReporterTest {
             metrics.addReporter(new JmxReporter());
 
             Sensor sensor = metrics.sensor("kafka.requests");
-            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo*"), new Total());
-            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo+"), new Total());
-            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo?"), new Total());
-            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo:"), new Total());
-            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo%"), new Total());
+            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo*"), new CumulativeSum());
+            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo+"), new CumulativeSum());
+            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo?"), new CumulativeSum());
+            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo:"), new CumulativeSum());
+            sensor.add(metrics.metricName("name", "group", "desc", "id", "foo%"), new CumulativeSum());
 
             assertTrue(server.isRegistered(new ObjectName(":type=group,id=\"foo\\*\"")));
             assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=\"foo\\*\""), "name"));
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
index dd494c8..ea1178e 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.common.metrics;
 
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,9 +51,9 @@ public class KafkaMbeanTest {
         metrics.addReporter(new JmxReporter());
         sensor = metrics.sensor("kafka.requests");
         countMetricName = metrics.metricName("pack.bean1.count", "grp1");
-        sensor.add(countMetricName, new Count());
+        sensor.add(countMetricName, new WindowedCount());
         sumMetricName = metrics.metricName("pack.bean1.sum", "grp1");
-        sensor.add(sumMetricName, new Sum());
+        sensor.add(sumMetricName, new WindowedSum());
     }
 
     @After
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 98b468a..0b1ea85 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -44,7 +44,7 @@ import java.util.function.Function;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.Min;
@@ -52,9 +52,9 @@ import org.apache.kafka.common.metrics.stats.Percentile;
 import org.apache.kafka.common.metrics.stats.Percentiles;
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.common.metrics.stats.SimpleRate;
-import org.apache.kafka.common.metrics.stats.Sum;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
@@ -119,15 +119,15 @@ public class MetricsTest {
         s.add(metrics.metricName("test.min", "grp1"), new Min());
         s.add(new Meter(TimeUnit.SECONDS, metrics.metricName("test.rate", "grp1"),
                 metrics.metricName("test.total", "grp1")));
-        s.add(new Meter(TimeUnit.SECONDS, new Count(), metrics.metricName("test.occurences", "grp1"),
+        s.add(new Meter(TimeUnit.SECONDS, new WindowedCount(), metrics.metricName("test.occurences", "grp1"),
                 metrics.metricName("test.occurences.total", "grp1")));
-        s.add(metrics.metricName("test.count", "grp1"), new Count());
+        s.add(metrics.metricName("test.count", "grp1"), new WindowedCount());
         s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
                              new Percentile(metrics.metricName("test.median", "grp1"), 50.0),
                              new Percentile(metrics.metricName("test.perc99_9", "grp1"), 99.9)));
 
         Sensor s2 = metrics.sensor("test.sensor2");
-        s2.add(metrics.metricName("s2.total", "grp1"), new Total());
+        s2.add(metrics.metricName("s2.total", "grp1"), new CumulativeSum());
         s2.record(5.0);
 
         int sum = 0;
@@ -162,15 +162,15 @@ public class MetricsTest {
     @Test
     public void testHierarchicalSensors() {
         Sensor parent1 = metrics.sensor("test.parent1");
-        parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
+        parent1.add(metrics.metricName("test.parent1.count", "grp1"), new WindowedCount());
         Sensor parent2 = metrics.sensor("test.parent2");
-        parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
+        parent2.add(metrics.metricName("test.parent2.count", "grp1"), new WindowedCount());
         Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
-        child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
+        child1.add(metrics.metricName("test.child1.count", "grp1"), new WindowedCount());
         Sensor child2 = metrics.sensor("test.child2", parent1);
-        child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
+        child2.add(metrics.metricName("test.child2.count", "grp1"), new WindowedCount());
         Sensor grandchild = metrics.sensor("test.grandchild", child1);
-        grandchild.add(metrics.metricName("test.grandchild.count", "grp1"), new Count());
+        grandchild.add(metrics.metricName("test.grandchild.count", "grp1"), new WindowedCount());
 
         /* increment each sensor one time */
         parent1.record();
@@ -222,15 +222,15 @@ public class MetricsTest {
     public void testRemoveSensor() {
         int size = metrics.metrics().size();
         Sensor parent1 = metrics.sensor("test.parent1");
-        parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
+        parent1.add(metrics.metricName("test.parent1.count", "grp1"), new WindowedCount());
         Sensor parent2 = metrics.sensor("test.parent2");
-        parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
+        parent2.add(metrics.metricName("test.parent2.count", "grp1"), new WindowedCount());
         Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
-        child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
+        child1.add(metrics.metricName("test.child1.count", "grp1"), new WindowedCount());
         Sensor child2 = metrics.sensor("test.child2", parent2);
-        child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
+        child2.add(metrics.metricName("test.child2.count", "grp1"), new WindowedCount());
         Sensor grandChild1 = metrics.sensor("test.gchild2", child2);
-        grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new Count());
+        grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new WindowedCount());
 
         Sensor sensor = metrics.getSensor("test.parent1");
         assertNotNull(sensor);
@@ -268,10 +268,10 @@ public class MetricsTest {
     @Test
     public void testRemoveInactiveMetrics() {
         Sensor s1 = metrics.sensor("test.s1", null, 1);
-        s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
+        s1.add(metrics.metricName("test.s1.count", "grp1"), new WindowedCount());
 
         Sensor s2 = metrics.sensor("test.s2", null, 3);
-        s2.add(metrics.metricName("test.s2.count", "grp1"), new Count());
+        s2.add(metrics.metricName("test.s2.count", "grp1"), new WindowedCount());
 
         Metrics.ExpireSensorTask purger = metrics.new ExpireSensorTask();
         purger.run();
@@ -309,7 +309,7 @@ public class MetricsTest {
 
         // After purging, it should be possible to recreate a metric
         s1 = metrics.sensor("test.s1", null, 1);
-        s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
+        s1.add(metrics.metricName("test.s1.count", "grp1"), new WindowedCount());
         assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
         assertNotNull("MetricName test.s1.count must be present",
                 metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
@@ -318,8 +318,8 @@ public class MetricsTest {
     @Test
     public void testRemoveMetric() {
         int size = metrics.metrics().size();
-        metrics.addMetric(metrics.metricName("test1", "grp1"), new Count());
-        metrics.addMetric(metrics.metricName("test2", "grp1"), new Count());
+        metrics.addMetric(metrics.metricName("test1", "grp1"), new WindowedCount());
+        metrics.addMetric(metrics.metricName("test2", "grp1"), new WindowedCount());
 
         assertNotNull(metrics.removeMetric(metrics.metricName("test1", "grp1")));
         assertNull(metrics.metrics().get(metrics.metricName("test1", "grp1")));
@@ -333,7 +333,7 @@ public class MetricsTest {
 
     @Test
     public void testEventWindowing() {
-        Count count = new Count();
+        WindowedCount count = new WindowedCount();
         MetricConfig config = new MetricConfig().eventWindow(1).samples(2);
         count.record(config, 1.0, time.milliseconds());
         count.record(config, 1.0, time.milliseconds());
@@ -344,7 +344,7 @@ public class MetricsTest {
 
     @Test
     public void testTimeWindowing() {
-        Count count = new Count();
+        WindowedCount count = new WindowedCount();
         MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS).samples(2);
         count.record(config, 1.0, time.milliseconds());
         time.sleep(1);
@@ -397,8 +397,8 @@ public class MetricsTest {
      */
     @Test
     public void testSampledStatReturnsInitialValueWhenNoValuesExist() {
-        Count count = new Count();
-        Rate.SampledTotal sampledTotal = new Rate.SampledTotal();
+        WindowedCount count = new WindowedCount();
+        WindowedSum sampledTotal = new WindowedSum();
         long windowMs = 100;
         int samples = 2;
         MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
@@ -415,14 +415,14 @@ public class MetricsTest {
     @Test(expected = IllegalArgumentException.class)
     public void testDuplicateMetricName() {
         metrics.sensor("test").add(metrics.metricName("test", "grp1"), new Avg());
-        metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new Total());
+        metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new CumulativeSum());
     }
 
     @Test
     public void testQuotas() {
         Sensor sensor = metrics.sensor("test");
-        sensor.add(metrics.metricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
-        sensor.add(metrics.metricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
+        sensor.add(metrics.metricName("test1.total", "grp1"), new CumulativeSum(), new MetricConfig().quota(Quota.upperBound(5.0)));
+        sensor.add(metrics.metricName("test2.total", "grp1"), new CumulativeSum(), new MetricConfig().quota(Quota.lowerBound(0.0)));
         sensor.record(5.0);
         try {
             sensor.record(1.0);
@@ -503,7 +503,7 @@ public class MetricsTest {
         MetricName countRateMetricName = metrics.metricName("test.count.rate", "grp1");
         MetricName countTotalMetricName = metrics.metricName("test.count.total", "grp1");
         s.add(new Meter(TimeUnit.SECONDS, rateMetricName, totalMetricName));
-        s.add(new Meter(TimeUnit.SECONDS, new Count(), countRateMetricName, countTotalMetricName));
+        s.add(new Meter(TimeUnit.SECONDS, new WindowedCount(), countRateMetricName, countTotalMetricName));
         KafkaMetric totalMetric = metrics.metrics().get(totalMetricName);
         KafkaMetric countTotalMetric = metrics.metrics().get(countTotalMetricName);
 
@@ -825,10 +825,10 @@ public class MetricsTest {
                     sensor.add(metrics.metricName("test.metric.avg", "avg", tags), new Avg());
                     break;
                 case TOTAL:
-                    sensor.add(metrics.metricName("test.metric.total", "total", tags), new Total());
+                    sensor.add(metrics.metricName("test.metric.total", "total", tags), new CumulativeSum());
                     break;
                 case COUNT:
-                    sensor.add(metrics.metricName("test.metric.count", "count", tags), new Count());
+                    sensor.add(metrics.metricName("test.metric.count", "count", tags), new WindowedCount());
                     break;
                 case MAX:
                     sensor.add(metrics.metricName("test.metric.max", "max", tags), new Max());
@@ -843,7 +843,7 @@ public class MetricsTest {
                     sensor.add(metrics.metricName("test.metric.simpleRate", "simpleRate", tags), new SimpleRate());
                     break;
                 case SUM:
-                    sensor.add(metrics.metricName("test.metric.sum", "sum", tags), new Sum());
+                    sensor.add(metrics.metricName("test.metric.sum", "sum", tags), new WindowedSum());
                     break;
                 case VALUE:
                     sensor.add(metrics.metricName("test.metric.value", "value", tags), new Value());
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index 8e3dfeb..fc7cfe2 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -128,7 +128,7 @@ public class SensorTest {
         }
 
         // note that adding a different metric with the same name is also a no-op
-        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Sum()));
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new WindowedSum()));
 
         // so after all this, we still just have the original metric registered
         assertEquals(1, sensor.metrics().size());
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
index 8204771..27198ea 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
@@ -44,7 +44,7 @@ public class MeterTest {
         assertEquals(rateMetricName, rate.name());
         assertEquals(totalMetricName, total.name());
         Rate rateStat = (Rate) rate.stat();
-        Total totalStat = (Total) total.stat();
+        CumulativeSum totalStat = (CumulativeSum) total.stat();
 
         MetricConfig config = new MetricConfig();
         double nextValue = 0.0;
diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
index 379db2f..93cbf6d 100644
--- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
+++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
@@ -21,11 +21,11 @@ import java.util.Arrays;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Percentile;
 import org.apache.kafka.common.metrics.stats.Percentiles;
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 
 public class MetricsBench {
 
@@ -37,7 +37,7 @@ public class MetricsBench {
             Sensor child = metrics.sensor("child", parent);
             for (Sensor sensor : Arrays.asList(parent, child)) {
                 sensor.add(metrics.metricName(sensor.name() + ".avg", "grp1"), new Avg());
-                sensor.add(metrics.metricName(sensor.name() + ".count", "grp1"), new Count());
+                sensor.add(metrics.metricName(sensor.name() + ".count", "grp1"), new WindowedCount());
                 sensor.add(metrics.metricName(sensor.name() + ".max", "grp1"), new Max());
                 sensor.add(new Percentiles(1024,
                         0.0,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index f848a18..fd90dd1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -24,8 +24,8 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Frequencies;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
@@ -863,13 +863,13 @@ public class Worker {
             connectorStartupResults.add(connectorStartupResultFrequencies);
 
             connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
-            connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new Total());
+            connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
 
             connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
-            connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new Total());
+            connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
 
             connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
-            connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new Total());
+            connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
 
             MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
             MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
@@ -878,13 +878,13 @@ public class Worker {
             taskStartupResults.add(taskStartupResultFrequencies);
 
             taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
-            taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new Total());
+            taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
 
             taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
-            taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new Total());
+            taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
 
             taskStartupFailures = metricGroup.sensor("task-startup-failures");
-            taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new Total());
+            taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
         }
 
         void close() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index f21c500..395c93e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -27,9 +27,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -705,11 +705,11 @@ class WorkerSinkTask extends WorkerTask {
 
             sinkRecordRead = metricGroup.sensor("sink-record-read");
             sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
-            sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new Total());
+            sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new CumulativeSum());
 
             sinkRecordSend = metricGroup.sensor("sink-record-send");
             sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendRate), new Rate());
-            sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new Total());
+            sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new CumulativeSum());
 
             sinkRecordActiveCount = metricGroup.sensor("sink-record-active-count");
             sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCount), new Value());
@@ -724,11 +724,11 @@ class WorkerSinkTask extends WorkerTask {
 
             offsetCompletion = metricGroup.sensor("offset-commit-completion");
             offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate), new Rate());
-            offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal), new Total());
+            offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal), new CumulativeSum());
 
             offsetCompletionSkip = metricGroup.sensor("offset-commit-completion-skip");
             offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate), new Rate());
-            offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal), new Total());
+            offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal), new CumulativeSum());
 
             putBatchTime = metricGroup.sensor("put-batch-time");
             putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeMax), new Max());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6e94c6f..6e1152b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -24,9 +24,9 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -591,11 +591,11 @@ class WorkerSourceTask extends WorkerTask {
 
             sourceRecordPoll = metricGroup.sensor("source-record-poll");
             sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
-            sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), new Total());
+            sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), new CumulativeSum());
 
             sourceRecordWrite = metricGroup.sensor("source-record-write");
             sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate), new Rate());
-            sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), new Total());
+            sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), new CumulativeSum());
 
             pollTime = metricGroup.sensor("poll-batch-time");
             pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index b1a41c0..86caeeb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -1483,7 +1483,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             });
 
             rebalanceCompletedCounts = metricGroup.sensor("completed-rebalance-count");
-            rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal), new Total());
+            rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal), new CumulativeSum());
 
             rebalanceTime = metricGroup.sensor("rebalance-time");
             rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeMax), new Max());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
index c589012..0deecd1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.ConnectMetrics;
@@ -62,25 +62,25 @@ public class ErrorHandlingMetrics {
         metricGroup.close();
 
         recordProcessingFailures = metricGroup.sensor("total-record-failures");
-        recordProcessingFailures.add(metricGroup.metricName(registry.recordProcessingFailures), new Total());
+        recordProcessingFailures.add(metricGroup.metricName(registry.recordProcessingFailures), new CumulativeSum());
 
         recordProcessingErrors = metricGroup.sensor("total-record-errors");
-        recordProcessingErrors.add(metricGroup.metricName(registry.recordProcessingErrors), new Total());
+        recordProcessingErrors.add(metricGroup.metricName(registry.recordProcessingErrors), new CumulativeSum());
 
         recordsSkipped = metricGroup.sensor("total-records-skipped");
-        recordsSkipped.add(metricGroup.metricName(registry.recordsSkipped), new Total());
+        recordsSkipped.add(metricGroup.metricName(registry.recordsSkipped), new CumulativeSum());
 
         retries = metricGroup.sensor("total-retries");
-        retries.add(metricGroup.metricName(registry.retries), new Total());
+        retries.add(metricGroup.metricName(registry.retries), new CumulativeSum());
 
         errorsLogged = metricGroup.sensor("total-errors-logged");
-        errorsLogged.add(metricGroup.metricName(registry.errorsLogged), new Total());
+        errorsLogged.add(metricGroup.metricName(registry.errorsLogged), new CumulativeSum());
 
         dlqProduceRequests = metricGroup.sensor("deadletterqueue-produce-requests");
-        dlqProduceRequests.add(metricGroup.metricName(registry.dlqProduceRequests), new Total());
+        dlqProduceRequests.add(metricGroup.metricName(registry.dlqProduceRequests), new CumulativeSum());
 
         dlqProduceFailures = metricGroup.sensor("deadletterqueue-produce-failures");
-        dlqProduceFailures.add(metricGroup.metricName(registry.dlqProduceFailures), new Total());
+        dlqProduceFailures.add(metricGroup.metricName(registry.dlqProduceFailures), new CumulativeSum());
 
         metricGroup.addValueMetric(registry.lastErrorTimestamp, now -> lastErrorTime);
     }
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 619d260..69f02ae 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -39,8 +39,7 @@ import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.{KafkaException, Reconfigurable}
 import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.metrics.stats.Meter
-import org.apache.kafka.common.metrics.stats.Total
+import org.apache.kafka.common.metrics.stats.{CumulativeSum, Meter}
 import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
 import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.protocol.ApiKeys
@@ -712,7 +711,7 @@ private[kafka] class Processor(val id: Int,
     Map(NetworkProcessorMetricTag -> id.toString)
   )
 
-  val expiredConnectionsKilledCount = new Total()
+  val expiredConnectionsKilledCount = new CumulativeSum()
   private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", "socket-server-metrics", metricTags)
   metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)
 
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 959144c..5451b5c 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -27,7 +27,7 @@ import kafka.utils.{Logging, ShutdownableThread}
 import org.apache.kafka.common.{Cluster, MetricName}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.metrics.stats.{Avg, Rate, Total}
+import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
 import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
@@ -179,7 +179,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
   delayQueueSensor.add(metrics.metricName("queue-size",
     quotaType.toString,
-    "Tracks the size of the delay queue"), new Total())
+    "Tracks the size of the delay queue"), new CumulativeSum())
   start() // Use start method to keep spotbugs happy
   private def start() {
     throttledChannelReaper.start()
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 4ecaeb4..038b8ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals.metrics;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Sum;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
@@ -106,7 +106,7 @@ public class Sensors {
                 "The average number of occurrence of suppression-emit operation per second.",
                 tags
             ),
-            new Rate(TimeUnit.SECONDS, new Sum())
+            new Rate(TimeUnit.SECONDS, new WindowedSum())
         );
         sensor.add(
             new MetricName(
@@ -115,7 +115,7 @@ public class Sensors {
                 "The total number of occurrence of suppression-emit operations.",
                 tags
             ),
-            new Total()
+            new CumulativeSum()
         );
         return sensor;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 210412b..836330f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -28,9 +28,10 @@ import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -43,7 +44,6 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -112,7 +112,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             );
             taskCommitTimeSensor.add(
                 new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", tagMap),
-                new Rate(TimeUnit.SECONDS, new Count())
+                new Rate(TimeUnit.SECONDS, new WindowedCount())
             );
             taskCommitTimeSensor.add(
                 new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap),
@@ -123,7 +123,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, "enforced-processing", Sensor.RecordingLevel.DEBUG, parent);
             taskEnforcedProcessSensor.add(
                     new MetricName("enforced-processing-rate", group, "The average number of occurrence of enforced-processing operation per second.", tagMap),
-                    new Rate(TimeUnit.SECONDS, new Count())
+                    new Rate(TimeUnit.SECONDS, new WindowedCount())
             );
             taskEnforcedProcessSensor.add(
                     new MetricName("enforced-processing-total", group, "The total number of occurrence of enforced-processing operations.", tagMap),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 46b5669..b6bfcc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -22,9 +22,10 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.streams.StreamsMetrics;
 
 import java.util.Arrays;
@@ -445,7 +446,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                 descriptionOfRate,
                 tags
             ),
-            new Rate(TimeUnit.SECONDS, new Count())
+            new Rate(TimeUnit.SECONDS, new WindowedCount())
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index a7da2cb..47dd61b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -33,7 +33,7 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -215,7 +215,7 @@ public class RecordCollectorTest {
         final Sensor sensor = metrics.sensor("skipped-records");
         final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister();
         final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
-        sensor.add(metricName, new Sum());
+        sensor.add(metricName, new WindowedSum());
         final RecordCollector collector = new RecordCollectorImpl(
             "test",
             logContext,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 0400128..2faa078 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -692,7 +692,7 @@ public class StandbyTaskTest {
     private MetricName setupCloseTaskMetric() {
         final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
         final Sensor sensor = streamsMetrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
-        sensor.add(metricName, new Total());
+        sensor.add(metricName, new CumulativeSum());
         return metricName;
     }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 48c038c..2b1428f 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -32,9 +32,9 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -289,12 +289,12 @@ public class TopologyTestDriver implements Closeable {
                                                 threadLevelGroup,
                                                 "The average per-second number of skipped records",
                                                 streamsMetrics.tagMap()),
-                                 new Rate(TimeUnit.SECONDS, new Count()));
+                                 new Rate(TimeUnit.SECONDS, new WindowedCount()));
         skippedRecordsSensor.add(new MetricName("skipped-records-total",
                                                 threadLevelGroup,
                                                 "The total number of skipped records",
                                                 streamsMetrics.tagMap()),
-                                 new Total());
+                                 new CumulativeSum());
         final ThreadCache cache = new ThreadCache(
             new LogContext("topology-test-driver "),
             Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),


Mime
View raw message