kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/3] kafka git commit: KAFKA-3715: Add granular metrics to Kafka Streams and add hierarchical logging levels to Metrics
Date Wed, 11 Jan 2017 20:06:47 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
new file mode 100644
index 0000000..e29476b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class StreamsMetricsImpl implements StreamsMetrics {
+    private static final Logger log = LoggerFactory.getLogger(StreamsMetricsImpl.class);
+
+    final Metrics metrics;
+    final String groupName;
+    final Map<String, String> tags;
+    final Map<Sensor, Sensor> parentSensors;
+
+    public StreamsMetricsImpl(Metrics metrics, String groupName, Map<String, String> tags) {
+        Objects.requireNonNull(metrics, "Metrics cannot be null");
+
+        this.metrics = metrics;
+        this.groupName = groupName;
+        this.tags = tags;
+        this.parentSensors = new HashMap<>();
+    }
+
+    public Metrics registry() {
+        return metrics;
+    }
+
+    @Override
+    public Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel) {
+        return metrics.sensor(name, recordingLevel);
+    }
+
+    @Override
+    public Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
+        return metrics.sensor(name, recordingLevel, parents);
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
+
+    @Override
+    public void recordLatency(Sensor sensor, long startNs, long endNs) {
+        sensor.record(endNs - startNs);
+    }
+
+    @Override
+    public void recordThroughput(Sensor sensor, long value) {
+        sensor.record(value);
+    }
+
+    private String groupNameFromScope(String scopeName) {
+        return "stream-" + scopeName + "-metrics";
+    }
+
+    private String sensorName(String operationName, String entityName) {
+        if (entityName == null) {
+            return operationName;
+        } else {
+            return entityName + "-" + operationName;
+        }
+    }
+
+    private Map<String, String> tagMap(String... tags) {
+        if ((tags.length % 2) != 0)
+            throw new IllegalArgumentException("Tags need to be specified in key-value pairs");
+
+        // copy the global tags, then add the additional tags, if any
+        Map<String, String> tagMap = new HashMap<>(this.tags);
+        for (int i = 0; i < tags.length; i += 2)
+            tagMap.put(tags[i], tags[i + 1]);
+
+        return tagMap;
+    }
+
+    /**
+     * @throws IllegalArgumentException if tags is not constructed in key-value pairs
+     */
+    @Override
+    public Sensor addLatencySensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
+        Map<String, String> tagMap = tagMap(tags);
+
+        // first add the global operation metrics, if not already present, with the global tags only
+        Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
+        addLatencyMetrics(scopeName, parent, "all", operationName, tagMap);
+
+        // add the operation metrics with additional tags
+        Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
+        addLatencyMetrics(scopeName, sensor, entityName, operationName, tagMap);
+
+        parentSensors.put(sensor, parent);
+
+        return sensor;
+    }
+
+    /**
+     * @throws IllegalArgumentException if tags is not constructed in key-value pairs
+     */
+    @Override
+    public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
+        Map<String, String> tagMap = tagMap(tags);
+
+        // first add the global operation metrics, if not already present, with the global tags only
+        Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
+        addThroughputMetrics(scopeName, parent, "all", operationName, tagMap);
+
+        // add the operation metrics with additional tags
+        Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
+        addThroughputMetrics(scopeName, sensor, entityName, operationName, tagMap);
+
+        parentSensors.put(sensor, parent);
+
+        return sensor;
+    }
+
+    private void addLatencyMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
+        maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-avg-latency", groupNameFromScope(scopeName),
+            "The average latency of " + entityName + " " + opName + " operation.", tags), new Avg());
+        maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-max-latency", groupNameFromScope(scopeName),
+            "The max latency of " + entityName + " " + opName + " operation.", tags), new Max());
+        addThroughputMetrics(scopeName, sensor, entityName, opName, tags);
+    }
+
+    private void addThroughputMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
+        maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-qps", groupNameFromScope(scopeName),
+            "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
+    }
+
+    private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
+        if (!metrics.metrics().containsKey(name)) {
+            sensor.add(name, stat);
+        } else {
+            log.debug("Trying to add metric twice: {}", name);
+        }
+    }
+
+    /**
+     * Helper function. Measures the latency of an action, equivalent to:
+     * <pre>
+     *   startNs = time.nanoseconds();
+     *   action.run();
+     *   endNs = time.nanoseconds();
+     *   sensor.record(endNs - startNs);
+     * </pre>
+     * except that the clock is only read if the sensor's recording level is enabled.
+     * @param time      Time object.
+     * @param action    Action to run.
+     * @param sensor    Sensor to record the latency.
+     */
+    public void measureLatencyNs(final Time time, final Runnable action, final Sensor sensor) {
+        long startNs = -1;
+        if (sensor.shouldRecord()) {
+            startNs = time.nanoseconds();
+        }
+        action.run();
+        if (startNs != -1) {
+            recordLatency(sensor, startNs, time.nanoseconds());
+        }
+    }
+
+    /**
+     * Removes a sensor and its parent sensor, if it has one.
+     */
+    @Override
+    public void removeSensor(Sensor sensor) {
+        Objects.requireNonNull(sensor, "Sensor is null");
+
+        metrics.removeSensor(sensor.name());
+        final Sensor parent = parentSensors.get(sensor);
+        if (parent != null) {
+            metrics.removeSensor(parent.name());
+        }
+    }
+
+}

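For context, here is a minimal, hypothetical usage sketch of the new API (the class name and tag values like "my-scope"/"my-store" are illustrative; the constructor and method signatures are exactly those defined in the diff above):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;

    public class MetricsSketch {
        public static void main(String[] args) {
            Metrics registry = new Metrics();
            Map<String, String> globalTags = Collections.singletonMap("client-id", "example-client");
            StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(registry, "stream-metrics", globalTags);

            // registers the per-entity sensor plus a shared "all" parent sensor; both gated on DEBUG
            Sensor putLatency = streamsMetrics.addLatencySensor(
                "my-scope", "my-store", "put", Sensor.RecordingLevel.DEBUG, "store-id", "my-store");

            long startNs = System.nanoTime();
            // ... the operation being measured ...
            streamsMetrics.recordLatency(putLatency, startNs, System.nanoTime());
        }
    }

Because the child sensor names the "all" sensor as its parent, every value recorded on the child also feeds the global per-operation metrics, which is why removeSensor() above cleans up both.
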
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index fb8b3b1..926e5d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -20,9 +20,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -50,7 +50,56 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private Sensor rangeTime;
     private Sensor flushTime;
     private Sensor restoreTime;
-    private StreamsMetrics metrics;
+    private StreamsMetricsImpl metrics;
+
+
+    private K key;
+    private V value;
+    private Runnable getDelegate = new Runnable() {
+        @Override
+        public void run() {
+            value = inner.get(key);
+        }
+    };
+    private Runnable putDelegate = new Runnable() {
+        @Override
+        public void run() {
+            inner.put(key, value);
+        }
+    };
+    private Runnable putIfAbsentDelegate = new Runnable() {
+        @Override
+        public void run() {
+            value = inner.putIfAbsent(key, value);
+        }
+    };
+    private List<KeyValue<K, V>> entries;
+    private Runnable putAllDelegate = new Runnable() {
+        @Override
+        public void run() {
+            inner.putAll(entries);
+        }
+    };
+    private Runnable deleteDelegate = new Runnable() {
+        @Override
+        public void run() {
+            value = inner.delete(key);
+        }
+    };
+    private Runnable flushDelegate = new Runnable() {
+        @Override
+        public void run() {
+            inner.flush();
+        }
+    };
+    private ProcessorContext context;
+    private StateStore root;
+    private Runnable initDelegate = new Runnable() {
+        @Override
+        public void run() {
+            inner.init(context, root);
+        }
+    };
 
     // always wrap the store with the metered store
     public MeteredKeyValueStore(final KeyValueStore<K, V> inner, String metricScope, Time time) {
@@ -67,24 +116,21 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     @Override
     public void init(ProcessorContext context, StateStore root) {
         final String name = name();
-        this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
-        this.putIfAbsentTime = this.metrics.addLatencySensor(metricScope, name, "put-if-absent");
-        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
-        this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
-        this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
-        this.allTime = this.metrics.addLatencySensor(metricScope, name, "all");
-        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
-        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
-        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+        this.context = context;
+        this.root = root;
+        this.metrics = (StreamsMetricsImpl) context.metrics();
+        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG);
+        this.putIfAbsentTime = this.metrics.addLatencySensor(metricScope, name, "put-if-absent", Sensor.RecordingLevel.DEBUG);
+        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG);
+        this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete", Sensor.RecordingLevel.DEBUG);
+        this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all", Sensor.RecordingLevel.DEBUG);
+        this.allTime = this.metrics.addLatencySensor(metricScope, name, "all", Sensor.RecordingLevel.DEBUG);
+        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range", Sensor.RecordingLevel.DEBUG);
+        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG);
+        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
 
         // register and possibly restore the state from the logs
-        long startNs = time.nanoseconds();
-        try {
-            inner.init(context, root);
-        } finally {
-            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
-        }
+        metrics.measureLatencyNs(time, initDelegate, this.restoreTime);
     }
 
     @Override
@@ -99,52 +145,37 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public V get(K key) {
-        long startNs = time.nanoseconds();
-        try {
-            return this.inner.get(key);
-        } finally {
-            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
-        }
+        this.key = key;
+        metrics.measureLatencyNs(time, getDelegate, this.getTime);
+        return value;
     }
 
     @Override
     public void put(K key, V value) {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.put(key, value);
-        } finally {
-            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
-        }
+        this.key = key;
+        this.value = value;
+        metrics.measureLatencyNs(time, putDelegate, this.putTime);
     }
 
     @Override
     public V putIfAbsent(K key, V value) {
-        long startNs = time.nanoseconds();
-        try {
-            return this.inner.putIfAbsent(key, value);
-        } finally {
-            this.metrics.recordLatency(this.putIfAbsentTime, startNs, time.nanoseconds());
-        }
+        this.key = key;
+        this.value = value;
+        metrics.measureLatencyNs(time, putIfAbsentDelegate, this.putIfAbsentTime);
+        return this.value;
     }
 
     @Override
     public void putAll(List<KeyValue<K, V>> entries) {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.putAll(entries);
-        } finally {
-            this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
-        }
+        this.entries = entries;
+        metrics.measureLatencyNs(time, putAllDelegate, this.putAllTime);
     }
 
     @Override
     public V delete(K key) {
-        long startNs = time.nanoseconds();
-        try {
-            return this.inner.delete(key);
-        } finally {
-            this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
-        }
+        this.key = key;
+        metrics.measureLatencyNs(time, deleteDelegate, this.deleteTime);
+        return value;
     }
 
     @Override
@@ -169,12 +200,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public void flush() {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.flush();
-        } finally {
-            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
-        }
+        metrics.measureLatencyNs(time, flushDelegate, this.flushTime);
     }
 
     private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {

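A note on the delegate pattern introduced above: each Runnable is allocated once and reused, with the call arguments stashed in instance fields, so the hot path adds no per-call allocation. Conceptually, a call such as get(key) now expands to the body of measureLatencyNs() from StreamsMetricsImpl (a sketch of existing behavior, not new code):

    long startNs = -1;
    if (getTime.shouldRecord()) {        // false unless the recording level covers DEBUG
        startNs = time.nanoseconds();
    }
    getDelegate.run();                   // runs: value = inner.get(key)
    if (startNs != -1) {
        recordLatency(getTime, startNs, time.nanoseconds());
    }

When DEBUG recording is disabled, the remaining overhead is just the field writes and the virtual call; neither time.nanoseconds() read happens. The trade-off is that the shared key/value fields make the metered store unsafe for concurrent callers, consistent with state stores being accessed by a single stream thread.
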
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
index 5181c08..e0ed03e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
@@ -55,13 +55,13 @@ class MeteredSegmentedBytesStore implements SegmentedBytesStore {
     public void init(ProcessorContext context, StateStore root) {
         final String name = name();
         this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
-        this.fetchTime = this.metrics.addLatencySensor(metricScope, name, "fetch");
-        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
-        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
-        this.removeTime = this.metrics.addLatencySensor(metricScope, name, "remove");
+        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG);
+        this.fetchTime = this.metrics.addLatencySensor(metricScope, name, "fetch", Sensor.RecordingLevel.DEBUG);
+        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG);
+        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG);
+        this.removeTime = this.metrics.addLatencySensor(metricScope, name, "remove", Sensor.RecordingLevel.DEBUG);
 
-        final Sensor restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+        final Sensor restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
         // register and possibly restore the state from the logs
         final long startNs = time.nanoseconds();
         try {

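Unlike the stores above, this file keeps the try/finally timing around the restore. Both forms record the same latency; the difference is that try/finally unconditionally pays for two nanoseconds() reads, while the delegate form (as in MeteredKeyValueStore) skips them when the sensor will not record:

    // try/finally form: clock read twice, even when restoreTime is disabled
    final long startNs = time.nanoseconds();
    try {
        inner.init(context, root);
    } finally {
        this.metrics.recordLatency(restoreTime, startNs, time.nanoseconds());
    }

    // delegate form: clock reads gated on restoreTime.shouldRecord()
    metrics.measureLatencyNs(time, initDelegate, restoreTime);
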
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 6533460..c725c1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -19,10 +19,10 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
@@ -36,7 +36,38 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
     private Sensor fetchTime;
     private Sensor flushTime;
     private Sensor restoreTime;
-    private StreamsMetrics metrics;
+    private StreamsMetricsImpl metrics;
+
+    private ProcessorContext context;
+    private StateStore root;
+    private Runnable initDelegate = new Runnable() {
+        @Override
+        public void run() {
+            inner.init(context, root);
+        }
+    };
+
+    private K key;
+    private V value;
+    private long timestamp;
+    private Runnable putDelegate = new Runnable() {
+        @Override
+        public void run() {
+            inner.put(key, value);
+        }
+    };
+    private Runnable putTsDelegate = new Runnable() {
+        @Override
+        public void run() {
+            inner.put(key, value, timestamp);
+        }
+    };
+    private Runnable flushDelegate = new Runnable() {
+        @Override
+        public void run() {
+            inner.flush();
+        }
+    };
 
     // always wrap the store with the metered store
     public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
@@ -53,19 +84,16 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
     @Override
     public void init(ProcessorContext context, StateStore root) {
         final String name = name();
-        this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
-        this.fetchTime = this.metrics.addLatencySensor(metricScope, name, "fetch");
-        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
-        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+        this.context = context;
+        this.root = root;
+        this.metrics = (StreamsMetricsImpl) context.metrics();
+        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG);
+        this.fetchTime = this.metrics.addLatencySensor(metricScope, name, "fetch", Sensor.RecordingLevel.DEBUG);
+        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG);
+        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
 
         // register and possibly restore the state from the logs
-        long startNs = time.nanoseconds();
-        try {
-            inner.init(context, root);
-        } finally {
-            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
-        }
+        metrics.measureLatencyNs(time, initDelegate, this.restoreTime);
     }
 
     @Override
@@ -85,22 +113,17 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void put(K key, V value) {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.put(key, value);
-        } finally {
-            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
-        }
+        this.key = key;
+        this.value = value;
+        metrics.measureLatencyNs(time, putDelegate, this.putTime);
     }
 
     @Override
     public void put(K key, V value, long timestamp) {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.put(key, value, timestamp);
-        } finally {
-            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
-        }
+        this.key = key;
+        this.value = value;
+        this.timestamp = timestamp;
+        metrics.measureLatencyNs(time, putTsDelegate, this.putTime);
     }
 
     @Override
@@ -110,12 +133,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void flush() {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.flush();
-        } finally {
-            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
-        }
+        metrics.measureLatencyNs(time, flushDelegate, this.flushTime);
     }
 
     private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 70f8676..849caa7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -17,15 +17,22 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -39,8 +46,7 @@ class NamedCache {
     private LRUNode tail;
     private LRUNode head;
     private long currentSizeBytes;
-    private ThreadCacheMetrics metrics;
-
+    private NamedCacheMetrics namedCacheMetrics;
     // JMX stats
     private Sensor hitRatio = null;
 
@@ -51,15 +57,13 @@ class NamedCache {
     private long numOverwrites = 0;
     private long numFlushes = 0;
 
-    NamedCache(final String name) {
-        this(name, null);
-    }
-
-    NamedCache(final String name, final ThreadCacheMetrics metrics) {
+    NamedCache(final String name, final StreamsMetrics metrics) {
         this.name = name;
-        this.metrics = metrics != null ? metrics : new ThreadCache.NullThreadCacheMetrics();
+        this.namedCacheMetrics = new NamedCacheMetrics(metrics);
+    }
 
-        this.hitRatio = this.metrics.addCacheSensor(name, "hitRatio");
+    synchronized final String name() {
+        return name;
     }
 
     synchronized long hits() {
@@ -181,7 +185,7 @@ class NamedCache {
             return null;
         } else {
             numReadHits++;
-            metrics.recordCacheSensor(hitRatio, (double) numReadHits / (double) (numReadHits + numReadMisses));
+            namedCacheMetrics.hitRatioSensor.record((double) numReadHits / (double) (numReadHits + numReadMisses));
         }
         return node;
     }
@@ -308,6 +312,7 @@ class NamedCache {
         currentSizeBytes = 0;
         dirtyKeys.clear();
         cache.clear();
+        namedCacheMetrics.removeAllSensors();
     }
 
     /**
@@ -353,4 +358,38 @@ class NamedCache {
         }
     }
 
+    class NamedCacheMetrics {
+        final StreamsMetricsImpl metrics;
+        final String groupName;
+        final Map<String, String> metricTags;
+        final Sensor hitRatioSensor;
+
+        public NamedCacheMetrics(StreamsMetrics metrics) {
+            final String scope = "record-cache";
+            final String entityName = name;
+            final String opName = "hitRatio";
+            final String tagKey = "record-cache-id";
+            final String tagValue = name;
+            this.groupName = "stream-" + scope + "-metrics";
+            this.metrics = (StreamsMetricsImpl) metrics;
+            this.metricTags = new LinkedHashMap<>();
+            this.metricTags.put(tagKey, tagValue);
+
+            hitRatioSensor = this.metrics.registry().sensor(entityName + "-" + opName, Sensor.RecordingLevel.DEBUG);
+
+            hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-avg", groupName,
+                "The average cache hit ratio of " + entityName + ".", metricTags), new Avg());
+            hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-min", groupName,
+                "The minimum cache hit ratio of " + entityName + ".", metricTags), new Min());
+            hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-max", groupName,
+                "The maximum cache hit ratio of " + entityName + ".", metricTags), new Max());
+        }
+
+        public void removeAllSensors() {
+            metrics.removeSensor(hitRatioSensor);
+        }
+    }
 }

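With the group, tag key, and metric names hard-coded in NamedCacheMetrics above, the resulting hit-ratio metrics can be looked up in the underlying registry like so (the cache name "my-cache" is illustrative):

    Map<String, String> tags = Collections.singletonMap("record-cache-id", "my-cache");
    MetricName avgHitRatio = registry.metricName(
        "my-cache-hitRatio-avg", "stream-record-cache-metrics", "", tags);
    Metric metric = registry.metrics().get(avgHitRatio);  // registered when the NamedCache is constructed

The ratio itself is recorded on every read hit in getInternal() as numReadHits / (numReadHits + numReadMisses), so the Avg, Min, and Max stats summarize the running ratio over the metrics sample window.
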
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index a66c961..6f33a63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +41,7 @@ public class ThreadCache {
     private final String name;
     private final long maxCacheSizeBytes;
     private final Map<String, NamedCache> caches = new HashMap<>();
-    private final ThreadCacheMetrics metrics;
+    private final StreamsMetrics metrics;
 
     // internal stats
     private long numPuts = 0;
@@ -55,14 +55,10 @@ public class ThreadCache {
         void apply(final List<DirtyEntry> dirty);
     }
 
-    public ThreadCache(long maxCacheSizeBytes) {
-        this(null, maxCacheSizeBytes, null);
-    }
-
-    public ThreadCache(final String name, long maxCacheSizeBytes, final ThreadCacheMetrics metrics) {
+    public ThreadCache(final String name, long maxCacheSizeBytes, final StreamsMetrics metrics) {
         this.name = name;
         this.maxCacheSizeBytes = maxCacheSizeBytes;
-        this.metrics = metrics != null ? metrics : new NullThreadCacheMetrics();
+        this.metrics = metrics;
     }
 
     public long puts() {
@@ -152,7 +148,7 @@ public class ThreadCache {
     public MemoryLRUCacheBytesIterator range(final String namespace, final byte[] from, final byte[] to) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
-            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace));
+            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
         }
         return new MemoryLRUCacheBytesIterator(cache.keyRange(cacheKey(from), cacheKey(to)), cache);
     }
@@ -160,7 +156,7 @@ public class ThreadCache {
     public MemoryLRUCacheBytesIterator all(final String namespace) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
-            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace));
+            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
         }
         return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
     }
@@ -327,17 +323,4 @@ public class ThreadCache {
         }
     }
 
-    public static class NullThreadCacheMetrics implements ThreadCacheMetrics {
-        @Override
-        public Sensor addCacheSensor(String entityName, String operationName, String... tags) {
-            return null;
-        }
-
-        @Override
-        public void recordCacheSensor(Sensor sensor, double value) {
-            // do nothing
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCacheMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCacheMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCacheMetrics.java
deleted file mode 100644
index cad2697..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCacheMetrics.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.metrics.Sensor;
-
-/**
- * The Kafka Streams metrics interface for adding metric sensors and collecting metric values.
- */
-public interface ThreadCacheMetrics {
-
-    /**
-     * Add the hit ratio sensor.
-     * @param entityName Name of the entity, could be the name of the cache instance, etc.
-     * @param operationName Name of the operation, could be "hit ratio".
-     * @param tags Additional tags of the sensor.
-     * @return The added sensor.
-     */
-    Sensor addCacheSensor(String entityName, String operationName, String... tags);
-
-    /**
-     * Record the given value of the sensor.
-     */
-    void recordCacheSensor(Sensor sensor, double value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 5804407..50efe5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -38,6 +42,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -142,6 +147,41 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    public void testNumberDefaultMetrics() {
+        final KafkaStreams streams = createKafkaStreams();
+        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
+        // all 15 default StreamThread metrics + 1 metric that keeps track of number of metrics
+        assertEquals(16, metrics.size());
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testIllegalMetricsConfig() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
+        final KStreamBuilder builder = new KStreamBuilder();
+        new KafkaStreams(builder, props); // expected to throw ConfigException
+    }
+
+    @Test
+    public void testLegalMetricsConfig() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
+        final KStreamBuilder builder1 = new KStreamBuilder();
+        final KafkaStreams streams1 = new KafkaStreams(builder1, props);
+        streams1.close();
+
+        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
+        final KStreamBuilder builder2 = new KStreamBuilder();
+        final KafkaStreams streams2 = new KafkaStreams(builder2, props);
+        streams2.close();
+    }
+
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
         final KafkaStreams streams = createKafkaStreams();

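The tests above exercise the new StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG knob end to end. A sketch of how an application opts into the granular metrics (the application id and bootstrap servers are placeholders):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // INFO (the default) leaves the per-store and per-cache DEBUG sensors disabled;
    // DEBUG enables them at the cost of the extra clock reads and recording.
    props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
    KafkaStreams streams = new KafkaStreams(new KStreamBuilder(), props);

Any other value fails fast with a ConfigException, which is what testIllegalMetricsConfig pins down.
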
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index ba95522..0ecaf3a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -26,6 +27,7 @@ import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
@@ -89,7 +91,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(new KStreamTestDriver(new KStreamBuilder(), stateDir), stateDir, Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(100000)) {
+        context = new MockProcessorContext(new KStreamTestDriver(new KStreamBuilder(), stateDir), stateDir,
+            Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()))) {
             @Override
             public <K, V> void forward(final K key, final V value) {
                 results.add(KeyValue.pair(key, value));

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 264be3b..7ba6161 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -129,6 +129,8 @@ public class SimpleBenchmark {
         benchmark.processStreamWithSink(SOURCE_TOPIC);
         // simple stream performance source->store
         benchmark.processStreamWithStateStore(SOURCE_TOPIC);
+        // simple stream performance source->cache->store
+        benchmark.processStreamWithCachedStateStore(SOURCE_TOPIC);
         // simple streams performance KSTREAM-KTABLE join
         benchmark.kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "kStreamKTable", JOIN_TOPIC_2_PREFIX + "kStreamKTable");
         // simple streams performance KSTREAM-KSTREAM join
@@ -302,11 +304,8 @@ public class SimpleBenchmark {
         }
     }
 
-    public void processStreamWithStateStore(String topic) {
-        CountDownLatch latch = new CountDownLatch(1);
-
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, zookeeper, latch);
-
+    private void internalProcessStreamWithStore(final KafkaStreams streams, final CountDownLatch latch,
+                                                final String message) {
         Thread thread = new Thread() {
             public void run() {
                 streams.start();
@@ -326,7 +325,7 @@ public class SimpleBenchmark {
 
         long endTime = System.currentTimeMillis();
 
-        System.out.println("Streams Performance [MB/sec read+store]: " + megaBytePerSec(endTime - startTime));
+        System.out.println(message + megaBytePerSec(endTime - startTime));
 
         streams.close();
         try {
@@ -335,6 +334,21 @@ public class SimpleBenchmark {
             // ignore
         }
     }
+
+    public void processStreamWithStateStore(String topic) {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, zookeeper, latch, false);
+        internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+store]: ");
+    }
+
+    public void processStreamWithCachedStateStore(String topic) {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, zookeeper, latch, true);
+
+        internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+cache+store]: ");
+    }
 
     /**
      * Produce values to a topic
@@ -552,9 +566,10 @@ public class SimpleBenchmark {
     }
 
     private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String kafka, String zookeeper,
-                                                          final CountDownLatch latch) {
+                                                          final CountDownLatch latch,
+                                                          boolean enableCaching) {
         Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store" + enableCaching);
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
@@ -562,8 +577,11 @@ public class SimpleBenchmark {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
-
+        if (enableCaching) {
+            builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().enableCaching().build());
+        } else {
+            builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
+        }
         KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
 
         source.process(new ProcessorSupplier<Integer, byte[]>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index b1c19ba..cf161f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -72,7 +73,7 @@ public class AbstractTaskTest {
                                 consumer,
                                 false,
                                 new StateDirectory("app", TestUtils.tempDirectory().getPath()),
-                                new ThreadCache(0)) {
+                                new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) {
             @Override
             public void commit() {
                 // do nothing

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
new file mode 100644
index 0000000..f6cbbf2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Collections;
+import org.apache.kafka.common.metrics.Metrics;
+
+public class MockStreamsMetrics extends StreamsMetricsImpl {
+
+    public MockStreamsMetrics(Metrics metrics) {
+        super(metrics, "mock-stream-metrics",
+            Collections.<String, String>emptyMap());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 3aafc00..b1d5f08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -17,12 +17,19 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
 
 public class ProcessorNodeTest {
 
@@ -62,4 +69,62 @@ public class ProcessorNodeTest {
         }
     }
 
+    private static class NoOpProcessor implements Processor {
+        @Override
+        public void init(final ProcessorContext context) {
+
+        }
+
+        @Override
+        public void process(final Object key, final Object value) {
+
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+    @Test
+    public void testMetrics() {
+        final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null));
+        final ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
+        node.init(context);
+
+        Metrics metrics = context.baseMetrics();
+        String name = "task." + context.taskId() + "." + node.name();
+        String[] entities = {"all", name};
+        String[] latencyOperations = {"process", "punctuate", "create", "destroy"};
+        String throughputOperation = "process-throughput";
+        String groupName = "stream-processor-node-metrics";
+        Map<String, String> tags = Collections.singletonMap("processor-node-id", node.name());
+
+        for (String operation : latencyOperations) {
+            assertNotNull(metrics.getSensor(operation));
+            assertNotNull(metrics.getSensor(name + "-" + operation));
+        }
+        assertNotNull(metrics.getSensor(throughputOperation));
+
+        for (String entity : entities) {
+            for (String operation : latencyOperations) {
+                assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-avg-latency", groupName,
+                    "The average latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
+                assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-max-latency", groupName,
+                    "The max latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
+                assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-qps", groupName,
+                    "The average number of occurrence of " + entity + " " + operation + " operation per second.", tags)));
+            }
+            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + throughputOperation + "-qps", groupName,
+                "The average number of occurrence of " + entity + " " + throughputOperation + " operation per second.", tags)));
+        }
+    }
+
 }
\ No newline at end of file

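Spelled out, the naming convention this test pins down: for a node named "name" running in task <taskId>, the registry holds sensors "process", "punctuate", "create", "destroy" (the shared "all" parents) plus their "task.<taskId>.name-" prefixed children, and, in group stream-processor-node-metrics, metrics of the form

    <entity>-<operation>-avg-latency
    <entity>-<operation>-max-latency
    <entity>-<operation>-qps

tagged with processor-node-id=name, where <entity> is either "all" or the per-task node name.
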
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 10a86fe..c5fecfa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -24,13 +24,12 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -321,17 +320,8 @@ public class StandbyTaskTest {
         builder.stream("topic").groupByKey().count("my-store");
         final ProcessorTopology topology = builder.build(0);
         StreamsConfig config = createConfig(baseDir);
-        new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, new StreamsMetrics() {
-            @Override
-            public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) {
-                return null;
-            }
-
-            @Override
-            public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
-
-            }
-        }, stateDirectory);
+        new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config,
+            new MockStreamsMetrics(new Metrics()), stateDirectory);
 
     }
     private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3d50007..adabde7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -23,14 +23,17 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -58,6 +61,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -131,7 +135,8 @@ public class StreamTaskTest {
     public void testProcessOrder() throws Exception {
         StreamsConfig config = createConfig(baseDir);
         recordCollector = new RecordCollectorImpl(producer, "taskId");
-        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, null, recordCollector);
+        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer,
+            restoreStateConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), recordCollector);
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -171,14 +176,41 @@ public class StreamTaskTest {
 
         task.close();
 
+    }
 
+    @Test
+    public void testMetrics() throws Exception {
+        StreamsConfig config = createConfig(baseDir);
+        recordCollector = new RecordCollectorImpl(producer, "taskId");
+        Metrics metrics = new Metrics();
+        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer,
+            restoreStateConsumer, config, new MockStreamsMetrics(metrics), stateDirectory, null, new MockTime(), recordCollector);
+        String name = task.id().toString();
+        String[] entities = {"all", name};
+        String operation = "commit";
+
+        String groupName = "stream-task-metrics";
+        Map<String, String> tags = Collections.singletonMap("streams-task-id", name);
+
+        assertNotNull(metrics.getSensor(operation));
+        assertNotNull(metrics.getSensor(name + "-" + operation));
+
+        for (String entity : entities) {
+            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-avg-latency", groupName,
+                "The average latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
+            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-max-latency", groupName,
+                "The max latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
+            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-qps", groupName,
+                "The average number of occurrence of " + entity + " " + operation + " operation per second.", tags)));
+        }
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testPauseResume() throws Exception {
         StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, null, recordCollector);
+        StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer,
+            restoreStateConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), recordCollector);
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -237,7 +269,8 @@ public class StreamTaskTest {
     @Test
     public void testMaybePunctuate() throws Exception {
         StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, null, recordCollector);
+        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer,
+            restoreStateConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), recordCollector);
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -310,13 +343,17 @@ public class StreamTaskTest {
         final List<ProcessorNode> processorNodes = Collections.<ProcessorNode>singletonList(processorNode);
         final Map<String, SourceNode> sourceNodes
                 = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode);
+        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         final ProcessorTopology topology = new ProcessorTopology(processorNodes,
                                                                  sourceNodes,
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore, ProcessorNode>emptyMap());
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0), recordCollector);
+
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
+            topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
+
         final int offset = 20;
         streamTask.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -364,7 +401,10 @@ public class StreamTaskTest {
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore, ProcessorNode>emptyMap());
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0), recordCollector);
+
+        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
+            topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
 
         try {
             streamTask.punctuate(punctuator, 1);
@@ -391,7 +431,9 @@ public class StreamTaskTest {
                 flushed.set(true);
             }
         };
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "appId", partitions, topology, consumer, restoreStateConsumer, createConfig(baseDir), null, stateDirectory, new ThreadCache(0), recordCollector);
+        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "appId", partitions, topology, consumer,
+            restoreStateConsumer, createConfig(baseDir), streamsMetrics, stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
         streamTask.flushState();
         assertTrue(flushed.get());
     }
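
The testMetrics case above pins down the per-task naming convention: a parent
sensor named after the operation records under the "all" scope, and a per-task
child sensor is prefixed with the task id. A small sketch of looking one of
these metrics back up, assuming the same Metrics registry that was handed to
MockStreamsMetrics (TaskMetricLookup is a hypothetical helper, not part of
this patch):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.metrics.Metrics;

    public final class TaskMetricLookup {
        // Resolves the per-task average commit latency asserted in testMetrics,
        // using the same group, description and tag key the test asserts on.
        public static Metric commitAvgLatency(final Metrics metrics, final String taskId) {
            final Map<String, String> tags = Collections.singletonMap("streams-task-id", taskId);
            return metrics.metrics().get(
                metrics.metricName(taskId + "-commit-avg-latency", "stream-task-metrics",
                    "The average latency in milliseconds of " + taskId + " commit operation.", tags));
        }
    }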

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d787ed5..454b4c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,19 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -27,7 +40,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -42,39 +54,34 @@ import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.junit.Before;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.junit.Assert.assertThat;
+
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
     private final String applicationId = "stream-thread-test";
-    private final UUID processId = UUID.randomUUID();
+    private UUID processId = UUID.randomUUID();
+
+    @Before
+    public void setUp() throws Exception {
+        processId = UUID.randomUUID();
+    }
 
     private TopicPartition t1p1 = new TopicPartition("topic1", 1);
     private TopicPartition t1p2 = new TopicPartition("topic1", 2);
@@ -84,22 +91,22 @@ public class StreamThreadTest {
     private TopicPartition t3p2 = new TopicPartition("topic3", 2);
 
     private List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
+        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
     );
 
     private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
             Collections.<String>emptySet());
 
     private final PartitionAssignor.Subscription subscription =
-            new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), subscriptionUserData());
+        new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), subscriptionUserData());
 
     private ByteBuffer subscriptionUserData() {
         UUID uuid = UUID.randomUUID();
@@ -149,18 +156,10 @@ public class StreamThreadTest {
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamsConfig config,
+                              StreamsMetrics metrics,
                               StateDirectory stateDirectory) {
-            super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, new StreamsMetrics() {
-                @Override
-                public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) {
-                    return null;
-                }
-
-                @Override
-                public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
-
-                }
-            }, stateDirectory, null, new RecordCollectorImpl(producer, id.toString()));
+            super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, metrics,
+                stateDirectory, null, new MockTime(), new RecordCollectorImpl(producer, id.toString()));
         }
 
         @Override
@@ -212,8 +211,10 @@ public class StreamThreadTest {
 
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
+
                 ProcessorTopology topology = builder.build(id.topicGroupId);
-                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config, stateDirectory);
+                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
+                    producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
             }
         };
 
@@ -424,6 +425,44 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void testMetrics() throws Exception {
+        TopologyBuilder builder = new TopologyBuilder().setApplicationId("MetricsApp");
+        StreamsConfig config = new StreamsConfig(configProps());
+        MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        Metrics metrics = new Metrics();
+        StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+            clientId,  processId, metrics, new MockTime(), new StreamsMetadataState(builder));
+        String defaultGroupName = "stream-metrics";
+        String defaultPrefix = "thread." + thread.threadClientId();
+        Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
+
+        assertNotNull(metrics.getSensor(defaultPrefix + ".commit-time"));
+        assertNotNull(metrics.getSensor(defaultPrefix + ".poll-time"));
+        assertNotNull(metrics.getSensor(defaultPrefix + ".process-time"));
+        assertNotNull(metrics.getSensor(defaultPrefix + ".punctuate-time"));
+        assertNotNull(metrics.getSensor(defaultPrefix + ".task-creation"));
+        assertNotNull(metrics.getSensor(defaultPrefix + ".task-destruction"));
+        assertNotNull(metrics.getSensor(defaultPrefix + ".skipped-records"));
+
+        assertNotNull(metrics.metrics().get(metrics.metricName("commit-time-avg", defaultGroupName, "The average commit time in ms", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("commit-time-max", defaultGroupName, "The maximum commit time in ms", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("commit-calls-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("poll-time-avg", defaultGroupName, "The average poll time in ms", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("poll-time-max", defaultGroupName, "The maximum poll time in ms", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("poll-calls-rate", defaultGroupName, "The average per-second number of record-poll calls", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("process-time-avg", defaultGroupName, "The average process time in ms", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("process-time-max", defaultGroupName, "The maximum process time in ms", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("process-calls-rate", defaultGroupName, "The average per-second number of process calls", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-time-avg", defaultGroupName, "The average punctuate time in ms", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-time-max", defaultGroupName, "The maximum punctuate time in ms", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-calls-rate", defaultGroupName, "The average per-second number of punctuate calls", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("task-creation-rate", defaultGroupName, "The average per-second number of newly created tasks", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("task-destruction-rate", defaultGroupName, "The average per-second number of destructed tasks", defaultTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-count", defaultGroupName, "The average per-second number of skipped records.", defaultTags)));
+    }
+
+    @Test
     public void testMaybeClean() throws Exception {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
@@ -460,7 +499,8 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config, stateDirectory);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
+                        producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 }
             };
 
@@ -587,7 +627,8 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config, stateDirectory);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
+                        producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 }
             };
 
@@ -810,7 +851,8 @@ public class StreamThreadTest {
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 final ProcessorTopology topology = builder.build(id.topicGroupId);
-                final TestStreamTask task = new TestStreamTask(id, "appId", partitions, topology, consumer, producer, restoreConsumer, config, stateDirectory);
+                final TestStreamTask task = new TestStreamTask(id, "appId", partitions, topology, consumer,
+                    producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 createdTasks.put(partitions, task);
                 return task;
             }
@@ -864,6 +906,7 @@ public class StreamThreadTest {
                                                                  clientSupplier.producer,
                                                                  clientSupplier.restoreConsumer,
                                                                  config,
+                                                                 new MockStreamsMetrics(new Metrics()),
                                                                  new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
             @Override
             public void close() {
@@ -914,6 +957,7 @@ public class StreamThreadTest {
                                                                  clientSupplier.producer,
                                                                  clientSupplier.restoreConsumer,
                                                                  config,
+                                                                 new MockStreamsMetrics(new Metrics()),
                                                                  new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
             @Override
             public void flushState() {
@@ -964,6 +1008,7 @@ public class StreamThreadTest {
                                                                  clientSupplier.producer,
                                                                  clientSupplier.restoreConsumer,
                                                                  config,
+                                                                 new MockStreamsMetrics(new Metrics()),
                                                                  new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
             @Override
             public void closeTopology() {
@@ -1013,6 +1058,7 @@ public class StreamThreadTest {
                                                                  clientSupplier.producer,
                                                                  clientSupplier.restoreConsumer,
                                                                  config,
+                                                                 new MockStreamsMetrics(new Metrics()),
                                                                  new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG))) {
             @Override
             public void flushState() {
@@ -1058,7 +1104,7 @@ public class StreamThreadTest {
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         Map<String, PartitionAssignor.Assignment> assignments =
-                partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));
+            partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));
 
         partitionAssignor.onAssignment(assignments.get("client"));
     }
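
StreamThreadTest#testMetrics above likewise fixes the thread-level convention:
sensors are registered as "thread.<threadClientId>.<name>" and their metrics
land in the "stream-metrics" group, tagged with the thread's client-id. A
minimal sketch under those assumptions, with threadClientId standing in for
the value of StreamThread's threadClientId() (ThreadMetricNames is a
hypothetical helper):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;

    public final class ThreadMetricNames {
        // Resolves the per-thread commit-time sensor by the naming scheme
        // asserted in testMetrics.
        public static Sensor commitTimeSensor(final Metrics metrics, final String threadClientId) {
            return metrics.getSensor("thread." + threadClientId + ".commit-time");
        }

        // The tag set under which the thread's metrics are registered.
        public static Map<String, String> threadTags(final String threadClientId) {
            return Collections.singletonMap("client-id", threadClientId);
        }
    }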

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
new file mode 100644
index 0000000..b7c0c2c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamsMetricsImplTest {
+
+    @Test(expected = NullPointerException.class)
+    public void testNullMetrics() throws Exception {
+        String groupName = "doesNotMatter";
+        Map<String, String> tags = new HashMap<>();
+        new StreamsMetricsImpl(null, groupName, tags);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testRemoveNullSensor() {
+        String groupName = "doesNotMatter";
+        Map<String, String> tags = new HashMap<>();
+        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
+        streamsMetrics.removeSensor(null);
+    }
+
+    @Test
+    public void testRemoveSensor() {
+        String groupName = "doesNotMatter";
+        String sensorName = "sensor1";
+        String scope = "scope";
+        String entity = "entity";
+        String operation = "put";
+        Map<String, String> tags = new HashMap<>();
+        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
+
+        Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG);
+        streamsMetrics.removeSensor(sensor1);
+
+        Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG, sensor1);
+        streamsMetrics.removeSensor(sensor1a);
+
+        Sensor sensor2 = streamsMetrics.addLatencySensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        streamsMetrics.removeSensor(sensor2);
+
+        Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        streamsMetrics.removeSensor(sensor3);
+    }
+
+    @Test
+    public void testLatencyMetrics() {
+        String groupName = "doesNotMatter";
+        String scope = "scope";
+        String entity = "entity";
+        String operation = "put";
+        Map<String, String> tags = new HashMap<>();
+        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
+
+        Sensor sensor1 = streamsMetrics.addLatencySensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+
+        Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
+        // 6 metrics, plus the common metric registered by the Metrics() constructor to track the total number of metrics
+        assertEquals(7, metrics.size());
+
+        streamsMetrics.removeSensor(sensor1);
+        metrics = streamsMetrics.metrics();
+        assertEquals(1, metrics.size());
+    }
+
+    @Test
+    public void testThroughputMetrics() {
+        String groupName = "doesNotMatter";
+        String scope = "scope";
+        String entity = "entity";
+        String operation = "put";
+        Map<String, String> tags = new HashMap<>();
+        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
+
+        Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+
+        Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
+        // 2 metrics, plus the common metric registered by the Metrics() constructor to track the total number of metrics
+        assertEquals(3, metrics.size());
+
+        streamsMetrics.removeSensor(sensor1);
+        metrics = streamsMetrics.metrics();
+        assertEquals(1, metrics.size());
+    }
+}
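
The counts asserted in the last two tests follow from the sensor layout:
each latency sensor contributes avg-latency, max-latency and qps on both the
entity-level sensor and its "all"-scoped parent, a throughput sensor
contributes only qps on both, and the Metrics registry always carries one
bookkeeping metric of its own. A sketch of that arithmetic, under those
assumptions:

    // Reproduces the sizes asserted in testLatencyMetrics (7) and
    // testThroughputMetrics (3).
    public final class MetricCountSketch {
        public static void main(final String[] args) {
            final int sensorsPerOperation = 2; // entity-level sensor plus "all" parent
            final int registryOverhead = 1;    // metric registered by the Metrics() constructor

            final int latencyStats = 3;        // avg-latency, max-latency, qps
            System.out.println(latencyStats * sensorsPerOperation + registryOverhead); // prints 7

            final int throughputStats = 1;     // qps
            System.out.println(throughputStats * sensorsPerOperation + registryOverhead); // prints 3
        }
    }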

