kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [3/3] kafka git commit: KAFKA-3715: Add granular metrics to Kafka Streams and add hierarchical logging levels to Metrics
Date: Wed, 11 Jan 2017 20:06:48 GMT
KAFKA-3715: Add granular metrics to Kafka Streams and add hierarchical logging levels to Metrics

Kafka Streams: add granular metrics per node and per task, and expose the ability to register non-latency metrics in StreamsMetrics.
Also add configurable recording levels to Metrics.

This is a joint contribution from Eno Thereska and Aarti Gupta.

From https://github.com/apache/kafka/pull/1362#issuecomment-218326690:
We can consider adding metrics for process / punctuate / commit rate at the granularity of each processor node in addition to the global rate mentioned above. This is very helpful in debugging.

We can consider adding rate / cumulative total metrics for context.forward, indicating how many records were forwarded downstream from this processor node as well. This is helpful in debugging.

We can consider adding metrics for each stream partition's timestamp. This is helpful in debugging.
Besides the latency metrics, we can also add throughput metrics in terms of source records consumed.

More discussion here: https://issues.apache.org/jira/browse/KAFKA-3715, KIP-104, and KIP-105.
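
As a usage sketch (not part of this patch; the application id and bootstrap servers are placeholders), a Streams application opts into the extra metrics via the new config key:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class RecordingLevelConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Default is "INFO"; "DEBUG" additionally enables the per-node and per-task sensors.
            props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        }
    }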

Author: Eno Thereska <eno@confluent.io>
Author: Aarti Gupta <aartiguptaa@gmail.com>

Reviewers: Greg Fodor, Ismael Juma, Damian Guy, Guozhang Wang

Closes #1446 from aartigupta/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ecff8544
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ecff8544
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ecff8544

Branch: refs/heads/trunk
Commit: ecff8544dd45e8cf0fcf04f5e0e716d3e21c9f20
Parents: 3d60f1e
Author: Eno Thereska <eno@confluent.io>
Authored: Wed Jan 11 11:34:58 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jan 11 12:06:18 2017 -0800

----------------------------------------------------------------------
 .../kafka/clients/CommonClientConfigs.java      |   3 +
 .../kafka/clients/consumer/ConsumerConfig.java  |  12 ++
 .../kafka/common/metrics/MetricConfig.java      |  13 ++
 .../apache/kafka/common/metrics/Metrics.java    |  80 +++++--
 .../org/apache/kafka/common/metrics/Sensor.java | 102 +++++++--
 .../apache/kafka/common/metrics/SensorTest.java |  58 +++++
 .../main/scala/kafka/server/KafkaConfig.scala   |   7 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |   1 +
 .../org/apache/kafka/streams/KafkaStreams.java  |  13 ++
 .../org/apache/kafka/streams/StreamsConfig.java |  11 +
 .../apache/kafka/streams/StreamsMetrics.java    |  72 ++++++-
 .../processor/internals/ProcessorNode.java      | 111 +++++++++-
 .../streams/processor/internals/SinkNode.java   |   7 +-
 .../streams/processor/internals/SourceNode.java |   7 +-
 .../processor/internals/StandbyContextImpl.java |   4 +-
 .../streams/processor/internals/StreamTask.java |  58 +++--
 .../processor/internals/StreamThread.java       | 200 +++++++-----------
 .../processor/internals/StreamsMetricsImpl.java | 210 +++++++++++++++++++
 .../state/internals/MeteredKeyValueStore.java   | 134 +++++++-----
 .../internals/MeteredSegmentedBytesStore.java   |  12 +-
 .../state/internals/MeteredWindowStore.java     |  80 ++++---
 .../streams/state/internals/NamedCache.java     |  59 +++++-
 .../streams/state/internals/ThreadCache.java    |  29 +--
 .../state/internals/ThreadCacheMetrics.java     |  40 ----
 .../apache/kafka/streams/KafkaStreamsTest.java  |  40 ++++
 ...reamSessionWindowAggregateProcessorTest.java |   5 +-
 .../kafka/streams/perf/SimpleBenchmark.java     |  38 +++-
 .../processor/internals/AbstractTaskTest.java   |   3 +-
 .../processor/internals/MockStreamsMetrics.java |  28 +++
 .../processor/internals/ProcessorNodeTest.java  |  65 ++++++
 .../processor/internals/StandbyTaskTest.java    |  16 +-
 .../processor/internals/StreamTaskTest.java     |  54 ++++-
 .../processor/internals/StreamThreadTest.java   | 136 ++++++++----
 .../internals/StreamsMetricsImplTest.java       | 110 ++++++++++
 .../streams/state/KeyValueStoreTestDriver.java  |  57 ++---
 .../internals/CachingKeyValueStoreTest.java     |   4 +-
 .../internals/CachingSessionStoreTest.java      |   4 +-
 .../state/internals/CachingWindowStoreTest.java |   4 +-
 .../ChangeLoggingSegmentedBytesStoreTest.java   |   4 +-
 ...gedSortedCacheKeyValueStoreIteratorTest.java |   6 +-
 ...ergedSortedCacheWindowStoreIteratorTest.java |   4 +-
 .../MeteredSegmentedBytesStoreTest.java         |  40 +++-
 .../streams/state/internals/NamedCacheTest.java |  32 ++-
 .../RocksDBSegmentedBytesStoreTest.java         |   4 +-
 .../internals/RocksDBSessionStoreTest.java      |   4 +-
 .../state/internals/RocksDBWindowStoreTest.java |  36 ++--
 .../state/internals/SegmentIteratorTest.java    |   4 +-
 .../streams/state/internals/SegmentsTest.java   |   4 +-
 .../StreamThreadStateStoreProviderTest.java     |  23 +-
 .../state/internals/ThreadCacheTest.java        |  59 +++---
 .../apache/kafka/test/KStreamTestDriver.java    |   4 +-
 .../apache/kafka/test/MockProcessorContext.java |  40 ++--
 .../kafka/test/ProcessorTopologyTestDriver.java |  61 +++---
 53 files changed, 1627 insertions(+), 585 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 3327815..944b09d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -63,6 +63,9 @@ public class CommonClientConfigs {
     public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
     public static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
 
+    public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
+    public static final String METRICS_RECORDING_LEVEL_DOC = "The highest recording level for metrics.";
+
     public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
     public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 13cc4c4..ed809a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.Collections;
@@ -175,6 +176,11 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
 
     /**
+     * <code>metrics.recording.level</code>
+     */
+    public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
+
+    /**
      * <code>metric.reporters</code>
      */
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
@@ -327,6 +333,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(1),
                                         Importance.LOW,
                                         CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                                .define(METRICS_RECORDING_LEVEL_CONFIG,
+                                        Type.STRING,
+                                        Sensor.RecordingLevel.INFO.toString(),
+                                        in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG,
                                         Type.LIST,
                                         "",

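For illustration only (group id and servers are placeholders), the same knob on the consumer side, validated by the ConfigDef entry above:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ConsumerRecordingLevelExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
            // Must be "INFO" (the default) or "DEBUG", per the in(...) validator above.
            props.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        }
    }
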
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
index a423334..0766cc3 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
@@ -30,6 +30,7 @@ public class MetricConfig {
     private long eventWindow;
     private long timeWindowMs;
     private Map<String, String> tags;
+    private Sensor.RecordingLevel recordingLevel;
 
     public MetricConfig() {
         super();
@@ -38,6 +39,7 @@ public class MetricConfig {
         this.eventWindow = Long.MAX_VALUE;
         this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
         this.tags = new LinkedHashMap<>();
+        this.recordingLevel = Sensor.RecordingLevel.INFO;
     }
 
     public Quota quota() {
@@ -86,4 +88,15 @@ public class MetricConfig {
         this.samples = samples;
         return this;
     }
+
+    public Sensor.RecordingLevel recordLevel() {
+        return this.recordingLevel;
+    }
+
+    public MetricConfig recordLevel(Sensor.RecordingLevel recordingLevel) {
+        this.recordingLevel = recordingLevel;
+        return this;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 8beb107..7b303fa 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -12,6 +12,12 @@
  */
 package org.apache.kafka.common.metrics;
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -25,12 +31,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * A registry of sensors and metrics.
  * <p>
@@ -236,35 +236,74 @@ public class Metrics implements Closeable {
     }
 
     /**
-     * Get or create a sensor with the given unique name and no parent sensors.
+     * Get or create a sensor with the given unique name and no parent sensors. This uses
+     * a default recording level of INFO.
      * @param name The sensor name
      * @return The sensor
      */
     public Sensor sensor(String name) {
-        return sensor(name, null, (Sensor[]) null);
+        return this.sensor(name, Sensor.RecordingLevel.INFO);
     }
 
     /**
+     * Get or create a sensor with the given unique name and no parent sensors and with a given
+     * recording level.
+     * @param name The sensor name.
+     * @param recordingLevel The recording level.
+     * @return The sensor
+     */
+    public Sensor sensor(String name, Sensor.RecordingLevel recordingLevel) {
+        return sensor(name, null, recordingLevel, (Sensor[]) null);
+    }
+
+
+    /**
      * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
-     * receive every value recorded with this sensor.
+     * receive every value recorded with this sensor. This uses a default recording level of INFO.
      * @param name The name of the sensor
      * @param parents The parent sensors
      * @return The sensor that is created
      */
     public Sensor sensor(String name, Sensor... parents) {
-        return sensor(name, null, parents);
+        return this.sensor(name, Sensor.RecordingLevel.INFO, parents);
     }
 
     /**
      * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
      * receive every value recorded with this sensor.
+     * @param name The name of the sensor.
+     * @param parents The parent sensors.
+     * @param recordingLevel The recording level.
+     * @return The sensor that is created
+     */
+    public Sensor sensor(String name, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
+        return sensor(name, null, recordingLevel, parents);
+    }
+
+    /**
+     * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+     * receive every value recorded with this sensor. This uses a default recording level of INFO.
      * @param name The name of the sensor
      * @param config A default configuration to use for this sensor for metrics that don't have their own config
      * @param parents The parent sensors
      * @return The sensor that is created
      */
     public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
-        return sensor(name, config, Long.MAX_VALUE, parents);
+        return this.sensor(name, config, Sensor.RecordingLevel.INFO, parents);
+    }
+
+
+    /**
+     * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+     * receive every value recorded with this sensor.
+     * @param name The name of the sensor
+     * @param config A default configuration to use for this sensor for metrics that don't have their own config
+     * @param recordingLevel The recording level.
+     * @param parents The parent sensors
+     * @return The sensor that is created
+     */
+    public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
+        return sensor(name, config, Long.MAX_VALUE, recordingLevel, parents);
     }
 
     /**
@@ -275,12 +314,13 @@ public class Metrics implements Closeable {
      * @param inactiveSensorExpirationTimeSeconds If no value is recorded on the Sensor for this duration of time,
      *                                        it is eligible for removal
+     * @param recordingLevel The recording level.
      * @param parents The parent sensors
      * @return The sensor that is created
      */
-    public synchronized Sensor sensor(String name, MetricConfig config, long inactiveSensorExpirationTimeSeconds, Sensor... parents) {
+    public synchronized Sensor sensor(String name, MetricConfig config, long inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
         Sensor s = getSensor(name);
         if (s == null) {
-            s = new Sensor(this, name, parents, config == null ? this.config : config, time, inactiveSensorExpirationTimeSeconds);
+            s = new Sensor(this, name, parents, config == null ? this.config : config, time, inactiveSensorExpirationTimeSeconds, recordingLevel);
             this.sensors.put(name, s);
             if (parents != null) {
                 for (Sensor parent : parents) {
@@ -298,6 +338,20 @@ public class Metrics implements Closeable {
     }
 
     /**
+     * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+     * receive every value recorded with this sensor. This uses a default recording level of INFO.
+     * @param name The name of the sensor
+     * @param config A default configuration to use for this sensor for metrics that don't have their own config
+     * @param inactiveSensorExpirationTimeSeconds If no value is recorded on the Sensor for this duration of time,
+     *                                        it is eligible for removal
+     * @param parents The parent sensors
+     * @return The sensor that is created
+     */
+    public synchronized Sensor sensor(String name, MetricConfig config, long inactiveSensorExpirationTimeSeconds, Sensor... parents) {
+        return this.sensor(name, config, inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel.INFO, parents);
+    }
+
+    /**
      * Remove a sensor (if it exists), associated metrics and its children.
      *
      * @param name The name of the sensor to be removed

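To show how the registry-wide level and the per-sensor level interact, a minimal sketch against the public Metrics API (metric and group names are arbitrary):

    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;

    public class SensorLevelExample {
        public static void main(String[] args) {
            // Registry configured at INFO: DEBUG-level sensors become no-ops.
            Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO));
            Sensor debugSensor = metrics.sensor("debug-latency", Sensor.RecordingLevel.DEBUG);
            debugSensor.add(metrics.metricName("latency-avg", "example-metrics"), new Avg());
            debugSensor.record(42.0); // dropped: DEBUG.shouldRecord(INFO.id) returns false
            metrics.close();
        }
    }
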
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 4f630f9..4a9b488 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -12,18 +12,19 @@
  */
 package org.apache.kafka.common.metrics;
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-
 /**
  * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
  * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
@@ -41,7 +42,63 @@ public final class Sensor {
     private volatile long lastRecordTime;
     private final long inactiveSensorExpirationTimeMs;
 
-    Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time, long inactiveSensorExpirationTimeSeconds) {
+    public enum RecordingLevel {
+        INFO(0, "INFO"), DEBUG(1, "DEBUG");
+
+        private static final RecordingLevel[] ID_TO_TYPE;
+        private static final int MIN_RECORDING_LEVEL_KEY = 0;
+        public static final int MAX_RECORDING_LEVEL_KEY;
+
+        static {
+            int maxRL = -1;
+            for (RecordingLevel level : RecordingLevel.values()) {
+                maxRL = Math.max(maxRL, level.id);
+            }
+            RecordingLevel[] idToName = new RecordingLevel[maxRL + 1];
+            for (RecordingLevel level : RecordingLevel.values()) {
+                idToName[level.id] = level;
+            }
+            ID_TO_TYPE = idToName;
+            MAX_RECORDING_LEVEL_KEY = maxRL;
+        }
+
+        /** an English description of the recording level; this is for debugging and can change */
+        public final String name;
+
+        /** the permanent and immutable id of the recording level; this can never change */
+        public final short id;
+
+        RecordingLevel(int id, String name) {
+            this.id = (short) id;
+            this.name = name;
+        }
+
+        public static RecordingLevel forId(int id) {
+            if (id < MIN_RECORDING_LEVEL_KEY || id > MAX_RECORDING_LEVEL_KEY)
+                throw new IllegalArgumentException(String.format("Unexpected RecordLevel id `%s`, it should be between `%s` " +
+                    "and `%s` (inclusive)", id, MIN_RECORDING_LEVEL_KEY, MAX_RECORDING_LEVEL_KEY));
+            return ID_TO_TYPE[id];
+        }
+
+        /** Case-insensitive lookup by recording level name */
+        public static RecordingLevel forName(String name) {
+            return RecordingLevel.valueOf(name.toUpperCase(Locale.ROOT));
+        }
+
+        public boolean shouldRecord(final int configId) {
+            if (configId == DEBUG.id) {
+                return true;
+            } else {
+                return configId == this.id;
+            }
+        }
+
+    }
+
+    private final RecordingLevel recordingLevel;
+
+    Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time,
+           long inactiveSensorExpirationTimeSeconds, RecordingLevel recordingLevel) {
         super();
         this.registry = registry;
         this.name = Utils.notNull(name);
@@ -52,6 +109,7 @@ public final class Sensor {
         this.time = time;
         this.inactiveSensorExpirationTimeMs = TimeUnit.MILLISECONDS.convert(inactiveSensorExpirationTimeSeconds, TimeUnit.SECONDS);
         this.lastRecordTime = time.milliseconds();
+        this.recordingLevel = recordingLevel;
         checkForest(new HashSet<Sensor>());
     }
 
@@ -74,17 +132,27 @@ public final class Sensor {
      * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
      */
     public void record() {
-        record(1.0);
+        if (shouldRecord()) {
+            record(1.0);
+        }
     }
 
     /**
+     * @return true if the sensor's record level indicates that the metric will be recorded, false otherwise
+     */
+    public boolean shouldRecord() {
+        return this.recordingLevel.shouldRecord(config.recordLevel().id);
+    }
+    /**
      * Record a value with this sensor
      * @param value The value to record
      * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
      *         bound
      */
     public void record(double value) {
-        record(value, time.milliseconds());
+        if (shouldRecord()) {
+            record(value, time.milliseconds());
+        }
     }
 
     /**
@@ -96,15 +164,17 @@ public final class Sensor {
      *         bound
      */
     public void record(double value, long timeMs) {
-        this.lastRecordTime = timeMs;
-        synchronized (this) {
-            // increment all the stats
-            for (int i = 0; i < this.stats.size(); i++)
-                this.stats.get(i).record(config, value, timeMs);
-            checkQuotas(timeMs);
+        if (shouldRecord()) {
+            this.lastRecordTime = timeMs;
+            synchronized (this) {
+                // increment all the stats
+                for (int i = 0; i < this.stats.size(); i++)
+                    this.stats.get(i).record(config, value, timeMs);
+                checkQuotas(timeMs);
+            }
+            for (int i = 0; i < parents.length; i++)
+                parents[i].record(value, timeMs);
         }
-        for (int i = 0; i < parents.length; i++)
-            parents[i].record(value, timeMs);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
new file mode 100644
index 0000000..9aadc82
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.common.utils.SystemTime;
+import org.junit.Test;
+
+public class SensorTest {
+    @Test
+    public void testRecordLevelEnum() {
+        Sensor.RecordingLevel configLevel = Sensor.RecordingLevel.INFO;
+        assertTrue(Sensor.RecordingLevel.INFO.shouldRecord(configLevel.id));
+        assertFalse(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));
+
+        configLevel = Sensor.RecordingLevel.DEBUG;
+        assertTrue(Sensor.RecordingLevel.INFO.shouldRecord(configLevel.id));
+        assertTrue(Sensor.RecordingLevel.DEBUG.shouldRecord(configLevel.id));
+
+        assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.DEBUG.toString()),
+            Sensor.RecordingLevel.DEBUG);
+        assertEquals(Sensor.RecordingLevel.valueOf(Sensor.RecordingLevel.INFO.toString()),
+            Sensor.RecordingLevel.INFO);
+    }
+
+    @Test
+    public void testShouldRecord() {
+        MetricConfig debugConfig = new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG);
+        MetricConfig infoConfig = new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO);
+
+        Sensor infoSensor = new Sensor(null, "infoSensor", null, debugConfig, new SystemTime(),
+            0, Sensor.RecordingLevel.INFO);
+        assertTrue(infoSensor.shouldRecord());
+        infoSensor = new Sensor(null, "infoSensor", null, debugConfig, new SystemTime(),
+            0, Sensor.RecordingLevel.DEBUG);
+        assertTrue(infoSensor.shouldRecord());
+
+        Sensor debugSensor = new Sensor(null, "debugSensor", null, infoConfig, new SystemTime(),
+            0, Sensor.RecordingLevel.INFO);
+        assertTrue(debugSensor.shouldRecord());
+        debugSensor = new Sensor(null, "debugSensor", null, infoConfig, new SystemTime(),
+            0, Sensor.RecordingLevel.DEBUG);
+        assertFalse(debugSensor.shouldRecord());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index cf9daa2..73fee6c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.util.Properties
+
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
 import kafka.cluster.EndPoint
 import kafka.consumer.ConsumerConfig
@@ -28,7 +29,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.ConfigDef.ValidList
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
-import org.apache.kafka.common.metrics.MetricsReporter
+import org.apache.kafka.common.metrics.{MetricsReporter, Sensor}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record.TimestampType
 
@@ -163,6 +164,7 @@ object Defaults {
   val MetricNumSamples = 2
   val MetricSampleWindowMs = 30000
   val MetricReporterClasses = ""
+  val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
 
   /** ********* SSL configuration ***********/
   val PrincipalBuilderClass = SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS
@@ -326,6 +328,7 @@ object KafkaConfig {
   val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG
   val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
   val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
+  val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
 
   /** ********* SSL Configuration ****************/
   val PrincipalBuilderClassProp = SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
@@ -547,6 +550,7 @@ object KafkaConfig {
   val MetricSampleWindowMsDoc = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC
   val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC
   val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
+  val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
 
   /** ********* SSL Configuration ****************/
   val PrincipalBuilderClassDoc = SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC
@@ -716,6 +720,7 @@ object KafkaConfig {
       .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc)
       .define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc)
       .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
+      .define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc)
 
       /** ********* Quota configuration ***********/
       .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 93d0413..d5e2ce3 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -532,6 +532,7 @@ class KafkaConfigTest {
         case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
         case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
         case KafkaConfig.MetricReporterClassesProp => // ignore string
+        case KafkaConfig.MetricRecordingLevelProp => // ignore string
         case KafkaConfig.RackProp => // ignore string
         //SSL Configs
         case KafkaConfig.PrincipalBuilderClassProp =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index cf49896..fc47672 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -17,11 +17,14 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
@@ -42,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -205,6 +209,14 @@ public class KafkaStreams {
         return state;
     }
 
+    /**
+     * Get read-only handle on global metrics registry
+     * @return Map of all metrics.
+     */
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
+
     private class StreamStateListener implements StreamThread.StateListener {
         @Override
         public synchronized void onChange(final StreamThread thread, final StreamThread.State newState, final StreamThread.State oldState) {
@@ -272,6 +284,7 @@ public class KafkaStreams {
         reporters.add(new JmxReporter(JMX_PREFIX));
 
         final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                 TimeUnit.MILLISECONDS);
 
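A small sketch of consuming the new read-only handle (the KafkaStreams instance is assumed to have been created and started elsewhere):

    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public class MetricsDumper {
        // Print every registered metric of a running KafkaStreams instance.
        public static void dump(KafkaStreams streams) {
            Map<MetricName, ? extends Metric> metrics = streams.metrics();
            for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                System.out.println(entry.getKey().group() + "/" + entry.getKey().name()
                    + " = " + entry.getValue().value());
            }
        }
    }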

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6b20e98..956ae0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -39,6 +40,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
  * Configuration for Kafka Streams. Documentation for these configurations can be found in the <a
@@ -120,6 +122,9 @@ public class StreamsConfig extends AbstractConfig {
     /** <code>metrics.num.samples</code> */
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
 
+    /** <code>metrics.recording.level</code> */
+    public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
+
     /** <code>metric.reporters</code> */
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
@@ -262,6 +267,12 @@ public class StreamsConfig extends AbstractConfig {
                         atLeast(1),
                         Importance.LOW,
                         CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                .define(METRICS_RECORDING_LEVEL_CONFIG,
+                        Type.STRING,
+                        Sensor.RecordingLevel.INFO.toString(),
+                        in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+                        Importance.LOW,
+                        CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                 .define(APPLICATION_SERVER_CONFIG,
                         Type.STRING,
                         "",

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
index c0870c6..afd3497 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -17,26 +17,94 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.metrics.Sensor;
 
+import java.util.Map;
+
 /**
  * The Kafka Streams metrics interface for adding metric sensors and collecting metric values.
  */
+@InterfaceStability.Unstable
 public interface StreamsMetrics {
 
     /**
-     * Add the latency sensor.
+     * Get read-only handle on global metrics registry
+     * @return Map of all metrics.
+     */
+    Map<MetricName, ? extends Metric> metrics();
+
+    /**
+     * Add a latency sensor and default associated metrics. Metrics include both latency ones
+     * (average and max latency) and throughput ones (operations/time unit).
      *
      * @param scopeName Name of the scope, could be the type of the state store, etc.
      * @param entityName Name of the entity, could be the name of the state store instance, etc.
     * @param operationName Name of the operation, could be get / put / delete / etc.
+     * @param recordingLevel The recording level (e.g., INFO or DEBUG) for this sensor.
      * @param tags Additional tags of the sensor.
      * @return The added sensor.
      */
-    Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
+    Sensor addLatencySensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags);
 
     /**
      * Record the given latency value of the sensor.
+     * @param sensor sensor whose latency we are recording.
+     * @param startNs start of measurement time in nanoseconds.
+     * @param endNs end of measurement time in nanoseconds.
      */
     void recordLatency(Sensor sensor, long startNs, long endNs);
+
+    /**
+     * Add a throughput sensor and default associated metrics. Metrics include throughput ones
+     * (operations/time unit).
+     *
+     * @param scopeName Name of the scope, could be the type of the state store, etc.
+     * @param entityName Name of the entity, could be the name of the state store instance, etc.
+     * @param operationName Name of the operation, could be get / put / delete / etc.
+     * @param recordingLevel The recording level (e.g., INFO or DEBUG) for this sensor.
+     * @param tags Additional tags of the sensor.
+     * @return The added sensor.
+     */
+    Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags);
+
+    /**
+     * Records the throughput value of a sensor.
+     * @param sensor addSensor whose throughput we are recording.
+     * @param value throughput value.
+     */
+    void recordThroughput(Sensor sensor, long value);
+
+
+    /**
+     * Generic method to create a sensor.
+     * Note that for most cases it is advisable to use {@link #addThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)}
+     * or {@link #addLatencySensor(String, String, String, Sensor.RecordingLevel, String...)} to ensure
+     * metric name well-formedness and conformity with the rest of the streams code base. However,
+     * if the above two methods are not sufficient, this method can also be used.
+     * @param name Name of the sensor.
+     * @param recordingLevel The recording level (e.g., INFO or DEBUG) for this sensor.
+     */
+    Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel);
+
+    /**
+     * Generic method to create a sensor with parent sensors.
+     * Note that for most cases it is advisable to use {@link #addThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)}
+     * or {@link #addLatencySensor(String, String, String, Sensor.RecordingLevel, String...)} to ensure
+     * metric name well-formedness and conformity with the rest of the streams code base. However,
+     * if the above two methods are not sufficient, this method can also be used.
+     * @param name Name of the sensor.
+     * @param recordingLevel The recording level (e.g., INFO or DEBUG) for this sensor.
+     */
+    Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel, Sensor... parents);
+
+    /**
+     * Remove a sensor.
+     * @param sensor Sensor to be removed.
+     */
+    void removeSensor(Sensor sensor);
 }
+
+

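As a usage sketch of the extended interface from inside a custom processor (the scope, entity, operation, and tag names here are arbitrary examples, not conventions mandated by this patch):

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.StreamsMetrics;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class MeteredProcessor extends AbstractProcessor<String, String> {
        private StreamsMetrics metrics;
        private Sensor latencySensor;

        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            metrics = context.metrics();
            // scope, entity, operation, recording level, then key/value tag pairs
            latencySensor = metrics.addLatencySensor("custom", "metered-processor", "process",
                Sensor.RecordingLevel.DEBUG, "custom-id", "metered-processor");
        }

        @Override
        public void process(String key, String value) {
            final long startNs = System.nanoTime();
            context().forward(key, value);
            metrics.recordLatency(latencySensor, startNs, System.nanoTime());
        }
    }
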
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index f7f8e8d..f165ebf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,12 +17,18 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class ProcessorNode<K, V> {
@@ -31,6 +37,42 @@ public class ProcessorNode<K, V> {
 
     private final String name;
     private final Processor<K, V> processor;
+    protected NodeMetrics nodeMetrics;
+    private Time time;
+
+    private K key;
+    private V value;
+    private Runnable processDelegate = new Runnable() {
+        @Override
+        public void run() {
+            processor.process(key, value);
+        }
+    };
+    private ProcessorContext context;
+    private Runnable initDelegate = new Runnable() {
+        @Override
+        public void run() {
+            if (processor != null) {
+                processor.init(context);
+            }
+        }
+    };
+    private Runnable closeDelegate = new Runnable() {
+        @Override
+        public void run() {
+            if (processor != null) {
+                processor.close();
+            }
+        }
+    };
+
+    private long timestamp;
+    private Runnable punctuateDelegate = new Runnable() {
+        @Override
+        public void run() {
+            processor().punctuate(timestamp);
+        }
+    };
 
     public final Set<String> stateStores;
 
@@ -38,11 +80,13 @@ public class ProcessorNode<K, V> {
         this(name, null, null);
     }
 
+
     public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
         this.name = name;
         this.processor = processor;
         this.children = new ArrayList<>();
         this.stateStores = stateStores;
+        this.time = new SystemTime();
     }
 
 
@@ -62,9 +106,12 @@ public class ProcessorNode<K, V> {
         children.add(child);
     }
 
+
     public void init(ProcessorContext context) {
+        this.context = context;
         try {
-            processor.init(context);
+            nodeMetrics = new NodeMetrics(context.metrics(), name,  "task." + context.taskId());
+            nodeMetrics.metrics.measureLatencyNs(time, initDelegate, nodeMetrics.nodeCreationSensor);
         } catch (Exception e) {
             throw new StreamsException(String.format("failed to initialize processor %s", name), e);
         }
@@ -72,14 +119,27 @@ public class ProcessorNode<K, V> {
 
     public void close() {
         try {
-            processor.close();
+            nodeMetrics.metrics.measureLatencyNs(time, closeDelegate, nodeMetrics.nodeDestructionSensor);
+            nodeMetrics.removeAllSensors();
         } catch (Exception e) {
             throw new StreamsException(String.format("failed to close processor %s", name), e);
         }
     }
 
+
     public void process(final K key, final V value) {
-        processor.process(key, value);
+        this.key = key;
+        this.value = value;
+
+        this.nodeMetrics.metrics.measureLatencyNs(time, processDelegate, nodeMetrics.nodeProcessTimeSensor);
+
+        // record throughput
+        this.nodeMetrics.metrics.recordThroughput(nodeMetrics.nodeThroughputSensor, 1);
+    }
+
+    public void punctuate(long timestamp) {
+        this.timestamp = timestamp;
+        this.nodeMetrics.metrics.measureLatencyNs(time, punctuateDelegate, nodeMetrics.nodePunctuateTimeSensor);
     }
 
     /**
@@ -106,4 +166,43 @@ public class ProcessorNode<K, V> {
         }
         return sb.toString();
     }
+
+    protected class NodeMetrics  {
+        final StreamsMetricsImpl metrics;
+        final String metricGrpName;
+        final Map<String, String> metricTags;
+
+        final Sensor nodeProcessTimeSensor;
+        final Sensor nodePunctuateTimeSensor;
+        final Sensor nodeThroughputSensor;
+        final Sensor nodeCreationSensor;
+        final Sensor nodeDestructionSensor;
+
+
+        public NodeMetrics(StreamsMetrics metrics, String name, String sensorNamePrefix) {
+            final String scope = "processor-node";
+            final String tagKey = "processor-node-id";
+            final String tagValue = name;
+            this.metrics = (StreamsMetricsImpl) metrics;
+            this.metricGrpName = "stream-processor-node-metrics";
+            this.metricTags = new LinkedHashMap<>();
+            this.metricTags.put(tagKey, tagValue);
+
+            // latency metrics for the node lifecycle and processing, plus one throughput metric
+            this.nodeProcessTimeSensor = metrics.addLatencySensor(scope, sensorNamePrefix + "." + name, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodePunctuateTimeSensor = metrics.addLatencySensor(scope, sensorNamePrefix + "." + name, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeCreationSensor = metrics.addLatencySensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeDestructionSensor = metrics.addLatencySensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeThroughputSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "process-throughput", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+
+        }
+
+        public void removeAllSensors() {
+            metrics.removeSensor(nodeProcessTimeSensor);
+            metrics.removeSensor(nodePunctuateTimeSensor);
+            metrics.removeSensor(nodeThroughputSensor);
+            metrics.removeSensor(nodeCreationSensor);
+            metrics.removeSensor(nodeDestructionSensor);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 65bad39..0ebfda7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -53,6 +53,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     @SuppressWarnings("unchecked")
     @Override
     public void init(ProcessorContext context) {
+        super.init(context);
         this.context = context;
 
         // if serializers are null, get the default ones from the context
@@ -63,7 +64,6 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         if (this.valSerializer instanceof ChangedSerializer &&
                 ((ChangedSerializer) this.valSerializer).inner() == null)
             ((ChangedSerializer) this.valSerializer).setInner(context.valueSerde().serializer());
-
     }
 
 
@@ -91,11 +91,6 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         }
     }
 
-    @Override
-    public void close() {
-        // do nothing
-    }
-
     /**
      * @return a string representation of this node, useful for debugging.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index ad9c213..771f504 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -46,6 +46,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
     @SuppressWarnings("unchecked")
     @Override
     public void init(ProcessorContext context) {
+        super.init(context);
         this.context = context;
 
         // if deserializers are null, get the default ones from the context
@@ -64,11 +65,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
     @Override
     public void process(final K key, final V value) {
         context.forward(key, value);
-    }
-
-    @Override
-    public void close() {
-        // do nothing
+        nodeMetrics.nodeThroughputSensor.record();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index b9367ab..86bdc19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -69,7 +69,7 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
     private final StreamsConfig config;
     private final Serde<?> keySerde;
     private final Serde<?> valSerde;
-    private final ThreadCache zeroSizedCache = new ThreadCache(0);
+    private final ThreadCache zeroSizedCache;
 
     private boolean initialized;
 
@@ -86,7 +86,7 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
         this.config = config;
         this.keySerde = config.keySerde();
         this.valSerde = config.valueSerde();
-
+        this.zeroSizedCache = new ThreadCache("zeroCache", 0, this.metrics);
         this.initialized = false;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 9369c01..07ae761 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -59,8 +61,24 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
-
     private boolean requiresPoll = true;
+    private final Time time;
+    private final TaskMetrics metrics;
+    private Runnable commitDelegate = new Runnable() {
+        @Override
+        public void run() {
+            log.debug("{} Committing its state", logPrefix);
+            // 1) flush local state
+            stateMgr.flush(processorContext);
+
+            log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix);
+            // 2) flush produced records in the downstream and change logs of local states
+            recordCollector.flush();
+
+            // 3) commit consumed offsets if it is dirty already
+            commitOffsets();
+        }
+    };
 
     /**
      * Create {@link StreamTask} with its assigned partitions
@@ -85,10 +103,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       StreamsMetrics metrics,
                       StateDirectory stateDirectory,
                       ThreadCache cache,
+                      Time time,
                       final RecordCollector recordCollector) {
         super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache);
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+        this.metrics = new TaskMetrics(metrics);
 
         // create queues for each assigned partition and associate them
         // to corresponding source nodes in the processor topology
@@ -113,7 +133,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
         // initialize the topology with its own context
         this.processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache);
-
+        this.time = time;
         // initialize the state stores
         log.info("{} Initializing state stores", logPrefix);
         initializeStateStores();
@@ -242,7 +262,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         log.trace("{} Punctuating processor {} with timestamp {}", logPrefix, node.name(), timestamp);
 
         try {
-            node.processor().punctuate(timestamp);
+            node.punctuate(timestamp);
         } catch (KafkaException ke) {
             throw new StreamsException(String.format("Exception caught in punctuate. taskId=%s processor=%s", id,  node.name()), ke);
         } finally {
@@ -250,21 +270,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
         }
     }
 
-
     /**
      * Commit the current task state
      */
     public void commit() {
-        log.debug("{} Committing its state", logPrefix);
-        // 1) flush local state
-        stateMgr.flush(processorContext);
-
-        log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix);
-        // 2) flush produced records in the downstream and change logs of local states
-        recordCollector.flush();
-
-        // 3) commit consumed offsets if it is dirty already
-        commitOffsets();
+        metrics.metrics.measureLatencyNs(time, commitDelegate, metrics.taskCommitTimeSensor);
     }
 
     /**
@@ -321,7 +331,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         for (ProcessorNode node : this.topology.processors()) {
             processorContext.setCurrentNode(node);
             try {
-                node.init(this.processorContext);
+                node.init(processorContext);
             } finally {
                 processorContext.setCurrentNode(null);
             }
@@ -361,6 +371,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
         this.partitionGroup.close();
         closeTopology();
+        metrics.removeAllSensors();
     }
 
     @Override
@@ -385,9 +396,25 @@
         return super.toString();
     }
 
+    protected class TaskMetrics {
+        final StreamsMetricsImpl metrics;
+        final Sensor taskCommitTimeSensor;
+
+        public TaskMetrics(StreamsMetrics metrics) {
+            String name = id.toString();
+            this.metrics = (StreamsMetricsImpl) metrics;
+            this.taskCommitTimeSensor = metrics.addLatencySensor("task", name, "commit", Sensor.RecordingLevel.DEBUG, "streams-task-id", name);
+        }
+
+        public void removeAllSensors() {
+            metrics.removeSensor(taskCommitTimeSensor);
+        }
+    }
+
     @Override
     public void flushState() {
         super.flushState();
         recordCollector.flush();
     }
+
 }
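
For readers skimming the StreamTask hunks above: commit() no longer runs its flush/commit
sequence inline; the sequence is wrapped in a Runnable and handed to
StreamsMetricsImpl.measureLatencyNs(time, delegate, sensor), which lands the latency on the
new DEBUG-level per-task sensor registered through addLatencySensor. A minimal sketch of what
such a helper plausibly does, inferred from this call site and the recordLatency override later
in the patch; the class name below is illustrative, not the committed code:

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.utils.Time;

    // Sketch only: run an action and record its wall-clock duration in
    // nanoseconds on the given sensor. The sensor itself drops the sample
    // when its RecordingLevel (introduced by this patch) is not enabled,
    // so DEBUG-level sensors stay cheap under the default INFO level.
    final class LatencySketch {
        static void measureLatencyNs(final Time time, final Runnable action, final Sensor sensor) {
            final long startNs = time.nanoseconds();
            try {
                action.run();
            } finally {
                sensor.record(time.nanoseconds() - startNs);
            }
        }
    }

Funneling the work through a Runnable keeps the measured region explicit, and one helper can
then time commit, process, and state-store operations alike.
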

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 65e857f..90194c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -24,20 +24,16 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
@@ -46,7 +42,6 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.ThreadCacheMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,7 +107,7 @@ public class StreamThread extends Thread {
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
-        State(final Integer...validTransitions) {
+        State(final Integer... validTransitions) {
             this.validTransitions.addAll(Arrays.asList(validTransitions));
         }
 
@@ -124,6 +119,7 @@ public class StreamThread extends Thread {
             return validTransitions.contains(newState.ordinal());
         }
     }
+
     private volatile State state = State.NOT_RUNNING;
     private StateListener stateListener = null;
 
@@ -142,10 +138,8 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Set the {@link StateListener} to be notified when state changes.
-     * Note this API is internal to Kafka Streams and is not intended to be used by an
-     * external application.
-     * @param listener
+     * Set the {@link StateListener} to be notified when state changes. Note this API is internal to
+     * Kafka Streams and is not intended to be used by an external application.
      */
     public void setStateListener(final StateListener listener) {
         this.stateListener = listener;
@@ -203,7 +197,7 @@ public class StreamThread extends Thread {
     private final long pollTimeMs;
     private final long cleanTimeMs;
     private final long commitTimeMs;
-    private final StreamsMetricsImpl sensors;
+    private final StreamsMetricsThreadImpl streamsMetrics;
     final StateDirectory stateDirectory;
 
     private StreamPartitionAssignor partitionAssignor = null;
@@ -276,6 +270,10 @@ public class StreamThread extends Thread {
         return state == State.RUNNING;
     }
 
+    public String threadClientId() {
+        return threadClientId;
+    }
+
     public StreamThread(TopologyBuilder builder,
                         StreamsConfig config,
                         KafkaClientSupplier clientSupplier,
@@ -297,13 +295,14 @@ public class StreamThread extends Thread {
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
         this.streamsMetadataState = streamsMetadataState;
         threadClientId = clientId + "-" + threadName;
-        this.sensors = new StreamsMetricsImpl(metrics);
+        this.streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId,
+            Collections.singletonMap("client-id", threadClientId));
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
             log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadName);
         }
         long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
             config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
-        this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.sensors);
+        this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics);
 
 
         this.logPrefix = String.format("stream-thread [%s]", threadName);
@@ -348,8 +347,9 @@ public class StreamThread extends Thread {
 
     /**
      * Execute the stream processors
+     *
      * @throws KafkaException for any Kafka-related exceptions
-     * @throws Exception for any other non-Kafka exceptions
+     * @throws Exception      for any other non-Kafka exceptions
      */
     @Override
     public void run() {
@@ -415,6 +415,7 @@ public class StreamThread extends Thread {
 
         log.info("{} Stream thread shutdown complete", logPrefix);
         setState(State.NOT_RUNNING);
+        streamsMetrics.removeAllSensors();
     }
 
     private RuntimeException unAssignChangeLogPartitions() {
@@ -452,8 +453,8 @@
 
 
     /**
-     * Similar to shutdownTasksAndState, however does not close the task managers,
-     * in the hope that soon the tasks will be assigned again
+     * Similar to shutdownTasksAndState, but does not close the task managers,
+     * in the hope that the tasks will soon be assigned again.
      */
     private void suspendTasksAndState()  {
         log.debug("{} suspendTasksAndState: suspending all active tasks [{}] and standby tasks [{}]", logPrefix,
@@ -536,8 +542,8 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Compute the latency based on the current marked timestamp,
-     * and update the marked timestamp with the current system timestamp.
+     * Compute the latency based on the current marked timestamp, and update the marked timestamp
+     * with the current system timestamp.
      *
      * @return latency
      */
@@ -579,7 +585,7 @@ public class StreamThread extends Thread {
                         StreamTask task = activeTasksByPartition.get(partition);
                         numAddedRecords += task.addRecords(partition, records.records(partition));
                     }
-                    sensors.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
+                    streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
                     polledRecords = true;
                 } else {
                     polledRecords = false;
@@ -587,7 +593,7 @@ public class StreamThread extends Thread {
 
                // only record poll latency if long poll is required
                 if (longPoll) {
-                    sensors.pollTimeSensor.record(computeLatency());
+                    streamsMetrics.pollTimeSensor.record(computeLatency());
                 }
             }
 
@@ -603,7 +609,7 @@ public class StreamThread extends Thread {
 
                         requiresPoll = requiresPoll || task.requiresPoll();
 
-                        sensors.processTimeSensor.record(computeLatency());
+                        streamsMetrics.processTimeSensor.record(computeLatency());
 
                         maybePunctuate(task);
 
@@ -680,7 +686,7 @@ public class StreamThread extends Thread {
            // check whether we should punctuate based on the task's partition group timestamp,
            // which is essentially based on record timestamps.
             if (task.maybePunctuate())
-                sensors.punctuateTimeSensor.record(computeLatency());
+                streamsMetrics.punctuateTimeSensor.record(computeLatency());
 
         } catch (KafkaException e) {
             log.error("{} Failed to punctuate active task {}: ", logPrefix, task.id(), e);
@@ -745,7 +751,7 @@ public class StreamThread extends Thread {
             throw e;
         }
 
-        sensors.commitTimeSensor.record(computeLatency());
+        streamsMetrics.commitTimeSensor.record(computeLatency());
     }
 
     /**
@@ -788,11 +794,11 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
         log.info("{} Creating active task {} with assigned partitions [{}]", logPrefix, id, partitions);
 
-        sensors.taskCreationSensor.record();
+        streamsMetrics.taskCreationSensor.record();
 
         final ProcessorTopology topology = builder.build(id.topicGroupId);
         final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString());
-        return new StreamTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors, stateDirectory, cache, recordCollector);
+        return new StreamTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, streamsMetrics, stateDirectory, cache, time, recordCollector);
     }
 
     private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
@@ -901,12 +907,12 @@ public class StreamThread extends Thread {
     StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
         log.info("{} Creating new standby task {} with assigned partitions [{}]", logPrefix, id, partitions);
 
-        sensors.taskCreationSensor.record();
+        streamsMetrics.taskCreationSensor.record();
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
         if (!topology.stateStores().isEmpty()) {
-            return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors, stateDirectory);
+            return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, streamsMetrics, stateDirectory);
         } else {
             return null;
         }
@@ -1005,7 +1011,7 @@ public class StreamThread extends Thread {
             public void apply(final AbstractTask task) {
                 log.info("{} Closing a task {}", StreamThread.this.logPrefix, task.id());
                 task.close();
-                sensors.taskDestructionSensor.record();
+                streamsMetrics.taskDestructionSensor.record();
             }
         }, "close");
     }
@@ -1016,7 +1022,7 @@ public class StreamThread extends Thread {
             public void apply(final AbstractTask task) {
                 log.info("{} Closing a task's topology {}", StreamThread.this.logPrefix, task.id());
                 task.closeTopology();
-                sensors.taskDestructionSensor.record();
+                streamsMetrics.taskDestructionSensor.record();
             }
         }, "close");
     }
@@ -1063,12 +1069,11 @@ public class StreamThread extends Thread {
         return sb.toString();
     }
 
-    private class StreamsMetricsImpl implements StreamsMetrics, ThreadCacheMetrics {
-        final Metrics metrics;
-        final String metricGrpName;
-        final String sensorNamePrefix;
-        final Map<String, String> metricTags;
-
+    /**
+     * This class extends {@link StreamsMetricsImpl} and overrides its
+     * {@code recordLatency} method for efficiency.
+     */
+    private class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
         final Sensor commitTimeSensor;
         final Sensor pollTimeSensor;
         final Sensor processTimeSensor;
@@ -1077,117 +1082,53 @@
         final Sensor taskDestructionSensor;
         final Sensor skippedRecordsSensor;
 
-        public StreamsMetricsImpl(Metrics metrics) {
-            this.metrics = metrics;
-            this.metricGrpName = "stream-metrics";
-            this.sensorNamePrefix = "thread." + threadClientId;
-            this.metricTags = Collections.singletonMap("client-id", threadClientId);
+        public StreamsMetricsThreadImpl(Metrics metrics, String groupName, String prefix, Map<String, String> tags) {
+            super(metrics, groupName, tags);
+            this.commitTimeSensor = metrics.sensor(prefix + ".commit-time", Sensor.RecordingLevel.INFO);
+            this.commitTimeSensor.add(metrics.metricName("commit-time-avg", this.groupName, "The average commit time in ms", this.tags), new Avg());
+            this.commitTimeSensor.add(metrics.metricName("commit-time-max", this.groupName, "The maximum commit time in ms", this.tags), new Max());
+            this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", this.groupName, "The average per-second number of commit calls", this.tags), new Rate(new Count()));
 
-            this.commitTimeSensor = metrics.sensor(sensorNamePrefix + ".commit-time");
-            this.commitTimeSensor.add(metrics.metricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
-            this.commitTimeSensor.add(metrics.metricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
-            this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
+            this.pollTimeSensor = metrics.sensor(prefix + ".poll-time", Sensor.RecordingLevel.INFO);
+            this.pollTimeSensor.add(metrics.metricName("poll-time-avg", this.groupName, "The average poll time in ms", this.tags), new Avg());
+            this.pollTimeSensor.add(metrics.metricName("poll-time-max", this.groupName, "The maximum poll time in ms", this.tags), new Max());
+            this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", this.groupName, "The average per-second number of record-poll calls", this.tags), new Rate(new Count()));
 
-            this.pollTimeSensor = metrics.sensor(sensorNamePrefix + ".poll-time");
-            this.pollTimeSensor.add(metrics.metricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
-            this.pollTimeSensor.add(metrics.metricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
-            this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
+            this.processTimeSensor = metrics.sensor(prefix + ".process-time", Sensor.RecordingLevel.INFO);
+            this.processTimeSensor.add(metrics.metricName("process-time-avg", this.groupName, "The average process time in ms", this.tags), new Avg());
+            this.processTimeSensor.add(metrics.metricName("process-time-max", this.groupName, "The maximum process time in ms", this.tags), new Max());
+            this.processTimeSensor.add(metrics.metricName("process-calls-rate", this.groupName, "The average per-second number of process calls", this.tags), new Rate(new Count()));
 
-            this.processTimeSensor = metrics.sensor(sensorNamePrefix + ".process-time");
-            this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
-            this.processTimeSensor.add(metrics.metricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
-            this.processTimeSensor.add(metrics.metricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
+            this.punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-time", Sensor.RecordingLevel.INFO);
+            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", this.groupName, "The average punctuate time in ms", this.tags), new Avg());
+            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", this.groupName, "The maximum punctuate time in ms", this.tags), new Max());
+            this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", this.groupName, "The average per-second number of punctuate calls", this.tags), new Rate(new Count()));
 
-            this.punctuateTimeSensor = metrics.sensor(sensorNamePrefix + ".punctuate-time");
-            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
-            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
-            this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
+            this.taskCreationSensor = metrics.sensor(prefix + ".task-creation", Sensor.RecordingLevel.INFO);
+            this.taskCreationSensor.add(metrics.metricName("task-creation-rate", this.groupName, "The average per-second number of newly created tasks", this.tags), new Rate(new Count()));
 
-            this.taskCreationSensor = metrics.sensor(sensorNamePrefix + ".task-creation");
-            this.taskCreationSensor.add(metrics.metricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
+            this.taskDestructionSensor = metrics.sensor(prefix + ".task-destruction", Sensor.RecordingLevel.INFO);
+            this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", this.groupName, "The average per-second number of destroyed tasks", this.tags), new Rate(new Count()));
 
-            this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction");
-            this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
+            this.skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records", Sensor.RecordingLevel.INFO);
+            this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", this.groupName, "The average per-second number of skipped records", this.tags), new Rate(new Count()));
 
-            this.skippedRecordsSensor = metrics.sensor(sensorNamePrefix + ".skipped-records");
-            this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", metricGrpName, "The average per-second number of skipped records.", metricTags), new Rate(new Count()));
         }
 
         @Override
         public void recordLatency(Sensor sensor, long startNs, long endNs) {
             sensor.record(endNs - startNs, timerStartedMs);
         }
 
-        @Override
-        public void recordCacheSensor(Sensor sensor, double count) {
-            sensor.record(count);
-        }
-
-        /**
-         * @throws IllegalArgumentException if tags is not constructed in key-value pairs
-         */
-        @Override
-        public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
-            // extract the additional tags if there are any
-            Map<String, String> tagMap = new HashMap<>(this.metricTags);
-            if ((tags.length % 2) != 0)
-                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
-
-            for (int i = 0; i < tags.length; i += 2)
-                tagMap.put(tags[i], tags[i + 1]);
-
-            String metricGroupName = "stream-" + scopeName + "-metrics";
-
-            // first add the global operation metrics if not yet, with the global tags only
-            Sensor parent = metrics.sensor(sensorNamePrefix + "." + scopeName + "-" + operationName);
-            addLatencyMetrics(metricGroupName, parent, "all", operationName, this.metricTags);
-
-            // add the store operation metrics with additional tags
-            Sensor sensor = metrics.sensor(sensorNamePrefix + "." + scopeName + "-" + entityName + "-" + operationName, parent);
-            addLatencyMetrics(metricGroupName, sensor, entityName, operationName, tagMap);
-
-            return sensor;
-        }
-
-        @Override
-        public Sensor addCacheSensor(String entityName, String operationName, String... tags) {
-            // extract the additional tags if there are any
-            Map<String, String> tagMap = new HashMap<>(this.metricTags);
-            if ((tags.length % 2) != 0)
-                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
-
-            for (int i = 0; i < tags.length; i += 2)
-                tagMap.put(tags[i], tags[i + 1]);
-
-            String metricGroupName = "stream-thread-cache-metrics";
-
-            Sensor sensor = metrics.sensor(sensorNamePrefix + "-" + entityName + "-" + operationName);
-            addCacheMetrics(metricGroupName, sensor, entityName, operationName, tagMap);
-            return sensor;
-
-        }
-
-        private void addCacheMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-avg", metricGrpName,
-                "The current count of " + entityName + " " + opName + " operation.", tags), new Avg());
-            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-min", metricGrpName,
-                "The current count of " + entityName + " " + opName + " operation.", tags), new Min());
-            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-max", metricGrpName,
-                "The current count of " + entityName + " " + opName + " operation.", tags), new Max());
-        }
-
-        private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-avg-latency-ms", metricGrpName,
-                "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
-            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-max-latency-ms", metricGrpName,
-                "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
-            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-qps", metricGrpName,
-                "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
-        }
+        public void removeAllSensors() {
+            removeSensor(commitTimeSensor);
+            removeSensor(pollTimeSensor);
+            removeSensor(processTimeSensor);
+            removeSensor(punctuateTimeSensor);
+            removeSensor(taskCreationSensor);
+            removeSensor(taskDestructionSensor);
+            removeSensor(skippedRecordsSensor);
 
-        private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
-            if (!metrics.metrics().containsKey(name))
-                sensor.add(name, stat);
         }
     }
 


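
The recording levels used throughout the hunks above are only half of the change; the other
half is the client configuration that activates them. Everything defaults to INFO, so the
DEBUG-level sensors registered in this patch (per-task commit time, per-processor-node and
per-cache sensors) remain dormant until explicitly enabled. A hedged usage sketch; the
application id and bootstrap address are placeholders:

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.streams.StreamsConfig;

    public final class RecordingLevelDemo {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-demo");       // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            // metrics.recording.level is the knob this commit adds to
            // CommonClientConfigs; "INFO" (the default) keeps DEBUG sensors dormant.
            props.put(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
            // A KafkaStreams instance built from these properties then exposes the
            // task/node/cache sensors through KafkaStreams#metrics().
        }
    }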