kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6819: Pt. 1 - Refactor thread-level Streams metrics (#6631)
Date Mon, 03 Jun 2019 21:31:42 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 17712b9  KAFKA-6819: Pt. 1 - Refactor thread-level Streams metrics (#6631)
17712b9 is described below

commit 17712b96c86d6c2cce695531d7e5f2e3d5718b21
Author: cadonna <bruno@confluent.io>
AuthorDate: Mon Jun 3 23:31:19 2019 +0200

    KAFKA-6819: Pt. 1 - Refactor thread-level Streams metrics (#6631)
    
    * StreamsMetricsImpl wraps the Kafka Streams' metrics registry and provides logic to create
    and register sensors and their corresponding metrics. An example for such logic can be found in
    threadLevelSensor(). Furthermore, StreamsMetricsmpl keeps track of the sensors on the
    different levels of an application, i.e., thread, task, etc., and provides logic to remove sensors per
    level, e.g., removeAllThreadLevelSensors(). There is one StreamsMetricsImpl object per
    application instance.
    * ThreadMetrics contains only static methods that specify all built-in thread-level sensors and
    metrics and provide logic to register and retrieve those thread-level sensors, e.g., commitSensor().
    * From anywhere inside the code base with access to StreamsMetricsImpl, thread-level sensors can be accessed by using ThreadMetrics.
    * ThreadsMetrics does not inherit from StreamsMetricsImpl anymore.
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 build.gradle                                       |   2 +
 .../kstream/internals/KStreamAggregate.java        |   6 +-
 .../kstream/internals/KStreamKStreamJoin.java      |   7 +-
 .../internals/KStreamKTableJoinProcessor.java      |   7 +-
 .../streams/kstream/internals/KStreamReduce.java   |   6 +-
 .../internals/KStreamSessionWindowAggregate.java   |   5 +-
 .../kstream/internals/KStreamWindowAggregate.java  |   9 +-
 .../kstream/internals/KTableKTableInnerJoin.java   |   6 +-
 .../kstream/internals/KTableKTableLeftJoin.java    |   6 +-
 .../kstream/internals/KTableKTableOuterJoin.java   |   6 +-
 .../kstream/internals/KTableKTableRightJoin.java   |   6 +-
 .../streams/kstream/internals/KTableSource.java    |   6 +-
 .../processor/internals/GlobalStateUpdateTask.java |   3 +-
 .../streams/processor/internals/RecordQueue.java   |  16 +-
 .../streams/processor/internals/StreamTask.java    |  44 ++--
 .../streams/processor/internals/StreamThread.java  | 110 ++++------
 .../internals/metrics/StreamsMetricsImpl.java      | 112 +++++++---
 .../processor/internals/metrics/ThreadMetrics.java | 179 +++++++++++++++
 .../integration/MetricsIntegrationTest.java        |   6 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   3 +
 .../processor/internals/StreamTaskTest.java        |  20 +-
 .../processor/internals/StreamThreadTest.java      | 108 +++++----
 .../internals/metrics/StreamsMetricsImplTest.java  |  39 +++-
 .../internals/metrics/ThreadMetricsTest.java       | 244 +++++++++++++++++++++
 .../StreamThreadStateStoreProviderTest.java        |   3 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  25 ++-
 .../streams/processor/MockProcessorContext.java    |   8 +-
 27 files changed, 773 insertions(+), 219 deletions(-)

diff --git a/build.gradle b/build.gradle
index 07200a8..516a06c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1120,6 +1120,8 @@ project(':streams') {
     testCompile libs.log4j
     testCompile libs.junit
     testCompile libs.easymock
+    testCompile libs.powermockJunit4
+    testCompile libs.powermockEasymock
     testCompile libs.bcpkix
     testCompile libs.hamcrest
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 8fd4f5b..ca7266f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -57,6 +59,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
     private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
         private TimestampedKeyValueStore<K, T> store;
         private StreamsMetricsImpl metrics;
+        private Sensor skippedRecordsSensor;
         private TimestampedTupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
@@ -64,6 +67,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
             store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
                 store,
@@ -80,7 +84,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
                     "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     key, value, context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 17d7837..97debbc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -24,12 +25,12 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);
 
@@ -57,12 +58,14 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
+        private Sensor skippedRecordsSensor;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
 
             otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
         }
@@ -81,7 +84,7 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
                     "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     key, value, context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index dcf0799..92fd4d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +36,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
     private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
     private final boolean leftJoin;
     private StreamsMetricsImpl metrics;
+    private Sensor skippedRecordsSensor;
 
     KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
                                final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper,
@@ -49,6 +52,8 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
     public void init(final ProcessorContext context) {
         super.init(context);
         metrics = (StreamsMetricsImpl) context.metrics();
+        skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
+
         valueGetter.init(context);
     }
 
@@ -67,7 +72,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
-            metrics.skippedRecordsSensor().record();
+            skippedRecordsSensor.record();
         } else {
             final K2 mappedKey = keyMapper.apply(key, value);
             final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 58769df..b01ff95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -56,12 +58,14 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
         private TimestampedKeyValueStore<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
         private StreamsMetricsImpl metrics;
+        private Sensor skippedRecordsSensor;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
             store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
                 store,
@@ -78,7 +82,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
                     "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     key, value, context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index fdbf475..cb117b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -83,6 +84,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
         private Sensor lateRecordDropSensor;
+        private Sensor skippedRecordsSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @SuppressWarnings("unchecked")
@@ -92,6 +94,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
             internalProcessorContext = (InternalProcessorContext) context;
             metrics = (StreamsMetricsImpl) context.metrics();
             lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
 
             store = (SessionStore<K, Agg>) context.getStateStore(storeName);
             tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues);
@@ -106,7 +109,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                     "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     value, context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 3458ca0..2983a3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -79,6 +80,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
         private Sensor lateRecordDropSensor;
+        private Sensor skippedRecordsSensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @SuppressWarnings("unchecked")
@@ -86,8 +88,11 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         public void init(final ProcessorContext context) {
             super.init(context);
             internalProcessorContext = (InternalProcessorContext) context;
-            metrics = (StreamsMetricsImpl) context.metrics();
+
+            metrics = internalProcessorContext.metrics();
+
             lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
             windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
             tupleForwarder = new TimestampedTupleForwarder<>(
                 windowStore,
@@ -103,7 +108,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
                     "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     value, context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index 2d4cbc8..005ea80 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -23,6 +24,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +68,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
 
         private final KTableValueGetter<K, V2> valueGetter;
         private StreamsMetricsImpl metrics;
+        private Sensor skippedRecordsSensor;
 
         KTableKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
@@ -75,6 +78,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
             valueGetter.init(context);
         }
 
@@ -86,7 +90,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
                     "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     change, context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 03e59d9..4bd6af9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +67,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
 
         private final KTableValueGetter<K, V2> valueGetter;
         private StreamsMetricsImpl metrics;
+        private Sensor skippedRecordsSensor;
 
         KTableKTableLeftJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
@@ -74,6 +77,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
             valueGetter.init(context);
         }
 
@@ -85,7 +89,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
                     "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     change, context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index d600718..794f212 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +66,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
 
         private final KTableValueGetter<K, V2> valueGetter;
         private StreamsMetricsImpl metrics;
+        private Sensor skippedRecordsSensor;
 
         KTableKTableOuterJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
@@ -73,6 +76,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
             valueGetter.init(context);
         }
 
@@ -84,7 +88,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
                     "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     change, context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index fbddd9b..981b5ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +65,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
 
         private final KTableValueGetter<K, V2> valueGetter;
         private StreamsMetricsImpl metrics;
+        private Sensor skippedRecordsSensor;
 
         KTableKTableRightJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
@@ -72,6 +75,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
             valueGetter.init(context);
         }
 
