kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: fix NamedCache metrics in Streams (#4917)
Date Thu, 26 Apr 2018 17:01:23 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc6e922  MINOR: fix NamedCache metrics in Streams (#4917)
fc6e922 is described below

commit fc6e92260c3ff3b5ef72ac05e7a518a8cb2e7090
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Thu Apr 26 12:01:17 2018 -0500

    MINOR: fix NamedCache metrics in Streams (#4917)
    
    * Fixes a bug in which all NamedCache instances in a process shared
    one parent metric.
    
    * Also fixes a bug which incorrectly computed the per-cache metric tag
    (which was undetected due to the former bug).
    
    * Drop the StreamsMetricsConventions#xLevelSensorName convention
    in favor of StreamsMetricsImpl#xLevelSensor to allow StreamsMetricsImpl
    to track thread- and cache-level metrics, so that they may be cleanly declared
    from anywhere but still unloaded at the appropriate time. This was necessary
    right now so that the NamedCache could register a thread-level parent sensor
    to be unloaded when the thread, not the cache, is closed.
    
    * The above changes made it mostly unnecessary for the StreamsMetricsImpl to
    expose a reference to the underlying Metrics registry, so I did a little extra work
    to remove that reference, including removing inconsistently-used and unnecessary
    calls to Metrics#close() in the tests.
    
    The existing tests should be sufficient to verify this change.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   3 +-
 .../processor/internals/GlobalStreamThread.java    |   2 +-
 .../streams/processor/internals/StreamTask.java    |  53 +++++++++-
 .../streams/processor/internals/StreamThread.java  |  82 ++++++---------
 .../metrics/StreamsMetricsConventions.java         |  39 -------
 .../internals/metrics/StreamsMetricsImpl.java      | 117 ++++++++++++++++++---
 .../kafka/streams/state/internals/NamedCache.java  |  83 +++++++++------
 .../kafka/streams/state/internals/ThreadCache.java |  10 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   1 -
 .../processor/internals/ProcessorNodeTest.java     |   1 -
 .../streams/processor/internals/SinkNodeTest.java  |   6 --
 .../processor/internals/StreamTaskTest.java        |   1 -
 .../processor/internals/StreamThreadTest.java      |   9 --
 .../state/internals/AbstractKeyValueStoreTest.java |   1 -
 .../state/internals/CachingKeyValueStoreTest.java  |   1 -
 .../state/internals/CachingSessionStoreTest.java   |   1 -
 .../state/internals/CachingWindowStoreTest.java    |   1 -
 .../ChangeLoggingKeyValueBytesStoreTest.java       |   1 -
 .../state/internals/MeteredWindowStoreTest.java    |   6 --
 .../streams/state/internals/NamedCacheTest.java    |  25 ++---
 .../RocksDBKeyValueStoreSupplierTest.java          |   1 -
 .../internals/RocksDBSegmentedBytesStoreTest.java  |   1 -
 .../internals/RocksDBSessionStoreSupplierTest.java |   1 -
 .../state/internals/RocksDBSessionStoreTest.java   |   1 -
 .../streams/state/internals/RocksDBStoreTest.java  |   1 -
 .../internals/RocksDBWindowStoreSupplierTest.java  |   1 -
 .../state/internals/RocksDBWindowStoreTest.java    |   1 -
 .../state/internals/SegmentIteratorTest.java       |   1 -
 .../streams/state/internals/SegmentsTest.java      |   1 -
 .../state/internals/StoreChangeLoggerTest.java     |   6 --
 .../kafka/test/InternalMockProcessorContext.java   |   6 --
 .../org/apache/kafka/test/KStreamTestDriver.java   |   8 +-
 32 files changed, 258 insertions(+), 214 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index a99f8cc..776dde7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -673,7 +673,8 @@ public class KafkaStreams {
             throw new StreamsException(fatal);
         }
 
-        final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+        final MetricConfig metricConfig = new MetricConfig()
+            .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index af3c7db..1c34897 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -298,7 +298,7 @@ public class GlobalStreamThread extends Thread {
                 log.error("Failed to close state maintainer due to the following error:", e);
             }
 
-            streamsMetrics.removeOwnedSensors();
+            streamsMetrics.removeAllThreadLevelSensors();
 
             setState(DEAD);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d9515b1..b975324 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -22,9 +22,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -42,6 +47,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static java.lang.String.format;
 import static java.util.Collections.singleton;
@@ -72,16 +78,57 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     protected static final class TaskMetrics {
         final StreamsMetricsImpl metrics;
         final Sensor taskCommitTimeSensor;
+        private final String taskName;
 
 
         TaskMetrics(final TaskId id, final StreamsMetricsImpl metrics) {
-            final String name = id.toString();
+            taskName = id.toString();
             this.metrics = metrics;
-            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG);
+            final String group = "stream-task-metrics";
+
+            // first add the global operation metrics if not yet, with the global tags only
+            final Map<String, String> allTagMap = metrics.tagMap("task-id", "all");
+            final Sensor parent = metrics.threadLevelSensor("commit", Sensor.RecordingLevel.DEBUG);
+            parent.add(
+                new MetricName("commit-latency-avg", group, "The average latency of commit operation.", allTagMap),
+                new Avg()
+            );
+            parent.add(
+                new MetricName("commit-latency-max", group, "The max latency of commit operation.", allTagMap),
+                new Max()
+            );
+            parent.add(
+                new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", allTagMap),
+                new Rate(TimeUnit.SECONDS, new Count())
+            );
+            parent.add(
+                new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap),
+                new Count()
+            );
+
+            // add the operation metrics with additional tags
+            final Map<String, String> tagMap = metrics.tagMap("task-id", taskName);
+            taskCommitTimeSensor = metrics.taskLevelSensor("commit", taskName, Sensor.RecordingLevel.DEBUG, parent);
+            taskCommitTimeSensor.add(
+                new MetricName("commit-latency-avg", group, "The average latency of commit operation.", tagMap),
+                new Avg()
+            );
+            taskCommitTimeSensor.add(
+                new MetricName("commit-latency-max", group, "The max latency of commit operation.", tagMap),
+                new Max()
+            );
+            taskCommitTimeSensor.add(
+                new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", tagMap),
+                new Rate(TimeUnit.SECONDS, new Count())
+            );
+            taskCommitTimeSensor.add(
+                new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap),
+                new Count()
+            );
         }
 
         void removeAllSensors() {
-            metrics.removeSensor(taskCommitTimeSensor);
+            metrics.removeAllTaskLevelSensors(taskName);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 39727be..e4ad138 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -52,10 +52,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -64,7 +62,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
 
 public class StreamThread extends Thread {
 
@@ -509,58 +506,41 @@ public class StreamThread extends Thread {
         private final Sensor taskCreatedSensor;
         private final Sensor tasksClosedSensor;
 
-        private final Deque<String> ownedSensors = new LinkedList<>();
-
         StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
             super(metrics, threadName);
-            final String groupName = "stream-metrics";
-
-            commitTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "commit-latency"), Sensor.RecordingLevel.INFO);
-            commitTimeSensor.add(metrics.metricName("commit-latency-avg", groupName, "The average commit time in ms", tags()), new Avg());
-            commitTimeSensor.add(metrics.metricName("commit-latency-max", groupName, "The maximum commit time in ms", tags()), new Max());
-            commitTimeSensor.add(metrics.metricName("commit-rate", groupName, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            commitTimeSensor.add(metrics.metricName("commit-total", groupName, "The total number of commit calls", tags()), new Count());
-            ownedSensors.push(commitTimeSensor.name());
-
-            pollTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "poll-latency"), Sensor.RecordingLevel.INFO);
-            pollTimeSensor.add(metrics.metricName("poll-latency-avg", groupName, "The average poll time in ms", tags()), new Avg());
-            pollTimeSensor.add(metrics.metricName("poll-latency-max", groupName, "The maximum poll time in ms", tags()), new Max());
-            pollTimeSensor.add(metrics.metricName("poll-rate", groupName, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            pollTimeSensor.add(metrics.metricName("poll-total", groupName, "The total number of record-poll calls", tags()), new Count());
-            ownedSensors.push(pollTimeSensor.name());
-
-            processTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "process-latency"), Sensor.RecordingLevel.INFO);
-            processTimeSensor.add(metrics.metricName("process-latency-avg", groupName, "The average process time in ms", tags()), new Avg());
-            processTimeSensor.add(metrics.metricName("process-latency-max", groupName, "The maximum process time in ms", tags()), new Max());
-            processTimeSensor.add(metrics.metricName("process-rate", groupName, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            processTimeSensor.add(metrics.metricName("process-total", groupName, "The total number of process calls", tags()), new Count());
-            ownedSensors.push(processTimeSensor.name());
-
-            punctuateTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "punctuate-latency"), Sensor.RecordingLevel.INFO);
-            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", groupName, "The average punctuate time in ms", tags()), new Avg());
-            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", groupName, "The maximum punctuate time in ms", tags()), new Max());
-            punctuateTimeSensor.add(metrics.metricName("punctuate-rate", groupName, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            punctuateTimeSensor.add(metrics.metricName("punctuate-total", groupName, "The total number of punctuate calls", tags()), new Count());
-            ownedSensors.push(punctuateTimeSensor.name());
-
-            taskCreatedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-created"), Sensor.RecordingLevel.INFO);
+            final String group = "stream-metrics";
+
+            commitTimeSensor = threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO);
+            commitTimeSensor.add(metrics.metricName("commit-latency-avg", group, "The average commit time in ms", tags()), new Avg());
+            commitTimeSensor.add(metrics.metricName("commit-latency-max", group, "The maximum commit time in ms", tags()), new Max());
+            commitTimeSensor.add(metrics.metricName("commit-rate", group, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            commitTimeSensor.add(metrics.metricName("commit-total", group, "The total number of commit calls", tags()), new Count());
+
+            pollTimeSensor = threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO);
+            pollTimeSensor.add(metrics.metricName("poll-latency-avg", group, "The average poll time in ms", tags()), new Avg());
+            pollTimeSensor.add(metrics.metricName("poll-latency-max", group, "The maximum poll time in ms", tags()), new Max());
+            pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tags()), new Count());
+
+            processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
+            processTimeSensor.add(metrics.metricName("process-latency-avg", group, "The average process time in ms", tags()), new Avg());
+            processTimeSensor.add(metrics.metricName("process-latency-max", group, "The maximum process time in ms", tags()), new Max());
+            processTimeSensor.add(metrics.metricName("process-rate", group, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            processTimeSensor.add(metrics.metricName("process-total", group, "The total number of process calls", tags()), new Count());
+
+            punctuateTimeSensor = threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO);
+            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", group, "The average punctuate time in ms", tags()), new Avg());
+            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", group, "The maximum punctuate time in ms", tags()), new Max());
+            punctuateTimeSensor.add(metrics.metricName("punctuate-rate", group, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            punctuateTimeSensor.add(metrics.metricName("punctuate-total", group, "The total number of punctuate calls", tags()), new Count());
+
+            taskCreatedSensor = threadLevelSensor("task-created", Sensor.RecordingLevel.INFO);
             taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
             taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tags()), new Total());
-            ownedSensors.push(taskCreatedSensor.name());
-
-            tasksClosedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-closed"), Sensor.RecordingLevel.INFO);
-            tasksClosedSensor.add(metrics.metricName("task-closed-rate", groupName, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            tasksClosedSensor.add(metrics.metricName("task-closed-total", groupName, "The total number of closed tasks", tags()), new Total());
-            ownedSensors.push(tasksClosedSensor.name());
-        }
 
-        public void removeOwnedSensors() {
-            synchronized (ownedSensors) {
-                super.removeOwnedSensors();
-                while (!ownedSensors.isEmpty()) {
-                    registry().removeSensor(ownedSensors.pop());
-                }
-            }
+            tasksClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
+            tasksClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            tasksClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tags()), new Total());
         }
     }
 
@@ -1165,7 +1145,7 @@ public class StreamThread extends Thread {
         } catch (final Throwable e) {
             log.error("Failed to close restore consumer due to the following error:", e);
         }
-        streamsMetrics.removeOwnedSensors();
+        streamsMetrics.removeAllThreadLevelSensors();
 
         setState(State.DEAD);
         log.info("Shutdown complete");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
deleted file mode 100644
index cfe206c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals.metrics;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public final class StreamsMetricsConventions {
-    private StreamsMetricsConventions() {
-    }
-
-    public static String threadLevelSensorName(final String threadName, final String sensorName) {
-        return "thread." + threadName + "." + sensorName;
-    }
-
-    static Map<String, String> threadLevelTags(final String threadName, final Map<String, String> tags) {
-        if (tags.containsKey("client-id")) {
-            return tags;
-        } else {
-            final LinkedHashMap<String, String> newTags = new LinkedHashMap<>(tags);
-            newTags.put("client-id", threadName);
-            return newTags;
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 76a1d2b..0251265 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -31,35 +31,125 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
-
 public class StreamsMetricsImpl implements StreamsMetrics {
     private final Metrics metrics;
     private final Map<String, String> tags;
     private final Map<Sensor, Sensor> parentSensors;
-    private final Deque<String> ownedSensors = new LinkedList<>();
     private final Sensor skippedRecordsSensor;
+    private final String threadName;
+
+    private final Deque<String> threadLevelSensors = new LinkedList<>();
+    private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
+    private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();
 
     public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
+        this.threadName = threadName;
 
         this.metrics = metrics;
-        this.tags = StreamsMetricsConventions.threadLevelTags(threadName, Collections.<String, String>emptyMap());
+
+
+        final HashMap<String, String> tags = new LinkedHashMap<>();
+        tags.put("client-id", threadName);
+        this.tags = Collections.unmodifiableMap(tags);
+
         this.parentSensors = new HashMap<>();
 
-        skippedRecordsSensor = metrics.sensor(threadLevelSensorName(threadName, "skipped-records"), Sensor.RecordingLevel.INFO);
-        skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count()));
-        skippedRecordsSensor.add(metrics.metricName("skipped-records-total", "stream-metrics", "The total number of skipped records", tags), new Total());
-        ownedSensors.push(skippedRecordsSensor.name());
+        final String group = "stream-metrics";
+        skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
+        skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", group, "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count()));
+        skippedRecordsSensor.add(metrics.metricName("skipped-records-total", group, "The total number of skipped records", tags), new Total());
+    }
+
+    public final Sensor threadLevelSensor(final String sensorName,
+                                          final Sensor.RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        synchronized (threadLevelSensors) {
+            final String fullSensorName = threadName + "." + sensorName;
+            final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
+            threadLevelSensors.push(fullSensorName);
+
+            return sensor;
+        }
+    }
+
+    public final void removeAllThreadLevelSensors() {
+        synchronized (threadLevelSensors) {
+            while (!threadLevelSensors.isEmpty()) {
+                metrics.removeSensor(threadLevelSensors.pop());
+            }
+        }
+    }
+
+    public final Sensor taskLevelSensor(final String taskName,
+                                         final String sensorName,
+                                         final Sensor.RecordingLevel recordingLevel,
+                                         final Sensor... parents) {
+        final String key = threadName + "." + taskName;
+        synchronized (taskLevelSensors) {
+            if (!taskLevelSensors.containsKey(key)) {
+                taskLevelSensors.put(key, new LinkedList<String>());
+            }
+
+            final String fullSensorName = key + "." + sensorName;
+
+            final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
+
+            taskLevelSensors.get(key).push(fullSensorName);
+
+            return sensor;
+        }
     }
 
