kafka-commits mailing list archives

From vvcep...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10165: Remove Percentiles from e2e metrics (#8882)
Date Wed, 17 Jun 2020 14:37:14 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new f5cbbe7  KAFKA-10165: Remove Percentiles from e2e metrics (#8882)
f5cbbe7 is described below

commit f5cbbe7921fff15c23a36ea8cc8cbe9fe86045e0
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Jun 17 09:24:07 2020 -0500

    KAFKA-10165: Remove Percentiles from e2e metrics (#8882)
    
    * Remove problematic Percentiles measurements until the implementation is fixed
    * Fix leaking e2e metrics when task is closed
    * Fix leaking metrics when tasks are recycled
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>
---
 .../processor/internals/ProcessorContextImpl.java  |   2 +-
 .../streams/processor/internals/StandbyTask.java   |  11 +-
 .../streams/processor/internals/StreamTask.java    |  24 ++--
 .../streams/processor/internals/TaskManager.java   |   6 -
 .../internals/metrics/ProcessorNodeMetrics.java    |  34 +-----
 .../internals/metrics/StreamsMetricsImpl.java      |  43 ++-----
 .../processor/internals/metrics/TaskMetrics.java   |  28 +++++
 .../integration/MetricsIntegrationTest.java        |   4 -
 .../processor/internals/StandbyTaskTest.java       |  42 +++++++
 .../processor/internals/StreamTaskTest.java        | 133 +++++++++++----------
 .../metrics/ProcessorNodeMetricsTest.java          |  46 ++-----
 .../internals/metrics/StreamsMetricsImplTest.java  |   8 +-
 12 files changed, 179 insertions(+), 202 deletions(-)
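
For readers skimming the diff below, the shape of the leak fix is small: sensor cleanup moves out of TaskManager#cleanupTask and into every close path of the task itself (closeClean, closeDirty, closeAndRecycleState), so that closed and recycled tasks alike release their task-level sensors (per the commit message, both paths previously leaked). What follows is a minimal sketch of that ownership pattern using hypothetical stand-in classes, not Kafka's real ones; the real change calls StreamsMetricsImpl#removeAllTaskLevelSensors(threadName, taskId), as the StandbyTask and StreamTask hunks show.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Stand-in for StreamsMetricsImpl's task-level sensor bookkeeping.
    class SensorRegistrySketch {
        private final Map<String, Set<String>> sensorsByTask = new HashMap<>();

        void register(final String taskId, final String sensorName) {
            sensorsByTask.computeIfAbsent(taskId, k -> new HashSet<>()).add(sensorName);
        }

        // Analogous to StreamsMetricsImpl#removeAllTaskLevelSensors(threadName, taskId).
        void removeAllTaskLevelSensors(final String taskId) {
            sensorsByTask.remove(taskId);
        }
    }

    class TaskSketch {
        private final String id;
        private final SensorRegistrySketch registry;

        TaskSketch(final String id, final SensorRegistrySketch registry) {
            this.id = id;
            this.registry = registry;
            registry.register(id, "record-e2e-latency"); // e2e latency is now a task-level sensor
        }

        // Every close path unregisters first, so nothing lingers even when the
        // task is closed dirty or recycled into a new task with the same id.
        void closeClean()           { registry.removeAllTaskLevelSensors(id); }
        void closeDirty()           { registry.removeAllTaskLevelSensors(id); }
        void closeAndRecycleState() { registry.removeAllTaskLevelSensors(id); }
    }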

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index a58a862..b220fa5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -235,7 +235,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         setCurrentNode(child);
         child.process(key, value);
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 5df59f6..b334bc1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -44,6 +44,7 @@ public class StandbyTask extends AbstractTask implements Task {
     private final Sensor closeTaskSensor;
     private final boolean eosEnabled;
     private final InternalProcessorContext processorContext;
+    private final StreamsMetricsImpl streamsMetrics;
 
     private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit;
 
@@ -52,7 +53,7 @@ public class StandbyTask extends AbstractTask implements Task {
      * @param partitions     input topic partitions, used for thread metadata only
      * @param topology       the instance of {@link ProcessorTopology}
      * @param config         the {@link StreamsConfig} specified by the user
-     * @param metrics        the {@link StreamsMetrics} created by the thread
+     * @param streamsMetrics        the {@link StreamsMetrics} created by the thread
      * @param stateMgr       the {@link ProcessorStateManager} for this task
      * @param stateDirectory the {@link StateDirectory} created by the thread
      */
@@ -60,13 +61,14 @@ public class StandbyTask extends AbstractTask implements Task {
                 final Set<TopicPartition> partitions,
                 final ProcessorTopology topology,
                 final StreamsConfig config,
-                final StreamsMetricsImpl metrics,
+                final StreamsMetricsImpl streamsMetrics,
                 final ProcessorStateManager stateMgr,
                 final StateDirectory stateDirectory,
                 final ThreadCache cache,
                 final InternalProcessorContext processorContext) {
         super(id, topology, stateDirectory, stateMgr, partitions);
         this.processorContext = processorContext;
+        this.streamsMetrics = streamsMetrics;
         processorContext.transitionToStandby(cache);
 
         final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
@@ -74,7 +76,7 @@ public class StandbyTask extends AbstractTask implements Task {
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
-        closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
+        closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
         eosEnabled = StreamThread.eosEnabled(config);
     }
 
@@ -174,18 +176,21 @@ public class StandbyTask extends AbstractTask implements Task {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
         close(true);
         log.info("Closed clean");
     }
 
     @Override
     public void closeDirty() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
         close(false);
         log.info("Closed dirty");
     }
 
     @Override
     public void closeAndRecycleState() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
         if (state() == State.SUSPENDED) {
             stateMgr.recycle();
         } else {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index fa8b94b..eb1bf4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -37,7 +37,6 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@@ -89,6 +88,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private final Map<TopicPartition, Long> consumedOffsets;
     private final PunctuationQueue streamTimePunctuationQueue;
     private final PunctuationQueue systemTimePunctuationQueue;
+    private final StreamsMetricsImpl streamsMetrics;
 
     private long processTimeMs = 0L;
 
@@ -135,6 +135,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         eosEnabled = StreamThread.eosEnabled(config);
 
         final String threadId = Thread.currentThread().getName();
+        this.streamsMetrics = streamsMetrics;
         closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
         final String taskId = id.toString();
         if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
@@ -148,18 +149,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
         bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
 
-        for (final String terminalNode : topology.terminalNodes()) {
+        for (final String terminalNodeName : topology.terminalNodes()) {
             e2eLatencySensors.put(
-                terminalNode,
-                ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, terminalNode, RecordingLevel.INFO, streamsMetrics)
+                terminalNodeName,
+                TaskMetrics.e2ELatencySensor(threadId, taskId, terminalNodeName, RecordingLevel.INFO, streamsMetrics)
             );
         }
 
         for (final ProcessorNode<?, ?> sourceNode : topology.sources()) {
-            final String processorId = sourceNode.name();
+            final String sourceNodeName = sourceNode.name();
             e2eLatencySensors.put(
-                processorId,
-                ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, processorId, RecordingLevel.INFO, streamsMetrics)
+                sourceNodeName,
+                TaskMetrics.e2ELatencySensor(threadId, taskId, sourceNodeName, RecordingLevel.INFO, streamsMetrics)
             );
         }
 
@@ -462,12 +463,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
         close(true);
         log.info("Closed clean");
     }
 
     @Override
     public void closeDirty() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
         close(false);
         log.info("Closed dirty");
     }
@@ -480,6 +483,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
     @Override
     public void closeAndRecycleState() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
         switch (state()) {
             case SUSPENDED:
                 stateMgr.recycle();
@@ -917,11 +921,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return punctuated;
     }
 
-    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
-        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
-    }
-
-    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+    void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
         final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
         if (e2eLatencySensor == null) {
             throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 92885fd..b90ed5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -671,15 +671,9 @@ public class TaskManager {
 
     // Note: this MUST be called *before* actually closing the task
     private void cleanupTask(final Task task) {
-        // 1. remove the input partitions from the materialized map;
-        // 2. remove the task metrics from the metrics registry
-
         for (final TopicPartition inputPartition : task.inputPartitions()) {
             partitionToTask.remove(inputPartition);
         }
-
-        final String threadId = Thread.currentThread().getName();
-        streamsMetrics.removeAllTaskLevelSensors(threadId, task.id().toString());
     }
 
     void shutdown(final boolean clean) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
index f35d5dd..2bd5ba6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
@@ -29,7 +29,6 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor;
 
 public class ProcessorNodeMetrics {
     private ProcessorNodeMetrics() {}
@@ -99,15 +98,6 @@ public class ProcessorNodeMetrics {
     private static final String LATE_RECORD_DROP_RATE_DESCRIPTION =
         RATE_DESCRIPTION_PREFIX + LATE_RECORD_DROP_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
 
-    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
-    private static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
-        "end-to-end latency of a record, measuring by comparing the record timestamp with
the "
-            + "system time when it has been fully processed by the node";
-    private static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = "The minimum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
-    private static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = "The maximum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
-    private static final String RECORD_E2E_LATENCY_P99_DESCRIPTION = "The 99th percentile " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
-    private static final String RECORD_E2E_LATENCY_P90_DESCRIPTION = "The 90th percentile " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
-
     public static Sensor suppressionEmitSensor(final String threadId,
                                                final String taskId,
                                                final String processorNodeId,
@@ -299,26 +289,6 @@ public class ProcessorNodeMetrics {
         return processAtSourceSensor(threadId, taskId, processorNodeId, streamsMetrics);
     }
 
-    public static Sensor recordE2ELatencySensor(final String threadId,
-                                                final String taskId,
-                                                final String processorNodeId,
-                                                final RecordingLevel recordingLevel,
-                                                final StreamsMetricsImpl streamsMetrics) {
-        final Sensor sensor = streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, RECORD_E2E_LATENCY, recordingLevel);
-        final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
-        addMinAndMaxAndP99AndP90ToSensor(
-            sensor,
-            PROCESSOR_NODE_LEVEL_GROUP,
-            tagMap,
-            RECORD_E2E_LATENCY,
-            RECORD_E2E_LATENCY_MIN_DESCRIPTION,
-            RECORD_E2E_LATENCY_MAX_DESCRIPTION,
-            RECORD_E2E_LATENCY_P99_DESCRIPTION,
-            RECORD_E2E_LATENCY_P90_DESCRIPTION
-        );
-        return sensor;
-    }
-
     private static Sensor throughputAndLatencySensorWithParent(final String threadId,
                                                                final String taskId,
                                                                final String processorNodeId,
@@ -337,7 +307,7 @@ public class ProcessorNodeMetrics {
             descriptionOfCount,
             descriptionOfAvgLatency,
             descriptionOfMaxLatency,
-            RecordingLevel.DEBUG,
+            recordingLevel,
             streamsMetrics
         );
         return throughputAndLatencySensor(
@@ -349,7 +319,7 @@ public class ProcessorNodeMetrics {
             descriptionOfCount,
             descriptionOfAvgLatency,
             descriptionOfMaxLatency,
-            RecordingLevel.DEBUG,
+            recordingLevel,
             streamsMetrics,
             parentSensor
         );
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index fe0c94c..215dfc5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -28,9 +28,6 @@ import org.apache.kafka.common.metrics.stats.CumulativeCount;
 import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Min;
-import org.apache.kafka.common.metrics.stats.Percentile;
-import org.apache.kafka.common.metrics.stats.Percentiles;
-import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.metrics.stats.WindowedCount;
@@ -47,9 +44,9 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-import java.util.Optional;
 
 public class StreamsMetricsImpl implements StreamsMetrics {
 
@@ -154,9 +151,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
-    private static final int PERCENTILES_SIZE_IN_BYTES = 100 * 1000;    // 100 kB
-    private static double MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000d; // maximum latency is 10 days; values above that will be pinned
-
     public StreamsMetricsImpl(final Metrics metrics, final String clientId, final String builtInMetricsVersion) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
         Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null");
@@ -650,14 +644,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         );
     }
 
-    public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
-                                                        final String group,
-                                                        final Map<String, String> tags,
-                                                        final String operation,
-                                                        final String descriptionOfMin,
-                                                        final String descriptionOfMax,
-                                                        final String descriptionOfP99,
-                                                        final String descriptionOfP90) {
+    public static void addMinAndMaxToSensor(final Sensor sensor,
+                                            final String group,
+                                            final Map<String, String> tags,
+                                            final String operation,
+                                            final String descriptionOfMin,
+                                            final String descriptionOfMax) {
         sensor.add(
             new MetricName(
                 operation + MIN_SUFFIX,
@@ -675,27 +667,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                 tags),
             new Max()
         );
-
-        sensor.add(
-            new Percentiles(
-                PERCENTILES_SIZE_IN_BYTES,
-                MAXIMUM_E2E_LATENCY,
-                BucketSizing.LINEAR,
-                new Percentile(
-                    new MetricName(
-                        operation + P99_SUFFIX,
-                        group,
-                        descriptionOfP99,
-                        tags),
-                    99),
-                new Percentile(
-                    new MetricName(
-                        operation + P90_SUFFIX,
-                        group,
-                        descriptionOfP90,
-                        tags),
-                    90))
-        );
     }
 
     public static void addAvgAndMaxLatencyToSensor(final Sensor sensor,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
index 8fe2e3a..534dc02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
@@ -24,11 +24,13 @@ import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 import java.util.Map;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addMinAndMaxToSensor;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;
 
 public class TaskMetrics {
@@ -86,6 +88,13 @@ public class TaskMetrics {
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+    private static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
+        "end-to-end latency of a record, measuring by comparing the record timestamp with
the "
+            + "system time when it has been fully processed by the node";
+    private static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = "The minimum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
+    private static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = "The maximum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
+
     public static Sensor processLatencySensor(final String threadId,
                                               final String taskId,
                                               final StreamsMetricsImpl streamsMetrics) {
@@ -133,6 +142,25 @@ public class TaskMetrics {
         return sensor;
     }
 
+    public static Sensor e2ELatencySensor(final String threadId,
+                                          final String taskId,
+                                          final String processorNodeId,
+                                          final RecordingLevel recordingLevel,
+                                          final StreamsMetricsImpl streamsMetrics) {
+        final String sensorName = processorNodeId + "-" + RECORD_E2E_LATENCY;
+        final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, sensorName, recordingLevel);
+        final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
+        addMinAndMaxToSensor(
+            sensor,
+            PROCESSOR_NODE_LEVEL_GROUP,
+            tagMap,
+            RECORD_E2E_LATENCY,
+            RECORD_E2E_LATENCY_MIN_DESCRIPTION,
+            RECORD_E2E_LATENCY_MAX_DESCRIPTION
+        );
+        return sensor;
+    }
+
     public static Sensor punctuateSensor(final String threadId,
                                          final String taskId,
                                          final StreamsMetricsImpl streamsMetrics) {
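
A subtlety in the new TaskMetrics#e2ELatencySensor above: the sensor is registered through taskLevelSensor, so removeAllTaskLevelSensors can dispose of it together with the task, but it keeps the node-level tag map and the processor-node-level group, so the reported min/max metrics keep their names and tags for monitoring systems. A hypothetical illustration of the resulting metric identity (the tag values are made up, and the tag keys assume the latest built-in metrics version):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.MetricName;

    public class E2ELatencyMetricNameSketch {
        public static void main(final String[] args) {
            // Example tag values; real ones come from the running thread, task, and node.
            final Map<String, String> tags = new HashMap<>();
            tags.put("thread-id", "app-StreamThread-1");
            tags.put("task-id", "0_0");
            tags.put("processor-node-id", "KSTREAM-SOURCE-0000000000");

            // Node-level group and tags, even though the owning sensor is task-level.
            final MetricName min = new MetricName(
                "record-e2e-latency-min",
                "stream-processor-node-metrics",
                "The minimum end-to-end latency of a record ...",
                tags);
            System.out.println(min);
        }
    }
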
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index f06057f..b9f8f9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -206,8 +206,6 @@ public class MetricsIntegrationTest {
     private static final String EXPIRED_WINDOW_RECORD_DROP_TOTAL = "expired-window-record-drop-total";
     private static final String E2E_LATENCY_MIN = "record-e2e-latency-min";
     private static final String E2E_LATENCY_MAX = "record-e2e-latency-max";
-    private static final String E2E_LATENCY_P99 = "record-e2e-latency-p99";
-    private static final String E2E_LATENCY_P90 = "record-e2e-latency-p90";
 
     // stores name
     private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store";
@@ -608,8 +606,6 @@ public class MetricsIntegrationTest {
         checkMetricByName(listMetricProcessor, FORWARD_RATE, numberOfModifiedForwardMetrics);
         checkMetricByName(listMetricProcessor, E2E_LATENCY_MIN, numberOfSourceNodes + numberOfTerminalNodes);
         checkMetricByName(listMetricProcessor, E2E_LATENCY_MAX, numberOfSourceNodes + numberOfTerminalNodes);
-        checkMetricByName(listMetricProcessor, E2E_LATENCY_P99, numberOfSourceNodes + numberOfTerminalNodes);
-        checkMetricByName(listMetricProcessor, E2E_LATENCY_P90, numberOfSourceNodes + numberOfTerminalNodes);
     }
 
     private void checkKeyValueStoreMetrics(final String group0100To24,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 3f4b410..82f33c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -52,6 +52,8 @@ import org.junit.runner.RunWith;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -62,6 +64,7 @@ import static org.apache.kafka.streams.processor.internals.Task.State.RUNNING;
 import static org.apache.kafka.streams.processor.internals.Task.State.SUSPENDED;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -381,6 +384,37 @@ public class StandbyTaskTest {
     }
 
     @Test
+    public void shouldUnregisterMetricsInCloseClean() {
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
+        EasyMock.replay(stateManager);
+
+        task = createStandbyTask();
+        task.initializeIfNeeded();
+
+        task.suspend();
+        task.closeClean();
+        // Currently, there are no metrics registered for standby tasks.
+        // This is a regression test so that, if we add some, we will be sure to deregister them.
+        assertThat(getTaskMetrics(), empty());
+    }
+
+    @Test
+    public void shouldUnregisterMetricsInCloseDirty() {
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
+        EasyMock.replay(stateManager);
+
+        task = createStandbyTask();
+        task.initializeIfNeeded();
+
+        task.suspend();
+        task.closeDirty();
+
+        // Currently, there are no metrics registered for standby tasks.
+        // This is a regression test so that, if we add some, we will be sure to deregister them.
+        assertThat(getTaskMetrics(), empty());
+    }
+
+    @Test
     public void shouldCloseStateManagerOnTaskCreated() {
         stateManager.close();
         EasyMock.expectLastCall();
@@ -476,6 +510,10 @@ public class StandbyTaskTest {
         task.suspend();
         task.closeAndRecycleState(); // SUSPENDED
 
+        // Currently, there are no metrics registered for standby tasks.
+        // This is a regression test so that, if we add some, we will be sure to deregister them.
+        assertThat(getTaskMetrics(), empty());
+
         EasyMock.verify(stateManager);
     }
 
@@ -538,4 +576,8 @@ public class StandbyTaskTest {
         final double totalCloses = metric.measurable().measure(metric.config(), System.currentTimeMillis());
         assertThat(totalCloses, equalTo(expected));
     }
+
+    private List<MetricName> getTaskMetrics() {
+        return streamsMetrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList());
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 7a2cf7a..59e96d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.HashSet;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -74,6 +73,7 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -97,7 +97,9 @@ import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -404,7 +406,7 @@ public class StreamTaskTest {
 
         final KafkaMetric metric = getMetric("active-buffer", "%s-count", task.id().toString(),
StreamsConfig.METRICS_LATEST);
 
-        assertThat(metric.metricValue(), equalTo(0.0d));
+        assertThat(metric.metricValue(), equalTo(0.0));
 
         task.addRecords(partition1, asList(
             getConsumerRecord(partition1, 10),
@@ -412,12 +414,12 @@ public class StreamTaskTest {
         ));
         task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
 
-        assertThat(metric.metricValue(), equalTo(2.0d));
+        assertThat(metric.metricValue(), equalTo(2.0));
 
         task.process(0L);
         task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
 
-        assertThat(metric.metricValue(), equalTo(1.0d));
+        assertThat(metric.metricValue(), equalTo(1.0));
     }
 
     @Test
@@ -426,22 +428,22 @@ public class StreamTaskTest {
 
         final KafkaMetric metric = getMetric("active-process", "%s-ratio", task.id().toString(),
StreamsConfig.METRICS_LATEST);
 
-        assertThat(metric.metricValue(), equalTo(0.0d));
+        assertThat(metric.metricValue(), equalTo(0.0));
 
         task.recordProcessBatchTime(10L);
         task.recordProcessBatchTime(15L);
         task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds());
 
-        assertThat(metric.metricValue(), equalTo(0.25d));
+        assertThat(metric.metricValue(), equalTo(0.25));
 
         task.recordProcessBatchTime(10L);
 
-        assertThat(metric.metricValue(), equalTo(0.25d));
+        assertThat(metric.metricValue(), equalTo(0.25));
 
         task.recordProcessBatchTime(10L);
         task.recordProcessTimeRatioAndBufferSize(20L, time.milliseconds());
 
-        assertThat(metric.metricValue(), equalTo(1.0d));
+        assertThat(metric.metricValue(), equalTo(1.0));
     }
 
     @Test
@@ -458,24 +460,7 @@ public class StreamTaskTest {
         task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L)));
         task.process(100L);
 
-        assertThat(maxMetric.metricValue(), equalTo(100d));
-    }
-
-    @Test
-    public void shouldRecordE2ELatencyOnProcessForTerminalNodes() {
-        time = new MockTime(0L, 0L, 0L);
-        metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
-
-        final String terminalNode = processorStreamTime.name();
-
-        final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNode, StreamsConfig.METRICS_LATEST);
-
-        // e2e latency = 100
-        time.setCurrentTimeMs(100L);
-        task.maybeRecordE2ELatency(0L, terminalNode);
-
-        assertThat(maxMetric.metricValue(), equalTo(100d));
+        assertThat(maxMetric.metricValue(), equalTo(100.0));
     }
 
     @Test
@@ -493,50 +478,24 @@ public class StreamTaskTest {
         assertThat(maxMetric.metricValue(), equalTo(Double.NaN));
 
         // e2e latency = 10
-        time.setCurrentTimeMs(10L);
-        task.maybeRecordE2ELatency(0L, sourceNode);
-        assertThat(minMetric.metricValue(), equalTo(10d));
-        assertThat(maxMetric.metricValue(), equalTo(10d));
+        task.maybeRecordE2ELatency(0L, 10L, sourceNode);
+        assertThat(minMetric.metricValue(), equalTo(10.0));
+        assertThat(maxMetric.metricValue(), equalTo(10.0));
 
         // e2e latency = 15
-        time.setCurrentTimeMs(25L);
-        task.maybeRecordE2ELatency(10L, sourceNode);
-        assertThat(minMetric.metricValue(), equalTo(10d));
-        assertThat(maxMetric.metricValue(), equalTo(15d));
+        task.maybeRecordE2ELatency(10L, 25L, sourceNode);
+        assertThat(minMetric.metricValue(), equalTo(10.0));
+        assertThat(maxMetric.metricValue(), equalTo(15.0));
 
         // e2e latency = 25
-        time.setCurrentTimeMs(30L);
-        task.maybeRecordE2ELatency(5L, sourceNode);
-        assertThat(minMetric.metricValue(), equalTo(10d));
-        assertThat(maxMetric.metricValue(), equalTo(25d));
+        task.maybeRecordE2ELatency(5L, 30L, sourceNode);
+        assertThat(minMetric.metricValue(), equalTo(10.0));
+        assertThat(maxMetric.metricValue(), equalTo(25.0));
 
         // e2e latency = 20
-        time.setCurrentTimeMs(40L);
-        task.maybeRecordE2ELatency(35L, sourceNode);
-        assertThat(minMetric.metricValue(), equalTo(5d));
-        assertThat(maxMetric.metricValue(), equalTo(25d));
-    }
-
-    @Test
-    public void shouldRecordE2ELatencyPercentiles() {
-        time = new MockTime(0L, 0L, 0L);
-        metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
-
-        final String sourceNode = source1.name();
-
-        final Metric p99Metric = getProcessorMetric("record-e2e-latency", "%s-p99", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
-        final Metric p90Metric = getProcessorMetric("record-e2e-latency", "%s-p90", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
-
-        for (int i = 0; i < 100; i++) {
-            time.setCurrentTimeMs(i);
-            task.maybeRecordE2ELatency(0L, sourceNode);
-        }
-
-        final double expectedAccuracy = 0.25d; // Make sure it's accurate to within 25% of the expected value
-
-        assertEquals((double) p99Metric.metricValue(), 99d, 99 * expectedAccuracy);
-        assertEquals((double) p90Metric.metricValue(), 90d, 90 * expectedAccuracy);
+        task.maybeRecordE2ELatency(35L, 40L, sourceNode);
+        assertThat(minMetric.metricValue(), equalTo(5.0));
+        assertThat(maxMetric.metricValue(), equalTo(25.0));
     }
 
     @Test
@@ -1765,6 +1724,48 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldUnregisterMetricsInCloseClean() {
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+
+        task.suspend();
+        assertThat(getTaskMetrics(), not(empty()));
+        task.closeClean();
+        assertThat(getTaskMetrics(), empty());
+    }
+
+    @Test
+    public void shouldUnregisterMetricsInCloseDirty() {
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+
+        task.suspend();
+        assertThat(getTaskMetrics(), not(empty()));
+        task.closeDirty();
+        assertThat(getTaskMetrics(), empty());
+    }
+
+    @Test
+    public void shouldUnregisterMetricsInCloseAndRecycle() {
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
+
+        task.suspend();
+        assertThat(getTaskMetrics(), not(empty()));
+        task.closeAndRecycleState();
+        assertThat(getTaskMetrics(), empty());
+    }
+
+    @Test
     public void closeShouldBeIdempotent() {
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
@@ -1848,6 +1849,10 @@ public class StreamTaskTest {
         assertThat(task.state(), equalTo(SUSPENDED));
     }
 
+    private List<MetricName> getTaskMetrics() {
+        return metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList());
+    }
+
     private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final Consumer<byte[], byte[]> consumer) {
         final StateStore stateStore = new MockKeyValueStore(storeName, true);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
index 14f370d..08131e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
@@ -87,7 +87,7 @@ public class ProcessorNodeMetricsTest {
         expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
         StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             expectedSensor,
-            StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP,
+            PROCESSOR_NODE_LEVEL_GROUP,
             tagMap,
             metricNamePrefix,
             descriptionOfRate,
@@ -108,7 +108,7 @@ public class ProcessorNodeMetricsTest {
         expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
         StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             expectedSensor,
-            StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP,
+            PROCESSOR_NODE_LEVEL_GROUP,
             tagMap,
             metricNamePrefix,
             descriptionOfRate,
@@ -261,38 +261,6 @@ public class ProcessorNodeMetricsTest {
         }
     }
 
-    @Test
-    public void shouldGetRecordE2ELatencySensor() {
-        final String operation = "record-e2e-latency";
-        final String recordE2ELatencyMinDescription =
-            "The minimum end-to-end latency of a record, measuring by comparing the record
timestamp with the "
-                + "system time when it has been fully processed by the node";
-        final String recordE2ELatencyMaxDescription =
-            "The maximum end-to-end latency of a record, measuring by comparing the record
timestamp with the "
-                + "system time when it has been fully processed by the node";
-        final String recordE2ELatencyP99Description =
-            "The 99th percentile end-to-end latency of a record, measuring by comparing the
record timestamp with the "
-                + "system time when it has been fully processed by the node";
-        final String recordE2ELatencyP90Description =
-            "The 90th percentile end-to-end latency of a record, measuring by comparing the
record timestamp with the "
-                + "system time when it has been fully processed by the node";
-        expect(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, operation, RecordingLevel.INFO))
-            .andReturn(expectedSensor);
-        expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
-        StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor(
-            expectedSensor,
-            PROCESSOR_NODE_LEVEL_GROUP,
-            tagMap,
-            operation,
-            recordE2ELatencyMinDescription,
-            recordE2ELatencyMaxDescription,
-            recordE2ELatencyP99Description,
-            recordE2ELatencyP90Description
-        );
-
-        verifySensor(() -> ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics));
-    }
-
     private void shouldGetThroughputAndLatencySensorWithParentOrEmptySensor(final String metricNamePrefix,
                                                                             final String descriptionOfRate,
                                                                             final String descriptionOfCount,
@@ -353,7 +321,7 @@ public class ProcessorNodeMetricsTest {
             .andReturn(parentTagMap);
         StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             expectedParentSensor,
-            StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP,
+            PROCESSOR_NODE_LEVEL_GROUP,
             parentTagMap,
             metricNamePrefix,
             descriptionOfRate,
@@ -361,7 +329,7 @@ public class ProcessorNodeMetricsTest {
         );
         StreamsMetricsImpl.addAvgAndMaxToSensor(
             expectedParentSensor,
-            StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP,
+            PROCESSOR_NODE_LEVEL_GROUP,
             parentTagMap,
             metricNamePrefix + StreamsMetricsImpl.LATENCY_SUFFIX,
             descriptionOfAvg,
@@ -378,7 +346,7 @@ public class ProcessorNodeMetricsTest {
             .andReturn(parentTagMap);
         StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             expectedParentSensor,
-            StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP,
+            PROCESSOR_NODE_LEVEL_GROUP,
             parentTagMap,
             metricNamePrefix,
             descriptionOfRate,
@@ -395,7 +363,7 @@ public class ProcessorNodeMetricsTest {
         setUpThroughputSensor(metricNamePrefix, descriptionOfRate, descriptionOfCount, RecordingLevel.DEBUG, parentSensors);
         StreamsMetricsImpl.addAvgAndMaxToSensor(
             expectedSensor,
-            StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP,
+            PROCESSOR_NODE_LEVEL_GROUP,
             tagMap,
             metricNamePrefix + StreamsMetricsImpl.LATENCY_SUFFIX,
             descriptionOfAvgLatency,
@@ -419,7 +387,7 @@ public class ProcessorNodeMetricsTest {
         expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
         StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             expectedSensor,
-            StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP,
+            PROCESSOR_NODE_LEVEL_GROUP,
             tagMap,
             metricNamePrefix,
             descriptionOfRate,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 47fec02..e0ee428 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -999,17 +999,15 @@ public class StreamsMetricsImplTest {
     }
 
     @Test
-    public void shouldAddMinAndMaxAndP99AndP90MetricsToSensor() {
+    public void shouldAddMinAndMaxMetricsToSensor() {
         StreamsMetricsImpl
-            .addMinAndMaxAndP99AndP90ToSensor(sensor, group, tags, metricNamePrefix, description1, description2, description3, description4);
+            .addMinAndMaxToSensor(sensor, group, tags, metricNamePrefix, description1, description2);
 
         final double valueToRecord1 = 18.0;
         final double valueToRecord2 = 42.0;
         verifyMetric(metricNamePrefix + "-min", description1, valueToRecord1, valueToRecord2, valueToRecord1);
         verifyMetric(metricNamePrefix + "-max", description2, valueToRecord1, valueToRecord2, valueToRecord2);
-        verifyMetricWithinError(metricNamePrefix + "-p99", description3, valueToRecord1, valueToRecord2, valueToRecord2, 1.0);
-        verifyMetricWithinError(metricNamePrefix + "-p90", description4, valueToRecord1, valueToRecord2, valueToRecord2, 1.0);
-        assertThat(metrics.metrics().size(), equalTo(4 + 1)); // one metric is added automatically in the constructor of Metrics
+        assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics
     }
 
     @Test
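
As a closing usage note: after this change only the record-e2e-latency-min and record-e2e-latency-max metrics are reported, with the -p99/-p90 percentiles removed until the Percentiles implementation is fixed. A sketch of how an application might confirm which e2e latency metrics remain, assuming an already-started KafkaStreams instance:

    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public final class E2ELatencyDumpSketch {
        // Prints the record-e2e-latency-* metrics that remain after this change
        // (only -min and -max; the -p99/-p90 percentiles were removed).
        static void printE2ELatency(final KafkaStreams streams) {
            for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                final MetricName name = entry.getKey();
                if (name.name().startsWith("record-e2e-latency")) {
                    System.out.println(name.name() + " " + name.tags() + " = " + entry.getValue().metricValue());
                }
            }
        }
    }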