@@ -83,7 +87,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
                     "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     change, context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 864cf51..bee89b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -70,12 +72,14 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
         private TimestampedKeyValueStore<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
         private StreamsMetricsImpl metrics;
+        private Sensor skippedRecordsSensor;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
+            skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
             if (queryableName != null) {
                 store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
                 tupleForwarder = new TimestampedTupleForwarder<>(
@@ -94,7 +98,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
                     "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
                     context().topic(), context().partition(), context().offset()
                 );
-                metrics.skippedRecordsSensor().record();
+                skippedRecordsSensor.record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 29c861e..d6f60e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -69,7 +70,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
                     source,
                     deserializationExceptionHandler,
                     logContext,
-                    processorContext.metrics().skippedRecordsSensor()
+                    ThreadMetrics.skipRecordSensor(processorContext.metrics())
                 )
             );
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 86f5be3..6f3e70b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -18,17 +18,17 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.slf4j.Logger;
 
 import java.util.ArrayDeque;
 
-
 /**
  * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
  * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition
@@ -48,6 +48,8 @@ public class RecordQueue {
 
     private StampedRecord headRecord = null;
 
+    private Sensor skipRecordsSensor;
+
     RecordQueue(final TopicPartition partition,
                 final SourceNode source,
                 final TimestampExtractor timestampExtractor,
@@ -58,13 +60,14 @@ public class RecordQueue {
         this.partition = partition;
         this.fifoQueue = new ArrayDeque<>();
         this.timestampExtractor = timestampExtractor;
-        this.recordDeserializer = new RecordDeserializer(
+        this.processorContext = processorContext;
+        skipRecordsSensor = ThreadMetrics.skipRecordSensor(processorContext.metrics());
+        recordDeserializer = new RecordDeserializer(
             source,
             deserializationExceptionHandler,
             logContext,
-            processorContext.metrics().skippedRecordsSensor()
+            skipRecordsSensor
         );
-        this.processorContext = processorContext;
         this.log = logContext.logger(RecordQueue.class);
     }
 
@@ -180,7 +183,8 @@ public class RecordQueue {
                         "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
                         deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
                 );
-                ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record();
+
+                skipRecordsSensor.record();
                 continue;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 80653c5..4fd57ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.IOException;
@@ -78,7 +79,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     private final PunctuationQueue systemTimePunctuationQueue;
     private final ProducerSupplier producerSupplier;
 
-    private Sensor closeSensor;
+    private Sensor closeTaskSensor;
     private long idleStartTime;
     private Producer<byte[], byte[]> producer;
     private boolean commitRequested = false;
@@ -96,24 +97,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             final String group = "stream-task-metrics";
 
             // first add the global operation metrics if not yet, with the global tags only
-            final Map<String, String> allTagMap = metrics.tagMap("task-id", "all");
-            final Sensor parent = metrics.threadLevelSensor("commit", Sensor.RecordingLevel.DEBUG);
-            parent.add(
-                new MetricName("commit-latency-avg", group, "The average latency of commit operation.", allTagMap),
-                new Avg()
-            );
-            parent.add(
-                new MetricName("commit-latency-max", group, "The max latency of commit operation.", allTagMap),
-                new Max()
-            );
-            parent.add(
-                new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", allTagMap),
-                new Rate(TimeUnit.SECONDS, new Count())
-            );
-            parent.add(
-                new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap),
-                new CumulativeCount()
-            );
+            final Sensor parent = ThreadMetrics.commitOverTasksSensor(metrics);
 
             // add the operation metrics with additional tags
             final Map<String, String> tagMap = metrics.tagMap("task-id", taskName);
@@ -167,9 +151,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                       final StateDirectory stateDirectory,
                       final ThreadCache cache,
                       final Time time,
-                      final ProducerSupplier producerSupplier,
-                      final Sensor closeSensor) {
-        this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producerSupplier, null, closeSensor);
+                      final ProducerSupplier producerSupplier) {
+        this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producerSupplier, null);
     }
 
     public StreamTask(final TaskId id,
@@ -178,20 +161,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                       final Consumer<byte[], byte[]> consumer,
                       final ChangelogReader changelogReader,
                       final StreamsConfig config,
-                      final StreamsMetricsImpl metrics,
+                      final StreamsMetricsImpl streamsMetrics,
                       final StateDirectory stateDirectory,
                       final ThreadCache cache,
                       final Time time,
                       final ProducerSupplier producerSupplier,
-                      final RecordCollector recordCollector,
-                      final Sensor closeSensor) {
+                      final RecordCollector recordCollector) {
         super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config);
 
         this.time = time;
         this.producerSupplier = producerSupplier;
         this.producer = producerSupplier.get();
-        this.closeSensor = closeSensor;
-        this.taskMetrics = new TaskMetrics(id, metrics);
+        this.taskMetrics = new TaskMetrics(id, streamsMetrics);
+
+        closeTaskSensor = ThreadMetrics.closeTaskSensor(streamsMetrics);
 
         final ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler();
 
@@ -200,8 +183,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                 id.toString(),
                 logContext,
                 productionExceptionHandler,
-                metrics.skippedRecordsSensor()
-            );
+                ThreadMetrics.skipRecordSensor(streamsMetrics));
         } else {
             this.recordCollector = recordCollector;
         }
@@ -220,7 +202,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
 
         // initialize the topology with its own context
-        final ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache);
+        final ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, streamsMetrics, cache);
         processorContext = processorContextImpl;
 
         final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
@@ -691,7 +673,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         partitionGroup.close();
         taskMetrics.removeAllSensors();
 
-        closeSensor.record();
+        closeTaskSensor.record();
 
         if (firstException != null) {
             throw firstException;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 419e181..4dc1bde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -31,9 +31,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -45,8 +42,8 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
-import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 
@@ -62,7 +59,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
@@ -340,7 +336,7 @@ public class StreamThread extends Thread {
         final String applicationId;
         final InternalTopologyBuilder builder;
         final StreamsConfig config;
-        final StreamsMetricsThreadImpl streamsMetrics;
+        final StreamsMetricsImpl streamsMetrics;
         final StateDirectory stateDirectory;
         final ChangelogReader storeChangelogReader;
         final Time time;
@@ -349,7 +345,7 @@ public class StreamThread extends Thread {
 
         AbstractTaskCreator(final InternalTopologyBuilder builder,
                             final StreamsConfig config,
-                            final StreamsMetricsThreadImpl streamsMetrics,
+                            final StreamsMetricsImpl streamsMetrics,
                             final StateDirectory stateDirectory,
                             final ChangelogReader storeChangelogReader,
                             final Time time,
@@ -398,10 +394,11 @@ public class StreamThread extends Thread {
         private final KafkaClientSupplier clientSupplier;
         private final String threadClientId;
         private final Producer<byte[], byte[]> threadProducer;
+        private final Sensor createTaskSensor;
 
         TaskCreator(final InternalTopologyBuilder builder,
                     final StreamsConfig config,
-                    final StreamsMetricsThreadImpl streamsMetrics,
+                    final StreamsMetricsImpl streamsMetrics,
                     final StateDirectory stateDirectory,
                     final ChangelogReader storeChangelogReader,
                     final ThreadCache cache,
@@ -422,13 +419,14 @@ public class StreamThread extends Thread {
             this.clientSupplier = clientSupplier;
             this.threadProducer = threadProducer;
             this.threadClientId = threadClientId;
+            createTaskSensor = ThreadMetrics.createTaskSensor(streamsMetrics);
         }
 
         @Override
         StreamTask createTask(final Consumer<byte[], byte[]> consumer,
                               final TaskId taskId,
                               final Set<TopicPartition> partitions) {
-            streamsMetrics.taskCreatedSensor.record();
+            createTaskSensor.record();
 
             return new StreamTask(
                 taskId,
@@ -441,8 +439,7 @@ public class StreamThread extends Thread {
                 stateDirectory,
                 cache,
                 time,
-                () -> createProducer(taskId),
-                streamsMetrics.taskClosedSensor);
+                () -> createProducer(taskId));
         }
 
         private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -470,9 +467,11 @@ public class StreamThread extends Thread {
     }
 
     static class StandbyTaskCreator extends AbstractTaskCreator<StandbyTask> {
+        private final Sensor createTaskSensor;
+
         StandbyTaskCreator(final InternalTopologyBuilder builder,
                            final StreamsConfig config,
-                           final StreamsMetricsThreadImpl streamsMetrics,
+                           final StreamsMetricsImpl streamsMetrics,
                            final StateDirectory stateDirectory,
                            final ChangelogReader storeChangelogReader,
                            final Time time,
@@ -485,13 +484,14 @@ public class StreamThread extends Thread {
                 storeChangelogReader,
                 time,
                 log);
+            createTaskSensor = ThreadMetrics.createTaskSensor(streamsMetrics);
         }
 
         @Override
         StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
                                final TaskId taskId,
                                final Set<TopicPartition> partitions) {
-            streamsMetrics.taskCreatedSensor.record();
+            createTaskSensor.record();
 
             final ProcessorTopology topology = builder.build(taskId.topicGroupId);
 
@@ -516,47 +516,6 @@ public class StreamThread extends Thread {
         }
     }
 
-    static class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
-
-        private final Sensor commitTimeSensor;
-        private final Sensor pollTimeSensor;
-        private final Sensor processTimeSensor;
-        private final Sensor punctuateTimeSensor;
-        private final Sensor taskCreatedSensor;
-        private final Sensor taskClosedSensor;
-
-        StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
-            super(metrics, threadName);
-            final String group = "stream-metrics";
-
-            commitTimeSensor = threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO);
-            addAvgMaxLatency(commitTimeSensor, group, tagMap(), "commit");
-            addInvocationRateAndCount(commitTimeSensor, group, tagMap(), "commit");
-
-            pollTimeSensor = threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO);
-            addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll");
-            // can't use addInvocationRateAndCount due to non-standard description string
-            pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
-            pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new CumulativeCount());
-
-            processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
-            addAvgMaxLatency(processTimeSensor, group, tagMap(), "process");
-            addInvocationRateAndCount(processTimeSensor, group, tagMap(), "process");
-
-            punctuateTimeSensor = threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO);
-            addAvgMaxLatency(punctuateTimeSensor, group, tagMap(), "punctuate");
-            addInvocationRateAndCount(punctuateTimeSensor, group, tagMap(), "punctuate");
-
-            taskCreatedSensor = threadLevelSensor("task-created", Sensor.RecordingLevel.INFO);
-            taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
-            taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tagMap()), new Total());
-
-            taskClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
-            taskClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
-            taskClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tagMap()), new Total());
-        }
-    }
-
     private final Time time;
     private final Logger log;
     private final String logPrefix;
@@ -566,9 +525,14 @@ public class StreamThread extends Thread {
     private final int maxPollTimeMs;
     private final String originalReset;
     private final TaskManager taskManager;
-    private final StreamsMetricsThreadImpl streamsMetrics;
     private final AtomicInteger assignmentErrorCode;
 
+    private final StreamsMetricsImpl streamsMetrics;
+    private final Sensor commitSensor;
+    private final Sensor pollSensor;
+    private final Sensor punctuateSensor;
+    private final Sensor processSensor;
+
     private long now;
     private long lastPollMs;
     private long lastCommitMs;
@@ -620,10 +584,7 @@ public class StreamThread extends Thread {
             threadProducer = clientSupplier.getProducer(producerConfigs);
         }
 
-        final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(
-            metrics,
-            threadClientId
-        );
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId);
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
@@ -697,7 +658,7 @@ public class StreamThread extends Thread {
                         final Consumer<byte[], byte[]> consumer,
                         final String originalReset,
                         final TaskManager taskManager,
-                        final StreamsMetricsThreadImpl streamsMetrics,
+                        final StreamsMetricsImpl streamsMetrics,
                         final InternalTopologyBuilder builder,
                         final String threadClientId,
                         final LogContext logContext,
@@ -707,9 +668,24 @@ public class StreamThread extends Thread {
         this.stateLock = new Object();
         this.standbyRecords = new HashMap<>();
 
+        this.streamsMetrics = streamsMetrics;
+        this.commitSensor = ThreadMetrics.commitSensor(streamsMetrics);
+        this.pollSensor = ThreadMetrics.pollSensor(streamsMetrics);
+        this.processSensor = ThreadMetrics.processSensor(streamsMetrics);
+        this.punctuateSensor = ThreadMetrics.punctuateSensor(streamsMetrics);
+
+        // The following sensors are created here but their references are not stored in this object, since within
+        // this object they are not recorded. The sensors are created here so that the stream threads starts with all
+        // its metrics initialised. Otherwise, those sensors would have been created during processing, which could
+        // lead to missing metrics. For instance, if no task were created, the metrics for created and closed
+        // tasks would never be added to the metrics.
+        ThreadMetrics.createTaskSensor(streamsMetrics);
+        ThreadMetrics.closeTaskSensor(streamsMetrics);
+        ThreadMetrics.skipRecordSensor(streamsMetrics);
+        ThreadMetrics.commitOverTasksSensor(streamsMetrics);
+
         this.time = time;
         this.builder = builder;
-        this.streamsMetrics = streamsMetrics;
         this.logPrefix = logContext.logPrefix();
         this.log = logContext.logger(StreamThread.class);
         this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);
@@ -857,7 +833,7 @@ public class StreamThread extends Thread {
         final long pollLatency = advanceNowAndComputeLatency();
 
         if (records != null && !records.isEmpty()) {
-            streamsMetrics.pollTimeSensor.record(pollLatency, now);
+            pollSensor.record(pollLatency, now);
             addRecordsToTasks(records);
         }
 
@@ -891,14 +867,14 @@ public class StreamThread extends Thread {
 
                     if (processed > 0) {
                         final long processLatency = advanceNowAndComputeLatency();
-                        streamsMetrics.processTimeSensor.record(processLatency / (double) processed, now);
+                        processSensor.record(processLatency / (double) processed, now);
 
                         // commit any tasks that have requested a commit
                         final int committed = taskManager.maybeCommitActiveTasksPerUserRequested();
 
                         if (committed > 0) {
                             final long commitLatency = advanceNowAndComputeLatency();
-                            streamsMetrics.commitTimeSensor.record(commitLatency / (double) committed, now);
+                            commitSensor.record(commitLatency / (double) committed, now);
                         }
                     } else {
                         // if there is no records to be processed, exit immediately
@@ -1031,7 +1007,7 @@ public class StreamThread extends Thread {
         final int punctuated = taskManager.punctuate();
         if (punctuated > 0) {
             final long punctuateLatency = advanceNowAndComputeLatency();
-            streamsMetrics.punctuateTimeSensor.record(punctuateLatency / (double) punctuated, now);
+            punctuateSensor.record(punctuateLatency / (double) punctuated, now);
         }
 
         return punctuated > 0;
@@ -1057,7 +1033,7 @@ public class StreamThread extends Thread {
             committed += taskManager.commitAll();
             if (committed > 0) {
                 final long intervalCommitLatency = advanceNowAndComputeLatency();
-                streamsMetrics.commitTimeSensor.record(intervalCommitLatency / (double) committed, now);
+                commitSensor.record(intervalCommitLatency / (double) committed, now);
 
                 // try to purge the committed records for repartition topics if possible
                 taskManager.maybePurgeCommitedRecords();
@@ -1074,7 +1050,7 @@ public class StreamThread extends Thread {
             final int commitPerRequested = taskManager.maybeCommitActiveTasksPerUserRequested();
             if (commitPerRequested > 0) {
                 final long requestCommitLatency = advanceNowAndComputeLatency();
-                streamsMetrics.commitTimeSensor.record(requestCommitLatency / (double) committed, now);
+                commitSensor.record(requestCommitLatency / (double) committed, now);
                 committed += commitPerRequested;
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 0a47fce..46b5669 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -20,11 +20,11 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.streams.StreamsMetrics;
 
 import java.util.Arrays;
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 public class StreamsMetricsImpl implements StreamsMetrics {
     private final Metrics metrics;
     private final Map<Sensor, Sensor> parentSensors;
-    private final Sensor skippedRecordsSensor;
     private final String threadName;
 
     private final Deque<String> threadLevelSensors = new LinkedList<>();
@@ -52,6 +51,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     private static final String SENSOR_PREFIX_DELIMITER = ".";
     private static final String SENSOR_NAME_DELIMITER = ".s.";
 
+    public static final String THREAD_ID_TAG = "client-id";
+    public static final String TASK_ID_TAG = "task-id";
+
+    public static final String ALL_TASKS = "all";
+
+    public static final String LATENCY_SUFFIX = "-latency";
+    public static final String AVG_SUFFIX = "-avg";
+    public static final String MAX_SUFFIX = "-max";
+    public static final String RATE_SUFFIX = "-rate";
+    public static final String TOTAL_SUFFIX = "-total";
+
+    public static final String THREAD_LEVEL_GROUP = "stream-metrics";
+    public static final String TASK_LEVEL_GROUP = "stream-task-metrics";
+
     public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
     public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
 
@@ -60,30 +73,47 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
     public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
-        this.threadName = threadName;
-
         this.metrics = metrics;
+        this.threadName = threadName;
 
         this.parentSensors = new HashMap<>();
-
-        final String group = "stream-metrics";
-        skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
-        skippedRecordsSensor.add(new MetricName("skipped-records-rate", group, "The average per-second number of skipped records", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
-        skippedRecordsSensor.add(new MetricName("skipped-records-total", group, "The total number of skipped records", tagMap()), new Total());
     }
 
     public final Sensor threadLevelSensor(final String sensorName,
-                                          final Sensor.RecordingLevel recordingLevel,
+                                          final RecordingLevel recordingLevel,
                                           final Sensor... parents) {
         synchronized (threadLevelSensors) {
             final String fullSensorName = threadSensorPrefix() + SENSOR_NAME_DELIMITER + sensorName;
             final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
             threadLevelSensors.push(fullSensorName);
-
             return sensor;
         }
     }
 
+    private String threadSensorPrefix() {
+        return "internal" + SENSOR_PREFIX_DELIMITER + threadName;
+    }
+
+    public Map<String, String> threadLevelTagMap() {
+        final Map<String, String> tagMap = new LinkedHashMap<>();
+        tagMap.put(THREAD_ID_TAG, threadName);
+        return tagMap;
+    }
+
+    public Map<String, String> threadLevelTagMap(final String... tags) {
+        final Map<String, String> tagMap = threadLevelTagMap();
+        if (tags != null) {
+            if ((tags.length % 2) != 0) {
+                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
+            }
+
+            for (int i = 0; i < tags.length; i += 2) {
+                tagMap.put(tags[i], tags[i + 1]);
+            }
+        }
+        return tagMap;
+    }
+
     public final void removeAllThreadLevelSensors() {
         synchronized (threadLevelSensors) {
             while (!threadLevelSensors.isEmpty()) {
@@ -92,13 +122,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
-    private String threadSensorPrefix() {
-        return "internal" + SENSOR_PREFIX_DELIMITER + threadName;
-    }
-
     public final Sensor taskLevelSensor(final String taskName,
                                         final String sensorName,
-                                        final Sensor.RecordingLevel recordingLevel,
+                                        final RecordingLevel recordingLevel,
                                         final Sensor... parents) {
         final String key = taskSensorPrefix(taskName);
         synchronized (taskLevelSensors) {
@@ -235,10 +261,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "store" + SENSOR_PREFIX_DELIMITER + storeName;
     }
 
-    public final Sensor skippedRecordsSensor() {
-        return skippedRecordsSensor;
-    }
-
     @Override
     public Sensor addSensor(final String name, final Sensor.RecordingLevel recordingLevel) {
         return metrics.sensor(name, recordingLevel);
@@ -357,6 +379,28 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
 
+    public static void addAvgAndMax(final Sensor sensor,
+                                    final String group,
+                                    final Map<String, String> tags,
+                                    final String operation) {
+        sensor.add(
+            new MetricName(
+                operation + AVG_SUFFIX,
+                group,
+                "The average value of " + operation + ".",
+                tags),
+            new Avg()
+        );
+        sensor.add(
+            new MetricName(
+                operation + MAX_SUFFIX,
+                group,
+                "The max value of " + operation + ".",
+                tags),
+            new Max()
+        );
+    }
+
     public static void addAvgMaxLatency(final Sensor sensor,
                                         final String group,
                                         final Map<String, String> tags,
@@ -382,27 +426,41 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     public static void addInvocationRateAndCount(final Sensor sensor,
                                                  final String group,
                                                  final Map<String, String> tags,
-                                                 final String operation) {
+                                                 final String operation,
+                                                 final String descriptionOfInvocation,
+                                                 final String descriptionOfRate) {
         sensor.add(
             new MetricName(
-                operation + "-rate",
+                operation + TOTAL_SUFFIX,
                 group,
-                "The average number of occurrence of " + operation + " operation per second.",
+                descriptionOfInvocation,
                 tags
             ),
-            new Rate(TimeUnit.SECONDS, new Count())
+            new CumulativeCount()
         );
         sensor.add(
             new MetricName(
-                operation + "-total",
+                operation + RATE_SUFFIX,
                 group,
-                "The total number of occurrence of " + operation + " operations.",
+                descriptionOfRate,
                 tags
             ),
-            new CumulativeCount()
+            new Rate(TimeUnit.SECONDS, new Count())
         );
     }
 
+    public static void addInvocationRateAndCount(final Sensor sensor,
+                                                 final String group,
+                                                 final Map<String, String> tags,
+                                                 final String operation) {
+        addInvocationRateAndCount(sensor,
+                                  group,
+                                  tags,
+                                  operation,
+                                  "The total number of " + operation,
+                                  "The average per-second number of " + operation);
+    }
+
     /**
      * Deletes a sensor and its parents, if any
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
new file mode 100644
index 0000000..e177667
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.metrics;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+
+import java.util.Map;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMax;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+
+public class ThreadMetrics {
+    private ThreadMetrics() {}
+
+    private static final String COMMIT = "commit";
+    private static final String POLL = "poll";
+    private static final String PROCESS = "process";
+    private static final String PUNCTUATE = "punctuate";
+    private static final String CREATE_TASK = "task-created";
+    private static final String CLOSE_TASK = "task-closed";
+    private static final String SKIP_RECORD = "skipped-records";
+
+    private static final String TOTAL_DESCRIPTION = "The total number of ";
+    private static final String RATE_DESCRIPTION = "The average per-second number of ";
+    private static final String COMMIT_DESCRIPTION = "commit calls";
+    private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_DESCRIPTION;
+    private static final String COMMIT_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_DESCRIPTION;
+    private static final String CREATE_TASK_DESCRIPTION = "newly created tasks";
+    private static final String CREATE_TASK_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + CREATE_TASK_DESCRIPTION;
+    private static final String CREATE_TASK_RATE_DESCRIPTION = RATE_DESCRIPTION + CREATE_TASK_DESCRIPTION;
+    private static final String CLOSE_TASK_DESCRIPTION = "closed tasks";
+    private static final String CLOSE_TASK_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + CLOSE_TASK_DESCRIPTION;
+    private static final String CLOSE_TASK_RATE_DESCRIPTION = RATE_DESCRIPTION + CLOSE_TASK_DESCRIPTION;
+    private static final String POLL_DESCRIPTION = "poll calls";
+    private static final String POLL_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + POLL_DESCRIPTION;
+    private static final String POLL_RATE_DESCRIPTION = RATE_DESCRIPTION + POLL_DESCRIPTION;
+    private static final String PROCESS_DESCRIPTION = "process calls";
+    private static final String PROCESS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PROCESS_DESCRIPTION;
+    private static final String PROCESS_RATE_DESCRIPTION = RATE_DESCRIPTION + PROCESS_DESCRIPTION;
+    private static final String PUNCTUATE_DESCRIPTION = "punctuate calls";
+    private static final String PUNCTUATE_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PUNCTUATE_DESCRIPTION;
+    private static final String PUNCTUATE_RATE_DESCRIPTION = RATE_DESCRIPTION + PUNCTUATE_DESCRIPTION;
+    private static final String SKIP_RECORDS_DESCRIPTION = "skipped records";
+    private static final String SKIP_RECORD_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + SKIP_RECORDS_DESCRIPTION;
+    private static final String SKIP_RECORD_RATE_DESCRIPTION = RATE_DESCRIPTION + SKIP_RECORDS_DESCRIPTION;
+    private static final String COMMIT_OVER_TASKS_DESCRIPTION = "commit calls over all tasks";
+    private static final String COMMIT_OVER_TASKS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION;
+    private static final String COMMIT_OVER_TASKS_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION;
+
+    private static final String COMMIT_LATENCY = COMMIT + LATENCY_SUFFIX;
+    private static final String POLL_LATENCY = POLL + LATENCY_SUFFIX;
+    private static final String PROCESS_LATENCY = PROCESS + LATENCY_SUFFIX;
+    private static final String PUNCTUATE_LATENCY = PUNCTUATE + LATENCY_SUFFIX;
+
+    public static Sensor createTaskSensor(final StreamsMetricsImpl streamsMetrics) {
+        final Sensor createTaskSensor = streamsMetrics.threadLevelSensor(CREATE_TASK, RecordingLevel.INFO);
+        addInvocationRateAndCount(createTaskSensor,
+                                  THREAD_LEVEL_GROUP,
+                                  streamsMetrics.threadLevelTagMap(),
+                                  CREATE_TASK,
+                                  CREATE_TASK_TOTAL_DESCRIPTION,
+                                  CREATE_TASK_RATE_DESCRIPTION);
+        return createTaskSensor;
+    }
+
+    public static Sensor closeTaskSensor(final StreamsMetricsImpl streamsMetrics) {
+        final Sensor closeTaskSensor = streamsMetrics.threadLevelSensor(CLOSE_TASK, RecordingLevel.INFO);
+        addInvocationRateAndCount(closeTaskSensor,
+                                  THREAD_LEVEL_GROUP,
+                                  streamsMetrics.threadLevelTagMap(),
+                                  CLOSE_TASK,
+                                  CLOSE_TASK_TOTAL_DESCRIPTION,
+                                  CLOSE_TASK_RATE_DESCRIPTION);
+        return closeTaskSensor;
+    }
+
+    public static Sensor commitSensor(final StreamsMetricsImpl streamsMetrics) {
+        final Sensor commitSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.INFO);
+        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
+        addAvgAndMax(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY);
+        addInvocationRateAndCount(commitSensor,
+                                  THREAD_LEVEL_GROUP,
+                                  tagMap,
+                                  COMMIT,
+                                  COMMIT_TOTAL_DESCRIPTION,
+                                  COMMIT_RATE_DESCRIPTION);
+        return commitSensor;
+    }
+
+    public static Sensor pollSensor(final StreamsMetricsImpl streamsMetrics) {
+        final Sensor pollSensor = streamsMetrics.threadLevelSensor(POLL, Sensor.RecordingLevel.INFO);
+        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
+        addAvgAndMax(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY);
+        addInvocationRateAndCount(pollSensor,
+                                  THREAD_LEVEL_GROUP,
+                                  tagMap,
+                                  POLL,
+                                  POLL_TOTAL_DESCRIPTION,
+                                  POLL_RATE_DESCRIPTION);
+        return pollSensor;
+    }
+
+    public static Sensor processSensor(final StreamsMetricsImpl streamsMetrics) {
+        final Sensor processSensor = streamsMetrics.threadLevelSensor(PROCESS, Sensor.RecordingLevel.INFO);
+        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
+        addAvgAndMax(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY);
+        addInvocationRateAndCount(processSensor,
+                                  THREAD_LEVEL_GROUP,
+                                  tagMap,
+                                  PROCESS,
+                                  PROCESS_TOTAL_DESCRIPTION,
+                                  PROCESS_RATE_DESCRIPTION);
+
+        return processSensor;
+    }
+
+    public static Sensor punctuateSensor(final StreamsMetricsImpl streamsMetrics) {
+        final Sensor punctuateSensor = streamsMetrics.threadLevelSensor(PUNCTUATE, Sensor.RecordingLevel.INFO);
+        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
+        addAvgAndMax(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY);
+        addInvocationRateAndCount(punctuateSensor,
+                                  THREAD_LEVEL_GROUP,
+                                  tagMap,
+                                  PUNCTUATE,
+                                  PUNCTUATE_TOTAL_DESCRIPTION,
+                                  PUNCTUATE_RATE_DESCRIPTION);
+
+        return punctuateSensor;
+    }
+
+    public static Sensor skipRecordSensor(final StreamsMetricsImpl streamsMetrics) {
+        final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor(SKIP_RECORD, Sensor.RecordingLevel.INFO);
+        addInvocationRateAndCount(skippedRecordsSensor,
+                                  THREAD_LEVEL_GROUP,
+                                  streamsMetrics.threadLevelTagMap(),
+                                  SKIP_RECORD,
+                                  SKIP_RECORD_TOTAL_DESCRIPTION,
+                                  SKIP_RECORD_RATE_DESCRIPTION);
+
+        return skippedRecordsSensor;
+    }
+
+    public static Sensor commitOverTasksSensor(final StreamsMetricsImpl streamsMetrics) {
+        final Sensor commitOverTasksSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.DEBUG);
+        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS);
+        addAvgAndMax(commitOverTasksSensor,
+                     TASK_LEVEL_GROUP,
+                     tagMap,
+                     COMMIT_LATENCY);
+        addInvocationRateAndCount(commitOverTasksSensor,
+                                  TASK_LEVEL_GROUP,
+                                  tagMap,
+                                  COMMIT,
+                                  COMMIT_OVER_TASKS_TOTAL_DESCRIPTION,
+                                  COMMIT_OVER_TASKS_RATE_DESCRIPTION);
+
+        return commitOverTasksSensor;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 3f6c806..bc4d895 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -25,11 +25,11 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.state.SessionStore;
@@ -37,9 +37,9 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.After;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 0dfd8ad..5b20737 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.processor.internals.ToInternal;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -92,6 +93,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
         final File stateDir = TestUtils.tempDirectory();
         metrics = new Metrics();
         final MockStreamsMetrics metrics = new MockStreamsMetrics(KStreamSessionWindowAggregateProcessorTest.this.metrics);
+        ThreadMetrics.skipRecordSensor(metrics);
+
         context = new InternalMockProcessorContext(
             stateDir,
             Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index ccd94de..ab9a47e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -246,7 +246,6 @@ public class StreamTaskTest {
                         throw new TimeoutException("test");
                     }
                 },
-                null,
                 null
             );
             fail("Expected an exception");
@@ -301,7 +300,6 @@ public class StreamTaskTest {
                     }
                 }
             },
-            null,
             null
         );
         testTask.initializeTopology();
@@ -851,8 +849,7 @@ public class StreamTaskTest {
                 public void flush() {
                     flushed.set(true);
                 }
-            },
-            metrics.sensor("dummy"));
+            });
         streamTask.flushState();
         assertTrue(flushed.get());
     }
@@ -1427,8 +1424,7 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
-            metrics.sensor("dummy"));
+            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer));
         task.initializeStateStores();
         task.initializeTopology();
 
@@ -1498,8 +1494,7 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
-            metrics.sensor("dummy"));
+            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer));
     }
 
     private StreamTask createStatefulTaskThatThrowsExceptionOnClose() {
@@ -1520,8 +1515,7 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
-            metrics.sensor("dummy"));
+            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer));
     }
 
     private StreamTask createStatelessTask(final StreamsConfig streamsConfig) {
@@ -1546,8 +1540,7 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
-            metrics.sensor("dummy"));
+            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer));
     }
 
     // this task will throw exception when processing (on partition2), flushing, suspending and closing
@@ -1573,8 +1566,7 @@ public class StreamTaskTest {
             stateDirectory,
             null,
             time,
-            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
-            metrics.sensor("dummy")) {
+            () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)) {
             @Override
             protected void flushState() {
                 throw new RuntimeException("KABOOM!");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1de39d2..7c17ca7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -254,35 +255,70 @@ public class StreamThreadTest {
         final StreamThread thread = createStreamThread(clientId, config, false);
         final String defaultGroupName = "stream-metrics";
         final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.getName());
-
-        assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("commit-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("commit-total", defaultGroupName, "The total number of commit calls", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-avg", defaultGroupName, "The average poll time in ms", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-max", defaultGroupName, "The maximum poll time in ms", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("poll-rate", defaultGroupName, "The average per-second number of record-poll calls", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("poll-total", defaultGroupName, "The total number of record-poll calls", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-avg", defaultGroupName, "The average process time in ms", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-max", defaultGroupName, "The maximum process time in ms", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("process-rate", defaultGroupName, "The average per-second number of process calls", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("process-total", defaultGroupName, "The total number of process calls", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-avg", defaultGroupName, "The average punctuate time in ms", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-max", defaultGroupName, "The maximum punctuate time in ms", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-rate", defaultGroupName, "The average per-second number of punctuate calls", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-total", defaultGroupName, "The total number of punctuate calls", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("task-created-rate", defaultGroupName, "The average per-second number of newly created tasks", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("task-created-total", defaultGroupName, "The total number of newly created tasks", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-rate", defaultGroupName, "The average per-second number of closed tasks", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-total", defaultGroupName, "The total number of closed tasks", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-rate", defaultGroupName, "The average per-second number of skipped records.", defaultTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-total", defaultGroupName, "The total number of skipped records.", defaultTags)));
+        final String descriptionIsNotVerified = "";
+
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "commit-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "commit-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "commit-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "commit-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "poll-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "poll-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "poll-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "poll-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "process-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "process-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "process-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "process-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "punctuate-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "punctuate-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "punctuate-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "punctuate-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "task-created-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "task-created-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "task-closed-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "task-closed-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "skipped-records-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "skipped-records-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
+
+        final String taskGroupName = "stream-task-metrics";
+        final Map<String, String> taskTags =
+            mkMap(mkEntry("task-id", "all"), mkEntry("client-id", thread.getName()));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "commit-latency-avg", taskGroupName, descriptionIsNotVerified, taskTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "commit-latency-max", taskGroupName, descriptionIsNotVerified, taskTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(
+            "commit-rate", taskGroupName, descriptionIsNotVerified, taskTags)));
 
         final JmxReporter reporter = new JmxReporter("kafka.streams");
         metrics.addReporter(reporter);
         assertEquals(clientId + "-StreamThread-1", thread.getName());
         assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s",
-                defaultGroupName, thread.getName())));
+                   defaultGroupName, 
+                   thread.getName())));
+        assertTrue(reporter.containsMbean("kafka.streams:type=stream-task-metrics,client-id=" + thread.getName() + ",task-id=all"));
     }
 
     @Test
@@ -296,8 +332,7 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
-            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -423,8 +458,7 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
-            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -459,8 +493,7 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
-            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -610,8 +643,7 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
-            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -644,8 +676,7 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
-            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -672,8 +703,7 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
-            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -1449,8 +1479,7 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
-            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
         final StreamThread thread = new StreamThread(
                 mockTime,
                 config,
@@ -1489,8 +1518,7 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
-            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
         final StreamThread thread = new StreamThread(
                 mockTime,
                 config,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index e1d6f0c..f381a58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -22,7 +22,10 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.utils.MockTime;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -38,8 +41,34 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
-public class StreamsMetricsImplTest {
+public class StreamsMetricsImplTest extends EasyMockSupport {
+
+    private final static String SENSOR_PREFIX_DELIMITER = ".";
+    private final static String SENSOR_NAME_DELIMITER = ".s.";
+    private final static String INTERNAL_PREFIX = "internal";
+
+    @Test
+    public void shouldGetThreadLevelSensor() {
+        final Metrics metrics = mock(Metrics.class);
+        final String threadName = "thread1";
+        final String sensorName = "sensor1";
+        final String expectedFullSensorName =
+            INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + threadName + SENSOR_NAME_DELIMITER + sensorName;
+        final RecordingLevel recordingLevel = RecordingLevel.DEBUG;
+        final Sensor[] parents = {};
+        EasyMock.expect(metrics.sensor(expectedFullSensorName, recordingLevel, parents)).andReturn(null);
+
+        replayAll();
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName);
+        final Sensor sensor = streamsMetrics.threadLevelSensor(sensorName, recordingLevel);
+
+        verifyAll();
+
+        assertNull(sensor);
+    }
 
     @Test(expected = NullPointerException.class)
     public void testNullMetrics() {
@@ -92,13 +121,13 @@ public class StreamsMetricsImplTest {
 
         final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
         addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
-        addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+        addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
 
         final int numberOfTaskMetrics = registry.metrics().size();
 
         final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
         addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
-        addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+        addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
 
         assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
 
@@ -108,13 +137,13 @@ public class StreamsMetricsImplTest {
 
         final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
         addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
-        addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+        addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
 
         assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
 
         final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
         addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
-        addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+        addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
 
         assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
new file mode 100644
index 0000000..89395d9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.powermock.api.easymock.PowerMock.createStrictMock;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verify;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StreamsMetricsImpl.class)
+public class ThreadMetricsTest {
+
+    private static final String THREAD_LEVEL_GROUP = "stream-metrics";
+    private static final String TASK_LEVEL_GROUP = "stream-task-metrics";
+
+    private final Metrics dummyMetrics = new Metrics();
+    private final Sensor dummySensor = dummyMetrics.sensor("dummy");
+    private final StreamsMetricsImpl streamsMetrics = createStrictMock(StreamsMetricsImpl.class);
+    private final Map<String, String> dummyTagMap = Collections.singletonMap("hello", "world");
+
+    @Test
+    public void shouldGetCreateTaskSensor() {
+        final String operation = "task-created";
+        final String totalDescription = "The total number of newly created tasks";
+        final String rateDescription = "The average per-second number of newly created tasks";
+        mockStatic(StreamsMetricsImpl.class);
+        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
+        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
+
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final Sensor sensor = ThreadMetrics.createTaskSensor(streamsMetrics);
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(sensor, is(dummySensor));
+    }
+
+    @Test
+    public void shouldGetCloseTaskSensor() {
+        final String operation = "task-closed";
+        final String totalDescription = "The total number of closed tasks";
+        final String rateDescription = "The average per-second number of closed tasks";
+        mockStatic(StreamsMetricsImpl.class);
+        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
+        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
+
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final Sensor sensor = ThreadMetrics.closeTaskSensor(streamsMetrics);
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(sensor, is(dummySensor));
+    }
+
+    @Test
+    public void shouldGetCommitSensor() {
+        final String operation = "commit";
+        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
+        final String totalDescription = "The total number of commit calls";
+        final String rateDescription = "The average per-second number of commit calls";
+        mockStatic(StreamsMetricsImpl.class);
+        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
+        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
+        StreamsMetricsImpl.addAvgAndMax(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
+
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final Sensor sensor = ThreadMetrics.commitSensor(streamsMetrics);
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(sensor, is(dummySensor));
+    }
+
+    @Test
+    public void shouldGetPollSensor() {
+        final String operation = "poll";
+        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
+        final String totalDescription = "The total number of poll calls";
+        final String rateDescription = "The average per-second number of poll calls";
+        mockStatic(StreamsMetricsImpl.class);
+        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
+        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
+        StreamsMetricsImpl.addAvgAndMax(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
+
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final Sensor sensor = ThreadMetrics.pollSensor(streamsMetrics);
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(sensor, is(dummySensor));
+    }
+
+    @Test
+    public void shouldGetProcessSensor() {
+        final String operation = "process";
+        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
+        final String totalDescription = "The total number of process calls";
+        final String rateDescription = "The average per-second number of process calls";
+        mockStatic(StreamsMetricsImpl.class);
+        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
+        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
+        StreamsMetricsImpl.addAvgAndMax(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
+
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final Sensor sensor = ThreadMetrics.processSensor(streamsMetrics);
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(sensor, is(dummySensor));
+    }
+
+    @Test
+    public void shouldGetPunctuateSensor() {
+        final String operation = "punctuate";
+        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
+        final String totalDescription = "The total number of punctuate calls";
+        final String rateDescription = "The average per-second number of punctuate calls";
+        mockStatic(StreamsMetricsImpl.class);
+        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
+        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
+        StreamsMetricsImpl.addAvgAndMax(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
+
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final Sensor sensor = ThreadMetrics.punctuateSensor(streamsMetrics);
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(sensor, is(dummySensor));
+    }
+
+    @Test
+    public void shouldGetSkipRecordSensor() {
+        final String operation = "skipped-records";
+        final String totalDescription = "The total number of skipped records";
+        final String rateDescription = "The average per-second number of skipped records";
+        mockStatic(StreamsMetricsImpl.class);
+        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
+        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
+
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final Sensor sensor = ThreadMetrics.skipRecordSensor(streamsMetrics);
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(sensor, is(dummySensor));
+    }
+
+    @Test
+    public void shouldGetCommitOverTasksSensor() {
+        final String operation = "commit";
+        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
+        final String totalDescription = "The total number of commit calls over all tasks";
+        final String rateDescription = "The average per-second number of commit calls over all tasks";
+        mockStatic(StreamsMetricsImpl.class);
+        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.DEBUG)).andReturn(dummySensor);
+        expect(streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS)).andReturn(dummyTagMap);
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
+        StreamsMetricsImpl.addAvgAndMax(
+            dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operationLatency);
+
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final Sensor sensor = ThreadMetrics.commitOverTasksSensor(streamsMetrics);
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(sensor, is(dummySensor));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index da2d46d..f48c31c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -317,8 +317,7 @@ public class StreamThreadStateStoreProviderTest {
             stateDirectory,
             null,
             new MockTime(),
-            () -> clientSupplier.getProducer(new HashMap<>()),
-            metrics.sensor("dummy")) {
+            () -> clientSupplier.getProducer(new HashMap<>())) {
             @Override
             protected void updateOffsetLimits() {}
         };
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 38da0d8..48c038c 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -32,6 +32,9 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -277,10 +280,21 @@ public class TopologyTestDriver implements Closeable {
             .timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
 
         metrics = new Metrics(metricConfig, mockWallClockTime);
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
-            metrics,
-            "topology-test-driver-virtual-thread"
-        );
+
+        final String threadName = "topology-test-driver-virtual-thread";
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName);
+        final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
+        final String threadLevelGroup = "stream-metrics";
+        skippedRecordsSensor.add(new MetricName("skipped-records-rate",
+                                                threadLevelGroup,
+                                                "The average per-second number of skipped records",
+                                                streamsMetrics.tagMap()),
+                                 new Rate(TimeUnit.SECONDS, new Count()));
+        skippedRecordsSensor.add(new MetricName("skipped-records-total",
+                                                threadLevelGroup,
+                                                "The total number of skipped records",
+                                                streamsMetrics.tagMap()),
+                                 new Total());
         final ThreadCache cache = new ThreadCache(
             new LogContext("topology-test-driver "),
             Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
@@ -360,8 +374,7 @@ public class TopologyTestDriver implements Closeable {
                 stateDirectory,
                 cache,
                 mockWallClockTime,
-                () -> producer,
-                metrics.sensor("dummy"));
+                () -> producer);
             task.initializeStateStores();
             task.initializeTopology();
             ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 34a7ed9..ed02746 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 
 import java.io.File;
@@ -214,10 +215,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         this.stateDir = stateDir;
         final MetricConfig metricConfig = new MetricConfig();
         metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
-        this.metrics = new StreamsMetricsImpl(
-            new Metrics(metricConfig),
-            "mock-processor-context-virtual-thread"
-        );
+        final String threadName = "mock-processor-context-virtual-thread";
+        this.metrics = new StreamsMetricsImpl(new Metrics(metricConfig), threadName);
+        ThreadMetrics.skipRecordSensor(metrics);
     }
 
     @Override


Mime
View raw message