kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: clean up node and store sensors (#5450)
Date Sat, 04 Aug 2018 05:43:25 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cf2c5e9  MINOR: clean up node and store sensors (#5450)
cf2c5e9 is described below

commit cf2c5e9ffc066aad37090fb6f2953a602cd8621b
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Sat Aug 4 00:43:18 2018 -0500

    MINOR: clean up node and store sensors (#5450)
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../streams/processor/internals/ProcessorNode.java | 169 +++++++-------
 .../streams/processor/internals/StreamTask.java    |  12 +-
 .../streams/processor/internals/StreamThread.java  |  40 ++--
 .../internals/metrics/StreamsMetricsImpl.java      | 252 +++++++++++++--------
 .../state/internals/MeteredKeyValueStore.java      |  93 +++-----
 .../state/internals/MeteredSessionStore.java       |  43 ++--
 .../state/internals/MeteredWindowStore.java        |  51 +++--
 .../streams/state/internals/metrics/Sensors.java   |  48 ++++
 .../processor/internals/ProcessorNodeTest.java     |  22 +-
 .../processor/internals/StreamTaskTest.java        |  24 +-
 .../internals/StreamsMetricsImplTest.java          |  11 +-
 .../StreamThreadStateStoreProviderTest.java        |   4 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   3 +-
 13 files changed, 426 insertions(+), 346 deletions(-)
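
The core of the cleanup is twofold: (1) ProcessorNode drops its Runnable
delegates and the runAndMeasureLatency helper in favor of inline timing,
and (2) StreamsMetricsImpl gains hierarchy-aware sensor registries
(thread, task, node, cache, store) plus two static metric builders,
addAvgMaxLatency and addInvocationRateAndCount. A minimal sketch of the
inlined timing pattern (identifiers as in the diff below):

    final long startNs = time.nanoseconds();
    processor.process(key, value);                   // the measured operation
    nodeMetrics.nodeProcessTimeSensor.record(time.nanoseconds() - startNs);

One behavioral nuance: the old runAndMeasureLatency only read the clock
when sensor.shouldRecord() returned true, while the inlined form reads it
unconditionally and relies on the sensor to discard the value if its
recording level is inactive.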

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 64ef538..8dc6417 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -31,6 +31,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+
 public class ProcessorNode<K, V> {
 
     // TODO: 'children' can be removed when #forward() via index is removed
@@ -42,32 +45,6 @@ public class ProcessorNode<K, V> {
     private final String name;
     private final Time time;
 
-    private K key;
-    private V value;
-    private final Runnable processDelegate = new Runnable() {
-        @Override
-        public void run() {
-            processor.process(key, value);
-        }
-    };
-    private ProcessorContext context;
-    private final Runnable initDelegate = new Runnable() {
-        @Override
-        public void run() {
-            if (processor != null) {
-                processor.init(context);
-            }
-        }
-    };
-    private final Runnable closeDelegate = new Runnable() {
-        @Override
-        public void run() {
-            if (processor != null) {
-                processor.close();
-            }
-        }
-    };
-
     public final Set<String> stateStores;
 
     public ProcessorNode(final String name) {
@@ -107,10 +84,13 @@ public class ProcessorNode<K, V> {
     }
 
     public void init(final InternalProcessorContext context) {
-        this.context = context;
         try {
             nodeMetrics = new NodeMetrics(context.metrics(), name, context);
-            runAndMeasureLatency(time, initDelegate, nodeMetrics.nodeCreationSensor);
+            final long startNs = time.nanoseconds();
+            if (processor != null) {
+                processor.init(context);
+            }
+            nodeMetrics.nodeCreationSensor.record(time.nanoseconds() - startNs);
         } catch (final Exception e) {
             throw new StreamsException(String.format("failed to initialize processor %s", name), e);
         }
@@ -118,7 +98,11 @@ public class ProcessorNode<K, V> {
 
     public void close() {
         try {
-            runAndMeasureLatency(time, closeDelegate, nodeMetrics.nodeDestructionSensor);
+            final long startNs = time.nanoseconds();
+            if (processor != null) {
+                processor.close();
+            }
+            nodeMetrics.nodeDestructionSensor.record(time.nanoseconds() - startNs);
             nodeMetrics.removeAllSensors();
         } catch (final Exception e) {
             throw new StreamsException(String.format("failed to close processor %s", name), e);
@@ -127,20 +111,15 @@ public class ProcessorNode<K, V> {
 
 
     public void process(final K key, final V value) {
-        this.key = key;
-        this.value = value;
-
-        runAndMeasureLatency(time, processDelegate, nodeMetrics.nodeProcessTimeSensor);
+        final long startNs = time.nanoseconds();
+        processor.process(key, value);
+        nodeMetrics.nodeProcessTimeSensor.record(time.nanoseconds() - startNs);
     }
 
     public void punctuate(final long timestamp, final Punctuator punctuator) {
-        final Runnable punctuateDelegate = new Runnable() {
-            @Override
-            public void run() {
-                punctuator.punctuate(timestamp);
-            }
-        };
-        runAndMeasureLatency(time, punctuateDelegate, nodeMetrics.nodePunctuateTimeSensor);
+        final long startNs = time.nanoseconds();
+        punctuator.punctuate(timestamp);
+        nodeMetrics.nodePunctuateTimeSensor.record(time.nanoseconds() - startNs);
     }
 
     /**
@@ -180,70 +159,90 @@ public class ProcessorNode<K, V> {
         private final Sensor sourceNodeForwardSensor;
         private final Sensor nodeCreationSensor;
         private final Sensor nodeDestructionSensor;
+        private final String taskName;
+        private final String processorNodeName;
 
         private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNodeName, final ProcessorContext context) {
             this.metrics = metrics;
 
-            // these are all latency metrics
-            this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(
-                context.taskId().toString(),
-                "processor-node",
-                processorNodeName,
+            final String group = "stream-processor-node-metrics";
+            final String taskName = context.taskId().toString();
+            final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", processorNodeName);
+            final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", "all");
+
+            nodeProcessTimeSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "process",
-                Sensor.RecordingLevel.DEBUG,
-                "task-id", context.taskId().toString()
-            );
-            this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(
-                context.taskId().toString(),
-                "processor-node",
+                metrics,
+                group,
+                taskName,
                 processorNodeName,
-                "punctuate",
-                Sensor.RecordingLevel.DEBUG,
-                "task-id", context.taskId().toString()
+                allTagMap,
+                tagMap
             );
-            this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(
-                context.taskId().toString(),
-                "processor-node",
+
+            nodePunctuateTimeSensor = createTaskAndNodeLatencyAndThroughputSensors(
+                "punctuate",
+                metrics,
+                group,
+                taskName,
                 processorNodeName,
-                "create",
-                Sensor.RecordingLevel.DEBUG,
-                "task-id", context.taskId().toString()
+                allTagMap,
+                tagMap
             );
-            this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(
-                context.taskId().toString(),
-                "processor-node",
+
+            nodeCreationSensor = createTaskAndNodeLatencyAndThroughputSensors(
+                "create",
+                metrics,
+                group,
+                taskName,
                 processorNodeName,
-                "destroy",
-                Sensor.RecordingLevel.DEBUG,
-                "task-id", context.taskId().toString()
+                allTagMap,
+                tagMap
             );
-            this.sourceNodeForwardSensor = metrics.addThroughputSensor(
-                context.taskId().toString(),
-                "processor-node",
+
+            // note: this metric can be removed in the future, as it is only recorded before being immediately removed
+            nodeDestructionSensor = createTaskAndNodeLatencyAndThroughputSensors(
+                "destroy",
+                metrics,
+                group,
+                taskName,
                 processorNodeName,
+                allTagMap,
+                tagMap
+            );
+
+            sourceNodeForwardSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "forward",
-                Sensor.RecordingLevel.DEBUG,
-                "task-id", context.taskId().toString()
+                metrics,
+                group,
+                taskName,
+                processorNodeName,
+                allTagMap,
+                tagMap
             );
+
+            this.taskName = taskName;
+            this.processorNodeName = processorNodeName;
         }
 
         private void removeAllSensors() {
-            metrics.removeSensor(nodeProcessTimeSensor);
-            metrics.removeSensor(nodePunctuateTimeSensor);
-            metrics.removeSensor(sourceNodeForwardSensor);
-            metrics.removeSensor(nodeCreationSensor);
-            metrics.removeSensor(nodeDestructionSensor);
+            metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
         }
-    }
 
-    private static void runAndMeasureLatency(final Time time, final Runnable action, final Sensor sensor) {
-        long startNs = -1;
-        if (sensor.shouldRecord()) {
-            startNs = time.nanoseconds();
-        }
-        action.run();
-        if (startNs != -1) {
-            sensor.record(time.nanoseconds() - startNs);
+        private static Sensor createTaskAndNodeLatencyAndThroughputSensors(final String operation,
+                                                                           final StreamsMetricsImpl metrics,
+                                                                           final String group,
+                                                                           final String taskName,
+                                                                           final String processorNodeName,
+                                                                           final Map<String, String> taskTags,
+                                                                           final Map<String, String> nodeTags) {
+            final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
+            addAvgMaxLatency(parent, group, taskTags, operation);
+            addInvocationRateAndCount(parent, group, taskTags, operation);
+            final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
+            addAvgMaxLatency(sensor, group, nodeTags, operation);
+            addInvocationRateAndCount(sensor, group, nodeTags, operation);
+            return sensor;
         }
     }
 }
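
Each node-level sensor created above is parented to a task-level sensor
tagged processor-node-id=all, so a single record() call updates both the
per-node metrics and the task-wide aggregate. Condensed from
createTaskAndNodeLatencyAndThroughputSensors (literal values illustrative):

    final String group = "stream-processor-node-metrics";
    // parent: task-level aggregate, tagged processor-node-id=all
    final Sensor parent = metrics.taskLevelSensor(taskName, "process", Sensor.RecordingLevel.DEBUG);
    addAvgMaxLatency(parent, group, allTagMap, "process");
    addInvocationRateAndCount(parent, group, allTagMap, "process");
    // child: per-node sensor; recording on it also feeds the parent
    final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, "process", Sensor.RecordingLevel.DEBUG, parent);
    addAvgMaxLatency(sensor, group, tagMap, "process");
    addInvocationRateAndCount(sensor, group, tagMap, "process");
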
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6f3b031..7835a54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -77,6 +77,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     private int waits = WAIT_ON_PARTIAL_INPUT;
     private final Time time;
     private final TaskMetrics taskMetrics;
+    private Sensor closeSensor;
 
     protected static final class TaskMetrics {
         final StreamsMetricsImpl metrics;
@@ -158,8 +159,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                       final StateDirectory stateDirectory,
                       final ThreadCache cache,
                       final Time time,
-                      final Producer<byte[], byte[]> producer) {
-        this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producer, null);
+                      final Producer<byte[], byte[]> producer,
+                      final Sensor closeSensor) {
+        this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producer, null, closeSensor);
     }
 
     public StreamTask(final TaskId id,
@@ -173,11 +175,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                       final ThreadCache cache,
                       final Time time,
                       final Producer<byte[], byte[]> producer,
-                      final RecordCollector recordCollector) {
+                      final RecordCollector recordCollector,
+                      final Sensor closeSensor) {
         super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config);
 
         this.time = time;
         this.producer = producer;
+        this.closeSensor = closeSensor;
         this.taskMetrics = new TaskMetrics(id, metrics);
 
         final ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler();
@@ -617,6 +621,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             }
         }
 
+        closeSensor.record();
+
         if (firstException != null) {
             throw firstException;
         }
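
The task no longer owns a "task-closed" sensor of its own: the
thread-level tasksClosedSensor (see the StreamThread diff below) is
injected through the new constructor parameter and recorded once per
close(); it is a count-only sensor, so record() takes no latency argument:

    closeSensor.record();   // bumps task-closed-rate / task-closed-total
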
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 42f55ef..968e577 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -31,9 +31,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.LogContext;
@@ -438,8 +436,8 @@ public class StreamThread extends Thread {
                 stateDirectory,
                 cache,
                 time,
-                createProducer(taskId)
-            );
+                createProducer(taskId),
+                streamsMetrics.tasksClosedSensor);
         }
 
         private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -527,36 +525,30 @@ public class StreamThread extends Thread {
             final String group = "stream-metrics";
 
             commitTimeSensor = threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO);
-            commitTimeSensor.add(metrics.metricName("commit-latency-avg", group, "The average commit time in ms", tags()), new Avg());
-            commitTimeSensor.add(metrics.metricName("commit-latency-max", group, "The maximum commit time in ms", tags()), new Max());
-            commitTimeSensor.add(metrics.metricName("commit-rate", group, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            commitTimeSensor.add(metrics.metricName("commit-total", group, "The total number of commit calls", tags()), new Count());
+            addAvgMaxLatency(commitTimeSensor, group, tagMap(), "commit");
+            addInvocationRateAndCount(commitTimeSensor, group, tagMap(), "commit");
 
             pollTimeSensor = threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO);
-            pollTimeSensor.add(metrics.metricName("poll-latency-avg", group, "The average poll time in ms", tags()), new Avg());
-            pollTimeSensor.add(metrics.metricName("poll-latency-max", group, "The maximum poll time in ms", tags()), new Max());
-            pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tags()), new Count());
+            addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll");
+            // can't use addInvocationRateAndCount due to non-standard description string
+            pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
+            pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new Count());
 
             processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
-            processTimeSensor.add(metrics.metricName("process-latency-avg", group, "The average process time in ms", tags()), new Avg());
-            processTimeSensor.add(metrics.metricName("process-latency-max", group, "The maximum process time in ms", tags()), new Max());
-            processTimeSensor.add(metrics.metricName("process-rate", group, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            processTimeSensor.add(metrics.metricName("process-total", group, "The total number of process calls", tags()), new Count());
+            addAvgMaxLatency(processTimeSensor, group, tagMap(), "process");
+            addInvocationRateAndCount(processTimeSensor, group, tagMap(), "process");
 
             punctuateTimeSensor = threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO);
-            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", group, "The average punctuate time in ms", tags()), new Avg());
-            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", group, "The maximum punctuate time in ms", tags()), new Max());
-            punctuateTimeSensor.add(metrics.metricName("punctuate-rate", group, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            punctuateTimeSensor.add(metrics.metricName("punctuate-total", group, "The total number of punctuate calls", tags()), new Count());
+            addAvgMaxLatency(punctuateTimeSensor, group, tagMap(), "punctuate");
+            addInvocationRateAndCount(punctuateTimeSensor, group, tagMap(), "punctuate");
 
             taskCreatedSensor = threadLevelSensor("task-created", Sensor.RecordingLevel.INFO);
-            taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tags()), new Total());
+            taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
+            taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tagMap()), new Total());
 
             tasksClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
-            tasksClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            tasksClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tags()), new Total());
+            tasksClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
+            tasksClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tagMap()), new Total());
         }
     }
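
Each addAvgMaxLatency/addInvocationRateAndCount pair above replaces four
manual Sensor.add() calls. For "commit", the pair expands to the
equivalent of the following (note the descriptions switch to the generic
templates, e.g. "The average latency of commit operation." instead of the
old "The average commit time in ms"; "poll" keeps its manual rate/count
adds precisely to preserve its non-standard descriptions):

    commitTimeSensor.add(new MetricName("commit-latency-avg", group, "The average latency of commit operation.", tagMap()), new Avg());
    commitTimeSensor.add(new MetricName("commit-latency-max", group, "The max latency of commit operation.", tagMap()), new Max());
    commitTimeSensor.add(new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
    commitTimeSensor.add(new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap()), new Count());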
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index d36acdc..56166a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -31,7 +31,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Objects;
@@ -39,14 +38,18 @@ import java.util.concurrent.TimeUnit;
 
 public class StreamsMetricsImpl implements StreamsMetrics {
     private final Metrics metrics;
-    private final Map<String, String> tags;
     private final Map<Sensor, Sensor> parentSensors;
     private final Sensor skippedRecordsSensor;
     private final String threadName;
 
     private final Deque<String> threadLevelSensors = new LinkedList<>();
     private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
+    private final Map<String, Deque<String>> nodeLevelSensors = new HashMap<>();
     private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();
+    private final Map<String, Deque<String>> storeLevelSensors = new HashMap<>();
+
+    private static final String SENSOR_PREFIX_DELIMITER = ".";
+    private static final String SENSOR_NAME_DELIMITER = ".s.";
 
     public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
@@ -54,24 +57,19 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
         this.metrics = metrics;
 
-
-        final HashMap<String, String> tags = new LinkedHashMap<>();
-        tags.put("client-id", threadName);
-        this.tags = Collections.unmodifiableMap(tags);
-
         this.parentSensors = new HashMap<>();
 
         final String group = "stream-metrics";
         skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
-        skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", group, "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count()));
-        skippedRecordsSensor.add(metrics.metricName("skipped-records-total", group, "The total number of skipped records", tags), new Total());
+        skippedRecordsSensor.add(new MetricName("skipped-records-rate", group, "The average per-second number of skipped records", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
+        skippedRecordsSensor.add(new MetricName("skipped-records-total", group, "The total number of skipped records", tagMap()), new Total());
     }
 
     public final Sensor threadLevelSensor(final String sensorName,
                                           final Sensor.RecordingLevel recordingLevel,
                                           final Sensor... parents) {
         synchronized (threadLevelSensors) {
-            final String fullSensorName = threadName + "." + sensorName;
+            final String fullSensorName = threadSensorPrefix() + SENSOR_NAME_DELIMITER + sensorName;
             final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
             threadLevelSensors.push(fullSensorName);
 
@@ -87,17 +85,21 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
+    private String threadSensorPrefix() {
+        return "internal" + SENSOR_PREFIX_DELIMITER + threadName;
+    }
+
     public final Sensor taskLevelSensor(final String taskName,
                                         final String sensorName,
                                         final Sensor.RecordingLevel recordingLevel,
                                         final Sensor... parents) {
-        final String key = threadName + "." + taskName;
+        final String key = taskSensorPrefix(taskName);
         synchronized (taskLevelSensors) {
             if (!taskLevelSensors.containsKey(key)) {
                 taskLevelSensors.put(key, new LinkedList<>());
             }
 
-            final String fullSensorName = key + "." + sensorName;
+            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
 
             final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
 
@@ -108,7 +110,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
     public final void removeAllTaskLevelSensors(final String taskName) {
-        final String key = threadName + "." + taskName;
+        final String key = taskSensorPrefix(taskName);
         synchronized (taskLevelSensors) {
             if (taskLevelSensors.containsKey(key)) {
                 while (!taskLevelSensors.get(key).isEmpty()) {
@@ -119,18 +121,58 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
+    private String taskSensorPrefix(final String taskName) {
+        return threadSensorPrefix() + SENSOR_PREFIX_DELIMITER + "task" + SENSOR_PREFIX_DELIMITER + taskName;
+    }
+
+    public Sensor nodeLevelSensor(final String taskName,
+                                  final String processorNodeName,
+                                  final String sensorName,
+                                  final Sensor.RecordingLevel recordingLevel,
+                                  final Sensor... parents) {
+        final String key = nodeSensorPrefix(taskName, processorNodeName);
+        synchronized (nodeLevelSensors) {
+            if (!nodeLevelSensors.containsKey(key)) {
+                nodeLevelSensors.put(key, new LinkedList<>());
+            }
+
+            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
+
+            final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
+
+            nodeLevelSensors.get(key).push(fullSensorName);
+
+            return sensor;
+        }
+    }
+
+    public final void removeAllNodeLevelSensors(final String taskName, final String processorNodeName) {
+        final String key = nodeSensorPrefix(taskName, processorNodeName);
+        synchronized (nodeLevelSensors) {
+            if (nodeLevelSensors.containsKey(key)) {
+                while (!nodeLevelSensors.get(key).isEmpty()) {
+                    metrics.removeSensor(nodeLevelSensors.get(key).pop());
+                }
+            }
+        }
+    }
+
+    private String nodeSensorPrefix(final String taskName, final String processorNodeName) {
+        return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "node" + SENSOR_PREFIX_DELIMITER + processorNodeName;
+    }
+
     public final Sensor cacheLevelSensor(final String taskName,
                                          final String cacheName,
                                          final String sensorName,
                                          final Sensor.RecordingLevel recordingLevel,
                                          final Sensor... parents) {
-        final String key = threadName + "." + taskName + "." + cacheName;
+        final String key = cacheSensorPrefix(taskName, cacheName);
         synchronized (cacheLevelSensors) {
             if (!cacheLevelSensors.containsKey(key)) {
-                cacheLevelSensors.put(key, new LinkedList<String>());
+                cacheLevelSensors.put(key, new LinkedList<>());
             }
 
-            final String fullSensorName = key + "." + sensorName;
+            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
 
             final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
 
@@ -141,7 +183,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
     public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
-        final String key = threadName + "." + taskName + "." + cacheName;
+        final String key = cacheSensorPrefix(taskName, cacheName);
         synchronized (cacheLevelSensors) {
             if (cacheLevelSensors.containsKey(key)) {
                 while (!cacheLevelSensors.get(key).isEmpty()) {
@@ -152,8 +194,45 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
-    protected final Map<String, String> tags() {
-        return tags;
+    private String cacheSensorPrefix(final String taskName, final String cacheName) {
+        return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "cache" + SENSOR_PREFIX_DELIMITER + cacheName;
+    }
+
+    public final Sensor storeLevelSensor(final String taskName,
+                                         final String storeName,
+                                         final String sensorName,
+                                         final Sensor.RecordingLevel recordingLevel,
+                                         final Sensor... parents) {
+        final String key = storeSensorPrefix(taskName, storeName);
+        synchronized (storeLevelSensors) {
+            if (!storeLevelSensors.containsKey(key)) {
+                storeLevelSensors.put(key, new LinkedList<>());
+            }
+
+            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
+
+            final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
+
+            storeLevelSensors.get(key).push(fullSensorName);
+
+            return sensor;
+        }
+    }
+
+    public final void removeAllStoreLevelSensors(final String taskName, final String storeName) {
+        final String key = storeSensorPrefix(taskName, storeName);
+        synchronized (storeLevelSensors) {
+            if (storeLevelSensors.containsKey(key)) {
+                while (!storeLevelSensors.get(key).isEmpty()) {
+                    metrics.removeSensor(storeLevelSensors.get(key).pop());
+                }
+                storeLevelSensors.remove(key);
+            }
+        }
+    }
+
+    private String storeSensorPrefix(final String taskName, final String storeName) {
+        return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "store" + SENSOR_PREFIX_DELIMITER + storeName;
     }
 
     public final Sensor skippedRecordsSensor() {
@@ -185,22 +264,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         sensor.record(value);
     }
 
-
-    private String groupNameFromScope(final String scopeName) {
-        return "stream-" + scopeName + "-metrics";
-    }
-
-    private String sensorName(final String operationName, final String entityName) {
-        if (entityName == null) {
-            return operationName;
-        } else {
-            return entityName + "-" + operationName;
-        }
-    }
-
-    public Map<String, String> tagMap(final String... tags) {
-        // extract the additional tags if there are any
-        final Map<String, String> tagMap = new HashMap<>(this.tags);
+    public final Map<String, String> tagMap(final String... tags) {
+        final Map<String, String> tagMap = new HashMap<>();
         if (tags != null) {
             if ((tags.length % 2) != 0) {
                 throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
@@ -209,6 +274,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
             for (int i = 0; i < tags.length; i += 2)
                 tagMap.put(tags[i], tags[i + 1]);
         }
+        tagMap.put("client-id", threadName);
         return tagMap;
     }
 
@@ -220,6 +286,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         return tagMap(updatedTags);
     }
 
+
     /**
      * @throws IllegalArgumentException if tags is not constructed in key-value pairs
      */
@@ -229,39 +296,25 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                                                 final String operationName,
                                                 final Sensor.RecordingLevel recordingLevel,
                                                 final String... tags) {
-
-        return addLatencyAndThroughputSensor(null,
-                                             scopeName,
-                                             entityName,
-                                             operationName,
-                                             recordingLevel,
-                                             tags);
-
-    }
-
-    public Sensor addLatencyAndThroughputSensor(final String taskName,
-                                                final String scopeName,
-                                                final String entityName,
-                                                final String operationName,
-                                                final Sensor.RecordingLevel recordingLevel,
-                                                final String... tags) {
+        final String group = groupNameFromScope(scopeName);
 
         final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
         final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);
 
         // first add the global operation metrics if not yet, with the global tags only
-        final Sensor parent = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), null), recordingLevel);
-        addLatencyMetrics(scopeName, parent, operationName, allTagMap);
-        addThroughputMetrics(scopeName, parent, operationName, allTagMap);
+        final Sensor parent = metrics.sensor(externalParentSensorName(operationName), recordingLevel);
+        addAvgMaxLatency(parent, group, allTagMap, operationName);
+        addInvocationRateAndCount(parent, group, allTagMap, operationName);
 
         // add the operation metrics with additional tags
-        final Sensor sensor = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), entityName), recordingLevel, parent);
-        addLatencyMetrics(scopeName, sensor, operationName, tagMap);
-        addThroughputMetrics(scopeName, sensor, operationName, tagMap);
+        final Sensor sensor = metrics.sensor(externalChildSensorName(operationName, entityName), recordingLevel, parent);
+        addAvgMaxLatency(sensor, group, tagMap, operationName);
+        addInvocationRateAndCount(sensor, group, tagMap, operationName);
 
         parentSensors.put(sensor, parent);
 
         return sensor;
+
     }
 
     /**
@@ -273,33 +326,18 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                                       final String operationName,
                                       final Sensor.RecordingLevel recordingLevel,
                                       final String... tags) {
-
-        return addThroughputSensor(null,
-                                   scopeName,
-                                   entityName,
-                                   operationName,
-                                   recordingLevel,
-                                   tags);
-
-    }
-
-    public Sensor addThroughputSensor(final String taskName,
-                                      final String scopeName,
-                                      final String entityName,
-                                      final String operationName,
-                                      final Sensor.RecordingLevel recordingLevel,
-                                      final String... tags) {
+        final String group = groupNameFromScope(scopeName);
 
         final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
         final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);
 
         // first add the global operation metrics if not yet, with the global tags only
-        final Sensor parent = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), null), recordingLevel);
-        addThroughputMetrics(scopeName, parent, operationName, allTagMap);
+        final Sensor parent = metrics.sensor(externalParentSensorName(operationName), recordingLevel);
+        addInvocationRateAndCount(parent, group, allTagMap, operationName);
 
         // add the operation metrics with additional tags
-        final Sensor sensor = metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName), entityName), recordingLevel, parent);
-        addThroughputMetrics(scopeName, sensor, operationName, tagMap);
+        final Sensor sensor = metrics.sensor(externalChildSensorName(operationName, entityName), recordingLevel, parent);
+        addInvocationRateAndCount(sensor, group, tagMap, operationName);
 
         parentSensors.put(sensor, parent);
 
@@ -307,46 +345,57 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
     }
 
+    private String externalChildSensorName(final String operationName, final String entityName) {
+        return "external" + SENSOR_PREFIX_DELIMITER + threadName
+            + SENSOR_PREFIX_DELIMITER + "entity" + SENSOR_PREFIX_DELIMITER + entityName
+            + SENSOR_NAME_DELIMITER + operationName;
+    }
 
-    private String buildUniqueSensorName(final String operationName, final String taskName) {
-        final String task = taskName == null ? "" : taskName + ".";
-        return threadName + "." + task + operationName;
+    private String externalParentSensorName(final String operationName) {
+        return "external" + SENSOR_PREFIX_DELIMITER + threadName + SENSOR_NAME_DELIMITER + operationName;
     }
 
-    private void addLatencyMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
+
+    public static void addAvgMaxLatency(final Sensor sensor,
+                                        final String group,
+                                        final Map<String, String> tags,
+                                        final String operation) {
         sensor.add(
-            metrics.metricName(
-                opName + "-latency-avg",
-                groupNameFromScope(scopeName),
-                "The average latency of " + opName + " operation.", tags),
+            new MetricName(
+                operation + "-latency-avg",
+                group,
+                "The average latency of " + operation + " operation.",
+                tags),
             new Avg()
         );
         sensor.add(
-            metrics.metricName(
-                opName + "-latency-max",
-                groupNameFromScope(scopeName),
-                "The max latency of " + opName + " operation.",
-                tags
-            ),
+            new MetricName(
+                operation + "-latency-max",
+                group,
+                "The max latency of " + operation + " operation.",
+                tags),
             new Max()
         );
     }
 
-    private void addThroughputMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
+    public static void addInvocationRateAndCount(final Sensor sensor,
+                                                 final String group,
+                                                 final Map<String, String> tags,
+                                                 final String operation) {
         sensor.add(
-            metrics.metricName(
-                opName + "-rate",
-                groupNameFromScope(scopeName),
-                "The average number of occurrence of " + opName + " operation per second.",
+            new MetricName(
+                operation + "-rate",
+                group,
+                "The average number of occurrence of " + operation + " operation per second.",
                 tags
             ),
             new Rate(TimeUnit.SECONDS, new Count())
         );
         sensor.add(
-            metrics.metricName(
-                opName + "-total",
-                groupNameFromScope(scopeName),
-                "The total number of occurrence of " + opName + " operations.",
+            new MetricName(
+                operation + "-total",
+                group,
+                "The total number of occurrence of " + operation + " operations.",
                 tags
             ),
             new Count()
@@ -367,4 +416,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
+    private static String groupNameFromScope(final String scopeName) {
+        return "stream-" + scopeName + "-metrics";
+    }
 }
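
The prefix helpers give every internally registered sensor a unique,
structured name, which is what allows removeAllTaskLevelSensors,
removeAllNodeLevelSensors, and removeAllStoreLevelSensors to tear down
exactly one scope. With SENSOR_PREFIX_DELIMITER = "." and
SENSOR_NAME_DELIMITER = ".s.", the names compose as follows (angle-bracket
placeholders illustrative):

    thread: internal.<threadName>.s.<sensorName>
    task:   internal.<threadName>.task.<taskId>.s.<sensorName>
    node:   internal.<threadName>.task.<taskId>.node.<nodeName>.s.<sensorName>
    cache:  internal.<threadName>.task.<taskId>.cache.<cacheName>.s.<sensorName>
    store:  internal.<threadName>.task.<taskId>.store.<storeName>.s.<sensorName>
    external (addLatencyAndThroughputSensor / addThroughputSensor):
            external.<threadName>.s.<operationName>                      (parent)
            external.<threadName>.entity.<entityName>.s.<operationName>  (child)

Also worth noting: tagMap() now builds the map fresh and puts
client-id=<threadName> last, so a caller-supplied "client-id" tag is
overwritten rather than preserved.
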
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index fd79543..57458fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -32,6 +32,10 @@ import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
+import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
 
 /**
  * A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
@@ -59,6 +63,7 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
     private Sensor rangeTime;
     private Sensor flushTime;
     private StreamsMetricsImpl metrics;
+    private String taskName;
 
     MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
                          final String metricScope,
@@ -77,79 +82,27 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        final String name = name();
-        final String tagKey = "task-id";
-        final String taskName = context.taskId().toString();
+        this.metrics = (StreamsMetricsImpl) context.metrics();
+
+        taskName = context.taskId().toString();
+        final String metricsGroup = "stream-" + metricScope + "-metrics";
+        final Map<String, String> taskTags = metrics.tagMap("task-id", taskName, metricScope + "-id", "all");
+        final Map<String, String> storeTags = metrics.tagMap("task-id", taskName, metricScope + "-id", name());
 
         this.serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
-        this.metrics = (StreamsMetricsImpl) context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(
-            taskName,
-            metricScope,
-            name,
-            "put",
-            Sensor.RecordingLevel.DEBUG,
-            tagKey, taskName);
-        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(
-            taskName,
-            metricScope,
-            name,
-            "put-if-absent",
-            Sensor.RecordingLevel.DEBUG,
-            tagKey, taskName);
-        this.getTime = this.metrics.addLatencyAndThroughputSensor(
-            taskName,
-            metricScope,
-            name,
-            "get",
-            Sensor.RecordingLevel.DEBUG,
-            tagKey, taskName);
-        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(
-            taskName,
-            metricScope,
-            name,
-            "delete",
-            Sensor.RecordingLevel.DEBUG,
-            tagKey, taskName);
-        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(
-            taskName,
-            metricScope,
-            name,
-            "put-all",
-            Sensor.RecordingLevel.DEBUG,
-            tagKey, taskName);
-        this.allTime = this.metrics.addLatencyAndThroughputSensor(
-            taskName,
-            metricScope,
-            name,
-            "all",
-            Sensor.RecordingLevel.DEBUG,
-            tagKey, taskName);
-        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(
-            taskName,
-            metricScope,
-            name,
-            "range",
-            Sensor.RecordingLevel.DEBUG,
-            tagKey, taskName);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(
-            taskName,
-            metricScope,
-            name,
-            "flush",
-            Sensor.RecordingLevel.DEBUG,
-            tagKey, taskName);
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(
-            taskName,
-            metricScope,
-            name,
-            "restore",
-            Sensor.RecordingLevel.DEBUG,
-            tagKey, taskName);
+        putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        putIfAbsentTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put-if-absent", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        putAllTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put-all", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        getTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "get", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        allTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "all", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        rangeTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "range", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        flushTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "flush", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        deleteTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "delete", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        final Sensor restoreTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "restore", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
 
         // register and possibly restore the state from the logs
         if (restoreTime.shouldRecord()) {
@@ -165,6 +118,12 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
     }
 
     @Override
+    public void close() {
+        super.close();
+        metrics.removeAllStoreLevelSensors(taskName, name());
+    }
+
+    @Override
     public long approximateNumEntries() {
         return inner.approximateNumEntries();
     }
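
The metered stores now have a symmetric sensor lifecycle: init() registers
everything under the store's sensor prefix, and the new close() override
tears it all down in one call instead of tracking individual Sensor
references. The same pattern is applied to MeteredSessionStore and
MeteredWindowStore below:

    @Override
    public void close() {
        super.close();                                        // close the inner store
        metrics.removeAllStoreLevelSensors(taskName, name()); // drop put/get/flush/... sensors
    }
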
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index b285b65..65ab758 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -30,8 +30,12 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
+import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
+import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
+
 public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, V> {
     private final SessionStore<Bytes, byte[]> inner;
     private final String metricScope;
@@ -44,6 +48,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
     private Sensor fetchTime;
     private Sensor flushTime;
     private Sensor removeTime;
+    private String taskName;
 
     MeteredSessionStore(final SessionStore<Bytes, byte[]> inner,
                         final String metricScope,
@@ -65,30 +70,40 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-        final String tagKey = "task-id";
-        final String taskName = context.taskId().toString();
         this.metrics = (StreamsMetricsImpl) context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "put",
-                                                                  Sensor.RecordingLevel.DEBUG, tagKey, taskName);
-        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "fetch",
-                                                                    Sensor.RecordingLevel.DEBUG, tagKey, taskName);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "flush",
-                                                                    Sensor.RecordingLevel.DEBUG, tagKey, taskName);
-        this.removeTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "remove",
-                                                                     Sensor.RecordingLevel.DEBUG, tagKey, taskName);
-
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "restore",
-                                                                              Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+
+        taskName = context.taskId().toString();
+        final String metricsGroup = "stream-" + metricScope + "-metrics";
+        final Map<String, String> taskTags = metrics.tagMap("task-id", taskName, metricScope + "-id", "all");
+        final Map<String, String> storeTags = metrics.tagMap("task-id", taskName, metricScope + "-id", name());
+
+        putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        fetchTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "fetch", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        flushTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "flush", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        removeTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "remove", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        final Sensor restoreTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "restore", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+
         // register and possibly restore the state from the logs
         final long startNs = time.nanoseconds();
         try {
             inner.init(context, root);
         } finally {
-            this.metrics.recordLatency(restoreTime, startNs, time.nanoseconds());
+            this.metrics.recordLatency(
+                restoreTime,
+                startNs,
+                time.nanoseconds()
+            );
         }
     }
 
     @Override
+    public void close() {
+        super.close();
+        metrics.removeAllStoreLevelSensors(taskName, name());
+    }
+
+
+    @Override
     public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
                                                          final long earliestSessionEndTime,
                                                          final long latestSessionStartTime) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 62ed6c6..5a27ed4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -31,6 +31,11 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
+import java.util.Map;
+
+import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
+import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
+
 public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
 
     private final WindowStore<Bytes, byte[]> inner;
@@ -44,6 +49,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     private Sensor flushTime;
     private StateSerdes<K, V> serdes;
     private ProcessorContext context;
+    private String taskName;
 
     MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
                        final String metricScope,
@@ -65,27 +71,38 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-        final String tagKey = "task-id";
-        final String taskName = context.taskId().toString();
         this.metrics = (StreamsMetricsImpl) context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "put",
-                                                                  Sensor.RecordingLevel.DEBUG, tagKey, taskName);
-        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "fetch",
-                                                                    Sensor.RecordingLevel.DEBUG, tagKey, taskName);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "flush",
-                                                                    Sensor.RecordingLevel.DEBUG, tagKey, taskName);
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(), "restore",
-                                                                              Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+
+        taskName = context.taskId().toString();
+        final String metricsGroup = "stream-" + metricScope + "-metrics";
+        final Map<String, String> taskTags = metrics.tagMap("task-id", taskName, metricScope + "-id", "all");
+        final Map<String, String> storeTags = metrics.tagMap("task-id", taskName, metricScope + "-id", name());
+
+        putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        fetchTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "fetch", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        flushTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "flush", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+        final Sensor restoreTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "restore", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+
         // register and possibly restore the state from the logs
         final long startNs = time.nanoseconds();
         try {
             inner.init(context, root);
         } finally {
-            this.metrics.recordLatency(restoreTime, startNs, time.nanoseconds());
+            this.metrics.recordLatency(
+                restoreTime,
+                startNs,
+                time.nanoseconds()
+            );
         }
     }
 
     @Override
+    public void close() {
+        super.close();
+        metrics.removeAllStoreLevelSensors(taskName, name());
+    }
+
+    @Override
     public void put(final K key, final V value) {
         put(key, value, context.timestamp());
     }
@@ -134,16 +151,16 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     public KeyValueIterator<Windowed<K>, V> all() {
         return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time);
     }
-    
+
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
-        return new MeteredWindowedKeyValueIterator<>(inner.fetchAll(timeFrom, timeTo), 
-                                                     fetchTime, 
-                                                     metrics, 
-                                                     serdes, 
+        return new MeteredWindowedKeyValueIterator<>(inner.fetchAll(timeFrom, timeTo),
+                                                     fetchTime,
+                                                     metrics,
+                                                     serdes,
                                                      time);
     }
-    
+
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
         return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
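
Net effect of the MeteredWindowStore changes above: each put/fetch/flush/restore operation now records through a store-level sensor whose parent is a task-level sensor tagged "<scope>-id" = "all", and the new close() override deregisters the store-level sensors via removeAllStoreLevelSensors(taskName, name()), so a closed store no longer leaks its metrics. For reference, a minimal sketch (not part of this commit) of reading the rolled-up task-level put latency back from a running application; it assumes a KafkaStreams instance, metrics.recording.level=DEBUG, and the "rocksdb-window-state-id" tag key (the actual key is "<metricScope>-id" and depends on the concrete store implementation):

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    import java.util.Map;

    public final class PutLatencyPeek {
        // Print each task's rolled-up put latency; the sensors are DEBUG-level,
        // so metrics.recording.level=DEBUG must be set on the application.
        public static void print(final KafkaStreams streams) {
            for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                final MetricName name = entry.getKey();
                if ("put-latency-avg".equals(name.name())
                        && "all".equals(name.tags().get("rocksdb-window-state-id"))) {
                    System.out.println(name.tags().get("task-id") + " -> " + entry.getValue().metricValue());
                }
            }
        }
    }

Filtering on the "all" tag value selects the task-level roll-up introduced by this commit; filtering on a store name instead selects the per-store sensor.
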
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
new file mode 100644
index 0000000..fdbc7c8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals.metrics;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+import java.util.Map;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+
+public final class Sensors {
+
+    private Sensors() {}
+
+    public static Sensor createTaskAndStoreLatencyAndThroughputSensors(final Sensor.RecordingLevel level,
+                                                                       final String operation,
+                                                                       final StreamsMetricsImpl metrics,
+                                                                       final String metricsGroup,
+                                                                       final String taskName,
+                                                                       final String storeName,
+                                                                       final Map<String, String> taskTags,
+                                                                       final Map<String, String> storeTags) {
+        final Sensor taskSensor = metrics.taskLevelSensor(taskName, operation, level);
+        addAvgMaxLatency(taskSensor, metricsGroup, taskTags, operation);
+        addInvocationRateAndCount(taskSensor, metricsGroup, taskTags, operation);
+        final Sensor sensor = metrics.storeLevelSensor(taskName, storeName, operation, level, taskSensor);
+        addAvgMaxLatency(sensor, metricsGroup, storeTags, operation);
+        addInvocationRateAndCount(sensor, metricsGroup, storeTags, operation);
+        return sensor;
+    }
+}
+
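
A condensed, standalone sketch (not part of this commit) of how the new helper behaves, assuming a registry configured at DEBUG recording level and hypothetical names (scope "scope", task "0_0", store "my-store"): the task-level sensor is created first and passed as the parent of the store-level sensor, so a single record() on the returned sensor feeds both the per-store metrics and the task-level "all" roll-up.

    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
    import org.apache.kafka.streams.state.internals.metrics.Sensors;

    import java.util.Map;

    public class SensorsSketch {
        public static void main(final String[] args) {
            // Configure a DEBUG recording level so the DEBUG sensors below actually record.
            final Metrics registry = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
            final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, "client");

            // Task-level tags use "<scope>-id" = "all"; store-level tags use the store
            // name, mirroring MeteredWindowStore.init() above.
            final Map<String, String> taskTags = metrics.tagMap("task-id", "0_0", "scope-id", "all");
            final Map<String, String> storeTags = metrics.tagMap("task-id", "0_0", "scope-id", "my-store");

            final Sensor put = Sensors.createTaskAndStoreLatencyAndThroughputSensors(
                Sensor.RecordingLevel.DEBUG, "put", metrics, "stream-scope-metrics",
                "0_0", "my-store", taskTags, storeTags);

            // One record updates put-latency-avg/-max and put-rate/-total at both levels.
            put.record(42.0);
        }
    }
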
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 65dd022..4e14143 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -111,17 +111,11 @@ public class ProcessorNodeTest {
         metricTags.put("task-id", context.taskId().toString());
         metricTags.put("client-id", "mock");
 
-
-        for (final String operation : latencyOperations) {
-            assertNotNull(metrics.getSensor("name-mock.0_0." + operation));
-        }
-        assertNotNull(metrics.getSensor("name-mock.0_0." + throughputOperation));
-
         for (final String opName : latencyOperations) {
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-latency-avg", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-latency-max", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-rate", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-total", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
         }
         assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
                                                                "The average number of occurrence of " + throughputOperation + " operation per second.",
@@ -130,10 +124,10 @@ public class ProcessorNodeTest {
         // test "all"
         metricTags.put("processor-node-id", "all");
         for (final String opName : latencyOperations) {
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-latency-avg", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-latency-max", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-rate", groupName, metricTags);
-            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),  opName + "-total", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
         }
         assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate",
                                                                groupName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 146bcb3..8f25c53 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -627,8 +627,8 @@ public class StreamTaskTest {
                 public void flush() {
                     flushed.set(true);
                 }
-            }
-        );
+            },
+            metrics.sensor("dummy"));
         streamTask.flushState();
         assertTrue(flushed.get());
     }
@@ -972,8 +972,8 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            producer
-        );
+            producer,
+            metrics.sensor("dummy"));
         task.initializeStateStores();
         task.initializeTopology();
 
@@ -1041,8 +1041,8 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            producer
-        );
+            producer,
+            metrics.sensor("dummy"));
     }
 
     private StreamTask createStatefulTaskThatThrowsExceptionOnClose() {
@@ -1063,8 +1063,8 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            producer
-        );
+            producer,
+            metrics.sensor("dummy"));
     }
 
     private StreamTask createStatelessTask(final StreamsConfig streamsConfig) {
@@ -1089,8 +1089,8 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            producer
-        );
+            producer,
+            metrics.sensor("dummy"));
     }
 
     // this task will throw exception when processing (on partition2), flushing, suspending and closing
@@ -1116,8 +1116,8 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            producer
-        ) {
+            producer,
+            metrics.sensor("dummy")) {
             @Override
             protected void flushState() {
                 throw new RuntimeException("KABOOM!");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index a72dc79..b065e2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -40,7 +40,6 @@ public class StreamsMetricsImplTest {
     @Test
     public void testRemoveSensor() {
         final String sensorName = "sensor1";
-        final String taskName = "task";
         final String scope = "scope";
         final String entity = "entity";
         final String operation = "put";
@@ -52,10 +51,10 @@ public class StreamsMetricsImplTest {
         final Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG, sensor1);
         streamsMetrics.removeSensor(sensor1a);
 
-        final Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        final Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor2);
 
-        final Sensor sensor3 = streamsMetrics.addThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
     }
 
@@ -64,12 +63,11 @@ public class StreamsMetricsImplTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
         final int defaultMetrics = streamsMetrics.metrics().size();
 
-        final String taskName = "task";
         final String scope = "scope";
         final String entity = "entity";
         final String operation = "put";
 
-        final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
 
         // 2 meters and 4 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
         final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
@@ -85,12 +83,11 @@ public class StreamsMetricsImplTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
         final int defaultMetrics = streamsMetrics.metrics().size();
 
-        final String taskName = "task";
         final String scope = "scope";
         final String entity = "entity";
         final String operation = "put";
 
-        final Sensor sensor1 = streamsMetrics.addThroughputSensor(taskName,  scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        final Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
 
         final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
         // 2 meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 4916cb0..75bb219 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -190,8 +190,8 @@ public class StreamThreadStateStoreProviderTest {
             stateDirectory,
             null,
             new MockTime(),
-            clientSupplier.getProducer(new HashMap<String, Object>())
-        ) {
+            clientSupplier.getProducer(new HashMap<String, Object>()),
+            metrics.sensor("dummy")) {
             @Override
             protected void updateOffsetLimits() {}
         };
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 74fa8ca..d2796db 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -338,7 +338,8 @@ public class TopologyTestDriver implements Closeable {
                 stateDirectory,
                 cache,
                 mockWallClockTime,
-                producer);
+                producer,
+                metrics.sensor("dummy"));
             task.initializeStateStores();
             task.initializeTopology();
             context = (InternalProcessorContext) task.context();