-    public final Metrics registry() {
-        return metrics;
+    public final void removeAllTaskLevelSensors(final String taskName) {
+        final String key = threadName + "." + taskName;
+        synchronized (taskLevelSensors) {
+            if (taskLevelSensors.containsKey(key)) {
+                while (!taskLevelSensors.get(key).isEmpty()) {
+                    metrics.removeSensor(taskLevelSensors.get(key).pop());
+                }
+                taskLevelSensors.remove(key);
+            }
+        }
+    }
+
+    public final Sensor cacheLevelSensor(final String taskName,
+                                         final String cacheName,
+                                         final String sensorName,
+                                         final Sensor.RecordingLevel recordingLevel,
+                                         final Sensor... parents) {
+        final String key = threadName + "." + taskName + "." + cacheName;
+        synchronized (cacheLevelSensors) {
+            if (!cacheLevelSensors.containsKey(key)) {
+                cacheLevelSensors.put(key, new LinkedList<String>());
+            }
+
+            final String fullSensorName = key + "." + sensorName;
+
+            final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
+
+            cacheLevelSensors.get(key).push(fullSensorName);
+
+            return sensor;
+        }
+    }
+
+    public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
+        final String key = threadName + "." + taskName + "." + cacheName;
+        synchronized (cacheLevelSensors) {
+            if (cacheLevelSensors.containsKey(key)) {
+                while (!cacheLevelSensors.get(key).isEmpty()) {
+                    metrics.removeSensor(cacheLevelSensors.get(key).pop());
+                }
+                cacheLevelSensors.remove(key);
+            }
+        }
     }
 
     protected final Map<String, String> tags() {
@@ -236,11 +326,4 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
-    public void removeOwnedSensors() {
-        synchronized (ownedSensors) {
-            while (!ownedSensors.isEmpty()) {
-                metrics.removeSensor(ownedSensors.pop());
-            }
-        }
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index de62a2d..d058c9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -16,13 +16,13 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ class NamedCache {
     private long numOverwrites = 0;
     private long numFlushes = 0;
 
-    NamedCache(final String name, final StreamsMetrics metrics) {
+    NamedCache(final String name, final StreamsMetricsImpl metrics) {
         this.name = name;
         this.namedCacheMetrics = new NamedCacheMetrics(metrics, name);
     }
@@ -355,45 +355,66 @@ class NamedCache {
 
     private static class NamedCacheMetrics {
         private final StreamsMetricsImpl metrics;
-        private final String groupName;
-        private final Map<String, String> metricTags;
-        private final Map<String, String> allMetricTags;
+
         private final Sensor hitRatioSensor;
+        private final String taskName;
+        private final String cacheName;
 
-        private NamedCacheMetrics(final StreamsMetrics metrics, final String name) {
-            final String scope = "record-cache";
-            final String opName = "hitRatio";
-            final String tagKey = scope + "-id";
-            final String tagValue = ThreadCache.underlyingStoreNamefromCacheName(name);
-            this.groupName = "stream-" + scope + "-metrics";
-            this.metrics = (StreamsMetricsImpl) metrics;
-            this.allMetricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, "all",
-                "task-id", ThreadCache.taskIDfromCacheName(name));
-            this.metricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, tagValue,
-                "task-id", ThreadCache.taskIDfromCacheName(name));
+        private NamedCacheMetrics(final StreamsMetricsImpl metrics, final String cacheName) {
+            taskName = ThreadCache.taskIDfromCacheName(cacheName);
+            this.cacheName = cacheName;
+            this.metrics = metrics;
+            final String group = "stream-record-cache-metrics";
 
             // add parent
-            final Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
-            parent.add(this.metrics.registry().metricName(opName + "-avg", groupName,
-                    "The average cache hit ratio.", allMetricTags), new Avg());
-            parent.add(this.metrics.registry().metricName(opName + "-min", groupName,
-                    "The minimum cache hit ratio.", allMetricTags), new Min());
-            parent.add(this.metrics.registry().metricName(opName + "-max", groupName,
-                    "The maximum cache hit ratio.", allMetricTags), new Max());
+            final Map<String, String> allMetricTags = metrics.tagMap(
+                "record-cache-id", "all",
+                "task-id", taskName
+            );
+            final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor("hitRatio", taskName, Sensor.RecordingLevel.DEBUG);
+            taskLevelHitRatioSensor.add(
+                new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags),
+                new Avg()
+            );
+            taskLevelHitRatioSensor.add(
+                new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", allMetricTags),
+                new Min()
+            );
+            taskLevelHitRatioSensor.add(
+                new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", allMetricTags),
+                new Max()
+            );
 
             // add child
-            hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent);
-            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-avg", groupName,
-                    "The average cache hit ratio.", metricTags), new Avg());
-            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName,
-                    "The minimum cache hit ratio.", metricTags), new Min());
-            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName,
-                    "The maximum cache hit ratio.", metricTags), new Max());
+            final Map<String, String> metricTags = metrics.tagMap(
+                "record-cache-id", ThreadCache.underlyingStoreNamefromCacheName(cacheName),
+                "task-id", taskName
+            );
+
+            hitRatioSensor = metrics.cacheLevelSensor(
+                taskName,
+                cacheName,
+                "hitRatio",
+                Sensor.RecordingLevel.DEBUG,
+                taskLevelHitRatioSensor
+            );
+            hitRatioSensor.add(
+                new MetricName("hitRatio-avg", group, "The average cache hit ratio.", metricTags),
+                new Avg()
+            );
+            hitRatioSensor.add(
+                new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", metricTags),
+                new Min()
+            );
+            hitRatioSensor.add(
+                new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", metricTags),
+                new Max()
+            );
 
         }
 
         private void removeAllSensors() {
-            metrics.removeSensor(hitRatioSensor);
+            metrics.removeAllCacheLevelSensors(taskName, cacheName);
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index b1fd198..b947664 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.slf4j.Logger;
 
 import java.util.Collections;
@@ -39,7 +39,7 @@ import java.util.NoSuchElementException;
 public class ThreadCache {
     private final Logger log;
     private final long maxCacheSizeBytes;
-    private final StreamsMetrics metrics;
+    private final StreamsMetricsImpl metrics;
     private final Map<String, NamedCache> caches = new HashMap<>();
 
     // internal stats
@@ -52,7 +52,7 @@ public class ThreadCache {
         void apply(final List<DirtyEntry> dirty);
     }
 
-    public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetrics metrics) {
+    public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetricsImpl metrics) {
         this.maxCacheSizeBytes = maxCacheSizeBytes;
         this.metrics = metrics;
         this.log = logContext.logger(getClass());
@@ -91,7 +91,7 @@ public class ThreadCache {
      * @return
      */
     public static String taskIDfromCacheName(final String cacheName) {
-        String[] tokens = cacheName.split("-");
+        String[] tokens = cacheName.split("-", 2);
         return tokens[0];
     }
 
@@ -101,7 +101,7 @@ public class ThreadCache {
      * @return
      */
     public static String underlyingStoreNamefromCacheName(final String cacheName) {
-        String[] tokens = cacheName.split("-");
+        String[] tokens = cacheName.split("-", 2);
         return tokens[1];
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 301d448..8cb2eae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -121,7 +121,6 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     @After
     public void closeStore() {
-        context.close();
         sessionStore.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 1409d68..a7a2610 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -148,7 +148,6 @@ public class ProcessorNodeTest {
             "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
 
 
-        context.close();
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 0013167..753d26b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -55,11 +54,6 @@ public class SinkNodeTest {
         sink.init(context);
     }
 
-    @After
-    public void after() {
-        context.close();
-    }
-
     @Test
     @SuppressWarnings("unchecked")
     public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 28e0b46..598e47e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -232,7 +232,6 @@ public class StreamTaskTest {
     public void testMetrics() {
         task = createStatelessTask(createConfig(false));
 
-        assertNotNull(metrics.getSensor("commit"));
         assertNotNull(getMetric("%s-latency-avg", "The average latency of %s operation.", task.id().toString()));
         assertNotNull(getMetric("%s-latency-max", "The max latency of %s operation.", task.id().toString()));
         assertNotNull(getMetric("%s-rate", "The average number of occurrence of %s operation per second.", task.id().toString()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 36a1bce..3ae7acb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -243,17 +243,8 @@ public class StreamThreadTest {
     public void testMetricsCreatedAtStartup() {
         final StreamThread thread = createStreamThread(clientId, config, false);
         final String defaultGroupName = "stream-metrics";
-        final String defaultPrefix = "thread." + thread.getName();
         final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.getName());
 
-        assertNotNull(metrics.getSensor(defaultPrefix + ".commit-latency"));
-        assertNotNull(metrics.getSensor(defaultPrefix + ".poll-latency"));
-        assertNotNull(metrics.getSensor(defaultPrefix + ".process-latency"));
-        assertNotNull(metrics.getSensor(defaultPrefix + ".punctuate-latency"));
-        assertNotNull(metrics.getSensor(defaultPrefix + ".task-created"));
-        assertNotNull(metrics.getSensor(defaultPrefix + ".task-closed"));
-        assertNotNull(metrics.getSensor(defaultPrefix + ".skipped-records"));
-
         assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags)));
         assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags)));
         assertNotNull(metrics.metrics().get(metrics.metricName("commit-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 51c782a..c4536df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -63,7 +63,6 @@ public abstract class AbstractKeyValueStoreTest {
     @After
     public void after() {
         store.close();
-        context.close();
         driver.clear();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 8705326..3e0241e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -82,7 +82,6 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @After
     public void after() {
         super.after();
-        context.close();
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index a9a66e9..b77f4e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -82,7 +82,6 @@ public class CachingSessionStoreTest {
 
     @After
     public void close() {
-        context.close();
         cachingStore.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index c25655b..a87b2e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -87,7 +87,6 @@ public class CachingWindowStoreTest {
 
     @After
     public void closeStore() {
-        context.close();
         cachingStore.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 7342c93..5bb0de7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -76,7 +76,6 @@ public class ChangeLoggingKeyValueBytesStoreTest {
 
     @After
     public void after() {
-        context.close();
         store.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index eab523e..19bd523 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -83,11 +82,6 @@ public class MeteredWindowStoreTest {
         );
     }
 
-    @After
-    public void after() {
-        context.close();
-    }
-
     @Test
     public void shouldRecordRestoreLatencyOnInit() {
         innerStoreMock.init(context, store);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 6b410dc..9ae0feb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -16,12 +16,11 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -44,13 +43,14 @@ import static org.junit.Assert.assertSame;
 public class NamedCacheTest {
 
     private NamedCache cache;
-    private MockStreamsMetrics streamMetrics;
+    private StreamsMetricsImpl metrics;
     private final String taskIDString = "0.0";
     private final String underlyingStoreName = "storeName";
+
     @Before
     public void setUp() {
-        streamMetrics = new MockStreamsMetrics(new Metrics());
-        cache = new NamedCache(taskIDString + "-" + underlyingStoreName, streamMetrics);
+        metrics = new MockStreamsMetrics(new Metrics());
+        cache = new NamedCache(taskIDString + "-" + underlyingStoreName, metrics);
     }
 
     @Test
@@ -83,18 +83,15 @@ public class NamedCacheTest {
         metricTags.put("task-id", taskIDString);
         metricTags.put("client-id", "test");
 
-        assertNotNull(streamMetrics.registry().getSensor("hitRatio"));
-        final Map<MetricName, KafkaMetric> metrics1 = streamMetrics.registry().metrics();
-        getMetricByNameFilterByTags(metrics1, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
-        getMetricByNameFilterByTags(metrics1, "hitRatio-min", "stream-record-cache-metrics", metricTags);
-        getMetricByNameFilterByTags(metrics1, "hitRatio-max", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags);
 
         // test "all"
         metricTags.put("record-cache-id", "all");
-        final Map<MetricName, KafkaMetric> metrics = streamMetrics.registry().metrics();
-        getMetricByNameFilterByTags(metrics, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
-        getMetricByNameFilterByTags(metrics, "hitRatio-min", "stream-record-cache-metrics", metricTags);
-        getMetricByNameFilterByTags(metrics, "hitRatio-max", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 098c326..b25b8cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -53,7 +53,6 @@ public class RocksDBKeyValueStoreSupplierTest {
 
     @After
     public void close() {
-        context.close();
         store.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 388a2fc..bd2fa91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -82,7 +82,6 @@ public class RocksDBSegmentedBytesStoreTest {
 
     @After
     public void close() {
-        context.close();
         bytesStore.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index 272e0b0..c50dfba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -68,7 +68,6 @@ public class RocksDBSessionStoreSupplierTest {
 
     @After
     public void close() {
-        context.close();
         store.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 6495315..bcb411b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -69,7 +69,6 @@ public class RocksDBSessionStoreTest {
 
     @After
     public void close() {
-        context.close();
         sessionStore.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index a09d87d..b7a9d37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -79,7 +79,6 @@ public class RocksDBStoreTest {
     @After
     public void tearDown() {
         rocksDBStore.close();
-        context.close();
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index a6ccfdf..7409a13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -53,7 +53,6 @@ public class RocksDBWindowStoreSupplierTest {
 
     @After
     public void close() {
-        context.close();
         if (store != null) {
             store.close();
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index bf556ad..92edbd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -126,7 +126,6 @@ public class RocksDBWindowStoreTest {
 
     @After
     public void closeStore() {
-        context.close();
         if (windowStore != null) {
             windowStore.close();
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index d61218e..7a7b266 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -76,7 +76,6 @@ public class SegmentIteratorTest {
         }
         segmentOne.close();
         segmentTwo.close();
-        context.close();
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index ec59a00..bfa317d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -65,7 +65,6 @@ public class SegmentsTest {
 
     @After
     public void close() {
-        context.close();
         segments.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 21b5c5c..6bacd91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
-import org.junit.After;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -68,11 +67,6 @@ public class StoreChangeLoggerTest {
 
     private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
 
-    @After
-    public void after() {
-        context.close();
-    }
-
     @Test
     public void testAddRemove() {
         context.setTime(1);
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 57e3efb..5e61910 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -51,7 +51,6 @@ import java.util.Map;
 public class InternalMockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     private final File stateDir;
-    private final Metrics metrics;
     private final RecordCollector.Supplier recordCollectorSupplier;
     private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
     private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
@@ -135,7 +134,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
         this.stateDir = stateDir;
         this.keySerde = keySerde;
         this.valSerde = valSerde;
-        this.metrics = metrics.registry();
         this.recordCollectorSupplier = collectorSupplier;
     }
 
@@ -306,10 +304,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
         restoreListener.onRestoreEnd(null, storeName, 0L);
     }
 
-    public void close() {
-        metrics.close();
-    }
-
     private StateRestoreListener getStateRestoreListener(StateRestoreCallback restoreCallback) {
         if (restoreCallback instanceof StateRestoreListener) {
             return (StateRestoreListener) restoreCallback;
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 7313414..3daf051 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -147,7 +147,12 @@ public class KStreamTestDriver extends ExternalResource {
 
     private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
         for (final StateStore store : stores) {
-            store.init(context, store);
+            try {
+                store.init(context, store);
+            } catch (final RuntimeException e) {
+                new RuntimeException("Fatal exception initializing store.", e).printStackTrace();
+                throw e;
+            }
         }
 
         for (final ProcessorNode node : topology.processors()) {
@@ -230,7 +235,6 @@ public class KStreamTestDriver extends ExternalResource {
         }
 
         closeState();
-        context.close();
     }
 
     public Set<String> allProcessorNames() {

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message