kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: HOTFIX: Fixes to metric names of Streams
Date Thu, 03 Aug 2017 21:16:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fb21209b5 -> 6bee1e9e5


HOTFIX: Fixes to metric names of Streams

A couple of fixes to the metric names to match the KIP:
- Removed extra strings from the metric names that are already present in the tags
- Added a separate metric for "all"

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3491 from enothereska/hotfix-metric-names
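
For illustration (client id and task id hypothetical), a thread-level MBean moves from

    kafka.streams:type=stream-metrics,thread.client-id=my-app-1

to

    kafka.streams:type=stream-metrics,client-id=my-app-1

and the redundant entity prefix is dropped from the metric names themselves, e.g. a task-scoped 0_1-commit-latency-avg becomes commit-latency-avg, with the task ID carried in a task-id tag instead.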


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6bee1e9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6bee1e9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6bee1e9e

Branch: refs/heads/trunk
Commit: 6bee1e9e5799398550e1897bcef6c5067452500b
Parents: fb21209
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Aug 3 14:16:47 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Aug 3 14:16:47 2017 -0700

----------------------------------------------------------------------
 docs/ops.html                                   | 321 +++++++++++++++++--
 .../processor/internals/ProcessorNode.java      |  31 +-
 .../streams/processor/internals/StreamTask.java |   3 +-
 .../processor/internals/StreamsMetricsImpl.java |  60 ++--
 .../state/internals/CachingKeyValueStore.java   |   2 +-
 .../state/internals/MeteredKeyValueStore.java   |  32 +-
 .../internals/MeteredSegmentedBytesStore.java   |  23 +-
 .../streams/state/internals/NamedCache.java     |  41 ++-
 .../streams/state/internals/ThreadCache.java    |  32 ++
 .../processor/internals/ProcessorNodeTest.java  |  45 ++-
 .../processor/internals/StreamTaskTest.java     |  29 +-
 ...gedSortedCacheKeyValueStoreIteratorTest.java |   3 +-
 ...rtedCacheWrappedWindowStoreIteratorTest.java |   2 +-
 .../streams/state/internals/NamedCacheTest.java |  32 +-
 .../state/internals/ThreadCacheTest.java        | 103 +++---
 15 files changed, 552 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/docs/ops.html
----------------------------------------------------------------------
diff --git a/docs/ops.html b/docs/ops.html
index 6c5f316..b34aba4 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1239,7 +1239,19 @@
 
   <h4><a id="kafka_streams_monitoring" href="#kafka_streams_monitoring">Streams Monitoring</a></h4>
 
-  A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to streams. By default Kafka Streams has metrics with two recording levels: debug and info. The debug level records all metrics, while the info level records only the thread-level metrics.  Use the following configuration option to specify which metrics you want collected:
+  A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to streams.
+  By default Kafka Streams has metrics with two recording levels: debug and info. The debug level records all metrics, while
+  the info level records only the thread-level metrics.
+
+  <p>
+    Note that the metrics have a 3-layer hierarchy. At the top level there are per-thread metrics. Each thread has tasks, with their
+    own metrics. Each task has a number of processor nodes, with their own metrics. Each task also has a number of state stores
+    and record caches, all with their own metrics.
+  </p>
+  
+  Use the following configuration option to specify which metrics
+  you want collected:
+
 <pre>metrics.recording.level="info"</pre>
 
 <h5><a id="kafka_streams_thread_monitoring" href="#kafka_streams_thread_monitoring">Thread Metrics</a></h5>
@@ -1251,30 +1263,80 @@ All the following metrics have a recording level of ``info``:
         <th>Description</th>
         <th>Mbean name</th>
       </tr>
-      <tr>
-        <td>[commit | poll | process | punctuate]-latency-[avg | max]</td>
-        <td>The [average | maximum] execution time in ms, for the respective operation, across all running tasks of this thread.</td>
-        <td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>[commit | poll | process | punctuate]-rate</td>
-        <td>The average number of respective operations per second across all tasks.</td>
-        <td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
+       <tr>
+        <td>commit-latency-avg</td>
+        <td>The average execution time in ms for committing, across all running tasks of this thread.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+        <tr>
+        <td>commit-latency-max</td>
+        <td>The maximum execution time in ms for committing across all running tasks of this thread.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+       <tr>
+        <td>poll-latency-avg</td>
+        <td>The average execution time in ms for polling, across all running tasks of this thread.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+        <tr>
+        <td>poll-latency-max</td>
+        <td>The maximum execution time in ms for polling across all running tasks of this thread.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+       <tr>
+        <td>process-latency-avg</td>
+        <td>The average execution time in ms for processing, across all running tasks of this thread.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+       <tr>
+        <td>process-latency-max</td>
+        <td>The maximum execution time in ms for processing across all running tasks of this thread.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+       <tr>
+        <td>punctuate-latency-avg</td>
+        <td>The average execution time in ms for punctuating, across all running tasks of this thread.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+       <tr>
+        <td>punctuate-latency-max</td>
+        <td>The maximum execution time in ms for punctuating across all running tasks of this thread.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+       <tr>
+        <td>commit-rate</td>
+        <td>The average number of commits per second across all tasks.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+       <tr>
+        <td>poll-rate</td>
+        <td>The average number of polls per second across all tasks.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+       <tr>
+        <td>process-rate</td>
+        <td>The average number of process calls per second across all tasks.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
+       </tr>
+       <tr>
+        <td>punctuate-rate</td>
+        <td>The average number of punctuates per second across all tasks.</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
       </tr>
       <tr>
         <td>task-created-rate</td>
         <td>The average number of newly created tasks per second.</td>
-        <td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
       </tr>
       <tr>
         <td>task-closed-rate</td>
         <td>The average number of tasks closed per second.</td>
-        <td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
       </tr>
       <tr>
         <td>skipped-records-rate</td>
         <td>The average number of skipped records per second. </td>
-        <td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
+        <td>kafka.streams:type=stream-metrics,client-id=([-.\w]+)</td>
       </tr>
  </tbody>
 </table>
@@ -1289,14 +1351,19 @@ All the following metrics have a recording level of ``debug``:
         <th>Mbean name</th>
       </tr>
       <tr>
-        <td>commit-latency-[avg | max]</td>
-        <td>The [average | maximum] commit time in ns for this task. </td>
-        <td>kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)</td>
+        <td>commit-latency-avg</td>
+        <td>The average commit time in ns for this task. </td>
+        <td>kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>commit-latency-max</td>
+        <td>The maximum commit time in ns for this task. </td>
+        <td>kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)</td>
       </tr>
       <tr>
         <td>commit-rate</td>
         <td>The average number of commit calls per second. </td>
-        <td>kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)</td>
+        <td>kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)</td>
       </tr>
  </tbody>
 </table>
@@ -1311,19 +1378,69 @@ All the following metrics have a recording level of ``debug``:
         <th>Mbean name</th>
       </tr>
       <tr>
-        <td>[process | punctuate | create | destroy]-latency-[avg | max]</td>
-        <td>The [average | maximum] execution time in ns, for the respective operation. </td>
-        <td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
+        <td>process-latency-avg</td>
+        <td>The average process execution time in ns. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>process-latency-max</td>
+        <td>The maximum process execution time in ns. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>punctuate-latency-avg</td>
+        <td>The average punctuate execution time in ns. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>punctuate-latency-max</td>
+        <td>The maximum punctuate execution time in ns. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
       </tr>
       <tr>
-        <td>[process | punctuate | create | destroy]-rate</td>
-        <td>The average number of respective operations per second. </td>
-        <td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
+        <td>create-latency-avg</td>
+        <td>The average create execution time in ns. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>create-latency-max</td>
+        <td>The maximum create execution time in ns. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>destroy-latency-avg</td>
+        <td>The average destroy execution time in ns. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>destroy-latency-max</td>
+        <td>The maximum destroy execution time in ns. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>process-rate</td>
+        <td>The average number of process operations per second. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>punctuate-rate</td>
+        <td>The average number of punctuate operations per second. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>create-rate</td>
+        <td>The average number of create operations per second. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>destroy-rate</td>
+        <td>The average number of destroy operations per second. </td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
       </tr>
       <tr>
         <td>forward-rate</td>
         <td>The average rate of records being forwarded downstream, from source nodes only, per second. </td>
-        <td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)</td>
       </tr>
  </tbody>
  </table>
@@ -1339,16 +1456,140 @@ All the following metrics have a recording level of ``debug``:
         <th>Mbean name</th>
       </tr>
       <tr>
-        <td>[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-latency-[avg | max]</td>
-        <td>The average execution time in ns, for the respective operation. </td>
-        <td>kafka.streams:type=stream-[store-type]-metrics</td>
+        <td>put-latency-avg</td>
+        <td>The average put execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
       </tr>
-        <tr>
-        <td>[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-rate</td>
-        <td>The average rate of respective operations per second for this store.</td>
-        <td>kafka.streams:type=stream-[store-type]-metrics</td>
+      <tr>
+        <td>put-latency-max</td>
+        <td>The maximum put execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>put-if-absent-latency-avg</td>
+        <td>The average put-if-absent execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>put-if-absent-latency-max</td>
+        <td>The maximum put-if-absent execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>get-latency-avg</td>
+        <td>The average get execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>get-latency-max</td>
+        <td>The maximum get execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>delete-latency-avg</td>
+        <td>The average delete execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>delete-latency-max</td>
+        <td>The maximum delete execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>put-all-latency-avg</td>
+        <td>The average put-all execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>put-all-latency-max</td>
+        <td>The maximum put-all execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>all-latency-avg</td>
+        <td>The average all operation execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>all-latency-max</td>
+        <td>The maximum all operation execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>range-latency-avg</td>
+        <td>The average range execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>range-latency-max</td>
+        <td>The maximum range execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+       <tr>
+        <td>flush-latency-avg</td>
+        <td>The average flush execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>flush-latency-max</td>
+        <td>The maximum flush execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>restore-latency-avg</td>
+        <td>The average restore execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>restore-latency-max</td>
+        <td>The maximum restore execution time in ns. </td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>put-rate</td>
+        <td>The average put rate for this store.</td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>put-if-absent-rate</td>
+        <td>The average put-if-absent rate for this store.</td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>get-rate</td>
+        <td>The average get rate for this store.</td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>delete-rate</td>
+        <td>The average delete rate for this store.</td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>put-all-rate</td>
+        <td>The average put-all rate for this store.</td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>all-rate</td>
+        <td>The average all operation rate for this store.</td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>range-rate</td>
+        <td>The average range rate for this store.</td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>flush-rate</td>
+        <td>The average flush rate for this store.</td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>restore-rate</td>
+        <td>The average restore rate for this store.</td>
+        <td>kafka.streams:type=stream-[store-type]-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-type]-state-id=([-.\w]+)</td>
       </tr>
-      
     </tbody>
  </table>
 
@@ -1363,9 +1604,19 @@ All the following metrics have a recording level of ``debug``:
         <th>Mbean name</th>
       </tr>
       <tr>
-        <td>hitRatio-[avg | min | max]</td>
-        <td>The cache hit ratio defined as the ratio of cache read hits over the total cache read requests. </td>
-        <td>kafka.streams:type=stream-record-cache-metrics, record-cache-id=([-.\w]+)</td>
+        <td>hitRatio-avg</td>
+        <td>The average cache hit ratio defined as the ratio of cache read hits over the total cache read requests. </td>
+        <td>kafka.streams:type=stream-record-cache-metrics,client-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>hitRatio-min</td>
+        <td>The minimum cache hit ratio. </td>
+        <td>kafka.streams:type=stream-record-cache-metrics,client-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>hitRatio-max</td>
+        <td>The maximum cache hit ratio. </td>
+        <td>kafka.streams:type=stream-record-cache-metrics,client-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)</td>
       </tr>
     </tbody>
  </table>
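
The MBean names in the tables above can be read back over standard JMX. The following is a minimal sketch, not part of this commit, assuming a Streams application runs in the same JVM with the default JMX reporter enabled; the class and variable names are illustrative:

    import java.lang.management.ManagementFactory;
    import java.util.Set;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class StreamsMetricsProbe {
        public static void main(final String[] args) throws Exception {
            final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // Match every thread-level Streams metrics MBean registered in this JVM.
            final Set<ObjectName> names = server.queryNames(
                    new ObjectName("kafka.streams:type=stream-metrics,client-id=*"), null);
            for (final ObjectName name : names) {
                // commit-latency-avg is one of the info-level attributes documented above.
                System.out.println(name + ": commit-latency-avg="
                        + server.getAttribute(name, "commit-latency-avg"));
            }
        }
    }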

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 0cc746e..29f442f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -26,9 +26,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.Punctuator;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public class ProcessorNode<K, V> {
@@ -102,7 +100,7 @@ public class ProcessorNode<K, V> {
     public void init(ProcessorContext context) {
         this.context = context;
         try {
-            nodeMetrics = new NodeMetrics(context.metrics(), name, "task." + context.taskId());
+            nodeMetrics = new NodeMetrics(context.metrics(), name, context);
             nodeMetrics.metrics.measureLatencyNs(time, initDelegate, nodeMetrics.nodeCreationSensor);
         } catch (Exception e) {
             throw new StreamsException(String.format("failed to initialize processor %s", name), e);
@@ -163,7 +161,6 @@ public class ProcessorNode<K, V> {
 
     protected static final class NodeMetrics  {
         final StreamsMetricsImpl metrics;
-        final Map<String, String> metricTags;
 
         final Sensor nodeProcessTimeSensor;
         final Sensor nodePunctuateTimeSensor;
@@ -173,21 +170,25 @@ public class ProcessorNode<K, V> {
         final Sensor nodeDestructionSensor;
 
 
-        public NodeMetrics(StreamsMetrics metrics, String name, String sensorNamePrefix) {
+        public NodeMetrics(final StreamsMetrics metrics, final String name, final ProcessorContext context) {
             final String scope = "processor-node";
-            final String tagKey = "processor-node-id";
-            final String tagValue = name;
+            final String tagKey = "task-id";
+            final String tagValue = context.taskId().toString();
             this.metrics = (StreamsMetricsImpl) metrics;
-            this.metricTags = new LinkedHashMap<>();
-            this.metricTags.put(tagKey, tagValue);
 
             // these are all latency metrics
-            this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, name, "process",
+                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, name, "punctuate",
+                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, name, "create",
+                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, name, "destroy",
+                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, name, "forward",
+                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, name, "skippedDueToDeserializationError",
+                    Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
         }
 
         public void removeAllSensors() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e78a46e..d4bd668 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -80,7 +80,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         TaskMetrics(final StreamsMetrics metrics) {
             final String name = id().toString();
             this.metrics = (StreamsMetricsImpl) metrics;
-            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG, "streams-task-id", name);
+            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit",
+                    Sensor.RecordingLevel.DEBUG);
         }
 
         void removeAllSensors() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index ec35c86..ea85b74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -34,6 +34,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 public class StreamsMetricsImpl implements StreamsMetrics {
     private static final Logger log = LoggerFactory.getLogger(StreamsMetricsImpl.class);
@@ -94,34 +97,43 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
-    private Map<String, String> tagMap(String... tags) {
+    public Map<String, String> tagMap(String... tags) {
         // extract the additional tags if there are any
         Map<String, String> tagMap = new HashMap<>(this.tags);
-        if ((tags.length % 2) != 0)
-            throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
-
-        for (int i = 0; i < tags.length; i += 2)
-            tagMap.put(tags[i], tags[i + 1]);
+        if (tags != null) {
+            if ((tags.length % 2) != 0)
+                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
 
+            for (int i = 0; i < tags.length; i += 2)
+                tagMap.put(tags[i], tags[i + 1]);
+        }
         return tagMap;
     }
 
 
+    private Map<String, String> constructTags(final String scopeName, final String entityName, final String... tags) {
+        List<String> updatedTagList = new ArrayList<>(Arrays.asList(tags));
+        updatedTagList.add(scopeName + "-id");
+        updatedTagList.add(entityName);
+        return tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
+    }
 
     /**
      * @throws IllegalArgumentException if tags is not constructed in key-value pairs
      */
     @Override
-    public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
-        Map<String, String> tagMap = tagMap(tags);
+    public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName,
+                                                Sensor.RecordingLevel recordingLevel, String... tags) {
+        final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
+        final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);
 
         // first add the global operation metrics if not yet, with the global tags only
         Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addLatencyMetrics(scopeName, parent, "all", operationName, tagMap);
+        addLatencyMetrics(scopeName, parent, operationName, allTagMap);
 
         // add the operation metrics with additional tags
         Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
-        addLatencyMetrics(scopeName, sensor, entityName, operationName, tagMap);
+        addLatencyMetrics(scopeName, sensor, operationName, tagMap);
 
         parentSensors.put(sensor, parent);
 
@@ -133,35 +145,37 @@ public class StreamsMetricsImpl implements StreamsMetrics {
      */
     @Override
     public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
-        Map<String, String> tagMap = tagMap(tags);
+        final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
+        final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);
 
         // first add the global operation metrics if not yet, with the global tags only
         Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addThroughputMetrics(scopeName, parent, "all", operationName, tagMap);
+        addThroughputMetrics(scopeName, parent, operationName, allTagMap);
 
         // add the operation metrics with additional tags
         Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
-        addThroughputMetrics(scopeName, sensor, entityName, operationName, tagMap);
+        addThroughputMetrics(scopeName, sensor, operationName, tagMap);
 
         parentSensors.put(sensor, parent);
 
         return sensor;
     }
 
-    private void addLatencyMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-        maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-latency-avg", groupNameFromScope(scopeName),
-            "The average latency of " + entityName + " " + opName + " operation.", tags), new Avg());
-        maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-latency-max", groupNameFromScope(scopeName),
-            "The max latency of " + entityName + " " + opName + " operation.", tags), new Max());
-        addThroughputMetrics(scopeName, sensor, entityName, opName, tags);
+    private void addLatencyMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
+
+        maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
+            "The average latency of " + opName + " operation.", tags), new Avg());
+        maybeAddMetric(sensor, metrics.metricName(opName + "-latency-max", groupNameFromScope(scopeName),
+            "The max latency of " + opName + " operation.", tags), new Max());
+        addThroughputMetrics(scopeName, sensor, opName, tags);
     }
 
-    private void addThroughputMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-        maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-rate", groupNameFromScope(scopeName),
-            "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
+    private void addThroughputMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
+        maybeAddMetric(sensor, metrics.metricName(opName + "-rate", groupNameFromScope(scopeName),
+            "The average number of occurrence of " + opName + " operation per second.", tags), new Rate(new Count()));
     }
 
-    private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
+    public void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
         if (!metrics.metrics().containsKey(name)) {
             sensor.add(name, stat);
         } else {
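
For code that registers its own sensors through the public StreamsMetrics interface, the net effect of this refactoring is that the operation name alone forms the metric name, while the scope-derived ID, the entity name, and any extra key-value pairs are carried as tags, with an "all" roll-up added automatically. A hedged sketch of a custom processor using that interface; the scope, entity, and operation names are purely illustrative:

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.StreamsMetrics;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class LookupProcessor implements Processor<String, String> {
        private Sensor lookupSensor;

        @Override
        public void init(final ProcessorContext context) {
            final StreamsMetrics metrics = context.metrics();
            // Creates lookup-latency-avg/-max and lookup-rate, tagged with
            // my-scope-id=my-entity and task-id, plus a parallel "all" roll-up.
            lookupSensor = metrics.addLatencyAndThroughputSensor(
                    "my-scope", "my-entity", "lookup",
                    Sensor.RecordingLevel.DEBUG,
                    "task-id", context.taskId().toString());
        }

        @Override
        public void process(final String key, final String value) {
            final long startNs = System.nanoTime();
            // ... application logic would go here ...
            lookupSensor.record(System.nanoTime() - startNs);
        }

        @Override
        public void punctuate(final long timestamp) { }

        @Override
        public void close() { }
    }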

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index b7d41b4..e147ea8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -69,8 +69,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
                                         keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                                         valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
-        this.cacheName = context.taskId() + "-" + underlying.name();
         this.cache = this.context.getCache();
+        this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());
         cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index fd80b1a..95cbedb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -63,19 +63,31 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
 
     @Override
     public void init(ProcessorContext context, StateStore root) {
-        final String name = name();
+        final String tagKey = "task-id";
+        final String tagValue = context.taskId().toString();
+
         this.context = context;
         this.root = root;
+
         this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG);
-        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put-if-absent", Sensor.RecordingLevel.DEBUG);
-        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG);
-        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "delete", Sensor.RecordingLevel.DEBUG);
-        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put-all", Sensor.RecordingLevel.DEBUG);
-        this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "all", Sensor.RecordingLevel.DEBUG);
-        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "range", Sensor.RecordingLevel.DEBUG);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG);
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
+        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put-if-absent",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "get",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "delete",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put-all",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "all",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "range",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
 
         // register and possibly restore the state from the logs
         if (restoreTime.shouldRecord()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
index 664873a..4058364 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
@@ -50,15 +50,22 @@ class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractStateStore im
 
     @Override
     public void init(ProcessorContext context, StateStore root) {
-        final String name = name();
+        final String tagKey = "task-id";
+        final String tagValue = context.taskId().toString();
         this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG);
-        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "fetch", Sensor.RecordingLevel.DEBUG);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG);
-        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG);
-        this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "remove", Sensor.RecordingLevel.DEBUG);
-
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
+        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "fetch",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "get",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "remove",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+
+        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
         // register and possibly restore the state from the logs
         final long startNs = time.nanoseconds();
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index c86d9df..47d5152 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -358,28 +357,38 @@ class NamedCache {
         final StreamsMetricsImpl metrics;
         final String groupName;
         final Map<String, String> metricTags;
+        final Map<String, String> allMetricTags;
         final Sensor hitRatioSensor;
 
-
         public NamedCacheMetrics(StreamsMetrics metrics) {
             final String scope = "record-cache";
-            final String entityName = name;
             final String opName = "hitRatio";
-            final String tagKey = "record-cache-id";
-            final String tagValue = name;
+            final String tagKey = scope + "-id";
+            final String tagValue = ThreadCache.underlyingStoreNamefromCacheName(name);
             this.groupName = "stream-" + scope + "-metrics";
             this.metrics = (StreamsMetricsImpl) metrics;
-            this.metricTags = new LinkedHashMap<>();
-            this.metricTags.put(tagKey, tagValue);
-
-            hitRatioSensor = this.metrics.registry().sensor(entityName + "-" + opName, Sensor.RecordingLevel.DEBUG);
-
-            hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-avg", groupName,
-                "The average cache hit ratio of " + entityName, metricTags), new Avg());
-            hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-min", groupName,
-                "The minimum cache hit ratio of " + entityName, metricTags), new Min());
-            hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-max", groupName,
-                "The maximum cache hit ratio of " + entityName, metricTags), new Max());
+            this.allMetricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, "all",
+                    "task-id", ThreadCache.taskIDfromCacheName(name));
+            this.metricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, tagValue,
+                    "task-id", ThreadCache.taskIDfromCacheName(name));
+
+            // add parent
+            Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-avg", groupName,
+                    "The average cache hit ratio.", allMetricTags), new Avg());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-min", groupName,
+                    "The minimum cache hit ratio.", allMetricTags), new Min());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-max", groupName,
+                    "The maximum cache hit ratio.", allMetricTags), new Max());
+
+            // add child
+            hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent);
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-avg", groupName,
+                "The average cache hit ratio.", metricTags), new Avg());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-min", groupName,
+                "The minimum cache hit ratio.", metricTags), new Min());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-max", groupName,
+                "The maximum cache hit ratio.", metricTags), new Max());
 
         }
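
The parent/child wiring above relies on the metrics library's sensor hierarchy: a value recorded on a child sensor is also recorded on all of its parents, which is what keeps the per-cache hit ratios and the "all" roll-up in step. A self-contained sketch of just that mechanism (group, sensor, and tag values hypothetical):

    import java.util.Collections;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;

    public class ParentChildSensorDemo {
        public static void main(final String[] args) {
            final Metrics metrics = new Metrics();
            // The parent carries the "all" tag; children carry their own cache ID.
            final Sensor parent = metrics.sensor("hitRatio");
            parent.add(metrics.metricName("hitRatio-avg", "demo-group",
                    Collections.singletonMap("record-cache-id", "all")), new Avg());
            final Sensor child = metrics.sensor("hitRatio-store1", parent);
            child.add(metrics.metricName("hitRatio-avg", "demo-group",
                    Collections.singletonMap("record-cache-id", "store1")), new Avg());
            child.record(0.8); // recorded on both the child and the parent
            metrics.close();
        }
    }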
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 31fb3bb..1220c02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -77,6 +77,38 @@ public class ThreadCache {
     }
 
     /**
+     * The thread cache maintains a set of {@link NamedCache}s whose names are a concatenation of the task ID and the
+     * underlying store name. This method creates those names.
+     * @param taskIDString Task ID
+     * @param underlyingStoreName Underlying store name
+     * @return the cache name, a concatenation of the task ID and store name
+     */
+    public static String nameSpaceFromTaskIdAndStore(final String taskIDString, final String underlyingStoreName) {
+        return taskIDString + "-" + underlyingStoreName;
+    }
+
+    /**
+     * Given a cache name of the form taskid-storename, return the task ID.
+     * @param cacheName Cache name of the form taskid-storename
+     * @return the task ID
+     */
+    public static String taskIDfromCacheName(final String cacheName) {
+        String[] tokens = cacheName.split("-");
+        return tokens[0];
+    }
+
+    /**
+     * Given a cache name of the form taskid-storename, return the store name.
+     * @param cacheName Cache name of the form taskid-storename
+     * @return the underlying store name
+     */
+    public static String underlyingStoreNamefromCacheName(final String cacheName) {
+        String[] tokens = cacheName.split("-");
+        return tokens[1];
+    }
+
+
+    /**
      * Add a listener that is called each time an entry is evicted from the cache or an explicit flush is called
      *
      * @param namespace
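
A usage sketch of the three naming helpers above (task ID and store name hypothetical). Note that the split("-") parsing keeps only the first two tokens, so it assumes the store name itself contains no hyphens:

    import org.apache.kafka.streams.state.internals.ThreadCache;

    public class CacheNameDemo {
        public static void main(final String[] args) {
            final String cacheName = ThreadCache.nameSpaceFromTaskIdAndStore("0_1", "mystore");
            System.out.println(cacheName);                                          // 0_1-mystore
            System.out.println(ThreadCache.taskIDfromCacheName(cacheName));         // 0_1
            System.out.println(ThreadCache.underlyingStoreNamefromCacheName(cacheName)); // mystore
        }
    }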

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 1db4cef..ab29c5c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.LinkedHashMap;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -90,6 +91,17 @@ public class ProcessorNodeTest {
         }
     }
 
+    private void testSpecificMetrics(final Metrics metrics, final String groupName,
+                                     final String opName,
+                                     final Map<String, String> metricTags) {
+        assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-latency-avg", groupName,
+                "The average latency of " + opName + " operation.", metricTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-latency-max", groupName,
+                "The max latency of " + opName + " operation.", metricTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-rate", groupName,
+                "The average number of occurrence of " + opName + " operation per second.", metricTags)));
+
+    }
     @Test
     public void testMetrics() {
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
@@ -99,31 +111,34 @@ public class ProcessorNodeTest {
         node.init(context);
 
         Metrics metrics = context.baseMetrics();
-        String name = "task." + context.taskId() + "." + node.name();
-        String[] entities = {"all", name};
+        String name = "task." + context.taskId();
         String[] latencyOperations = {"process", "punctuate", "create", "destroy"};
         String throughputOperation =  "forward";
         String groupName = "stream-processor-node-metrics";
-        Map<String, String> tags = Collections.singletonMap("processor-node-id", node.name());
+        final Map<String, String> metricTags = new LinkedHashMap<>();
+        metricTags.put("processor-node-id", node.name());
+        metricTags.put("task-id", context.taskId().toString());
+
 
         for (String operation : latencyOperations) {
             assertNotNull(metrics.getSensor(operation));
-            assertNotNull(metrics.getSensor(name + "-" + operation));
         }
         assertNotNull(metrics.getSensor(throughputOperation));
 
-        for (String entity : entities) {
-            for (String operation : latencyOperations) {
-                assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-latency-avg", groupName,
-                    "The average latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
-                assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-latency-max", groupName,
-                    "The max latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
-                assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-rate", groupName,
-                    "The average number of occurrence of " + entity + " " + operation + " operation per second.", tags)));
-            }
-            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + throughputOperation + "-rate", groupName,
-                "The average number of occurrence of " + entity + " " + throughputOperation + " operation per second.", tags)));
+        for (String opName : latencyOperations) {
+            testSpecificMetrics(metrics, groupName, opName, metricTags);
         }
+        assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
+                "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
+
+        // test "all"
+        metricTags.put("processor-node-id", "all");
+        for (String opName : latencyOperations) {
+            testSpecificMetrics(metrics, groupName, opName, metricTags);
+        }
+        assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
+                "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
+
 
         context.close();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index e25da88..f3222d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -64,6 +64,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -219,26 +220,30 @@ public class StreamTaskTest {
         assertEquals(3, source2.numReceived);
     }
 
+
+    private void testSpecificMetrics(final String operation, final String groupName, final Map<String, String> tags) {
+        assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-latency-avg", groupName,
+                "The average latency of " + operation + " operation.", tags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-latency-max", groupName,
+                "The max latency of " + operation + " operation.", tags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-rate", groupName,
+                "The average number of occurrence of " + operation + " operation per second.", tags)));
+    }
+
+
     @Test
     public void testMetrics() throws Exception {
         final String name = task.id().toString();
-        final String[] entities = {"all", name};
+        final Map<String, String> metricTags = new LinkedHashMap<>();
+        metricTags.put("task-id", name);
         final String operation = "commit";
 
         final String groupName = "stream-task-metrics";
-        final Map<String, String> tags = Collections.singletonMap("streams-task-id", name);
 
         assertNotNull(metrics.getSensor(operation));
-        assertNotNull(metrics.getSensor(name + "-" + operation));
-
-        for (final String entity : entities) {
-            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-latency-avg", groupName,
-                "The average latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
-            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-latency-max", groupName,
-                "The max latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
-            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-rate", groupName,
-                "The average number of occurrence of " + entity + " " + operation + " operation per second.", tags)));
-        }
+        testSpecificMetrics(operation, groupName, metricTags);
+        metricTags.put("task-id", "all");
+        testSpecificMetrics(operation, groupName, metricTags);
     }
 
     @SuppressWarnings("unchecked")

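On the consuming side, the rename means lookups go by operation name plus tags rather than by an entity-prefixed name. A hedged sketch of fetching the task-level commit latency; the helper class and method are hypothetical, while "commit", "stream-task-metrics", and the "task-id" tag key come from the test above:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.Metrics;

    public final class TaskMetricLookup {
        private TaskMetricLookup() { }

        // Hypothetical helper: returns the registered metric for one task,
        // or for the rollup when taskId is "all"; null if absent.
        public static KafkaMetric commitLatencyAvg(final Metrics metrics, final String taskId) {
            final Map<String, String> tags = new LinkedHashMap<>();
            tags.put("task-id", taskId);
            final MetricName name = metrics.metricName("commit-latency-avg", "stream-task-metrics",
                    "The average latency of commit operation.", tags);
            return metrics.metrics().get(name);
        }
    }
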
http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index 89a4d63..5181c8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -31,7 +31,7 @@ import static org.junit.Assert.assertFalse;
 
 public class MergedSortedCacheKeyValueStoreIteratorTest {
 
-    private final String namespace = "one";
+    private final String namespace = "0.0-one";
     private final StateSerdes<byte[], byte[]> serdes =  new StateSerdes<>("dummy", Serdes.ByteArray(), Serdes.ByteArray());
     private KeyValueStore<Bytes, byte[]> store;
     private ThreadCache cache;
@@ -148,7 +148,6 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
         final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray());
         final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
         byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
-        final String namespace = "one";
         for (int i = 0; i < bytes.length - 1; i += 2) {
             kv.put(Bytes.wrap(bytes[i]), bytes[i]);
             cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));

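The namespace rename from "one" to "0.0-one" reflects the convention these tests now assume: a cache namespace is the task id joined to the underlying store name, presumably so the task id can be recovered for metric tags. A trivial sketch of the composition; the names are illustrative:

    final class CacheNamespaces {
        private CacheNamespaces() { }

        // "0.0" (task id) + "-" + "one" (store name) -> "0.0-one"
        static String namespace(final String taskId, final String storeName) {
            return taskId + "-" + storeName;
        }
    }
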
http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index 2088fbe..dd0d8a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -45,7 +45,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
 
     private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>();
     private final ThreadCache cache = new ThreadCache("testCache", 1000000L,  new MockStreamsMetrics(new Metrics()));
-    private final String namespace = "one";
+    private final String namespace = "0.0-one";
     private final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String());
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 59297b5..7854191 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -42,11 +42,12 @@ public class NamedCacheTest {
 
     private NamedCache cache;
     private MockStreamsMetrics streamMetrics;
-
+    private final String taskIDString = "0.0";
+    private final String underlyingStoreName = "storeName";
     @Before
     public void setUp() throws Exception {
         streamMetrics = new MockStreamsMetrics(new Metrics());
-        cache = new NamedCache("name", streamMetrics);
+        cache = new NamedCache(taskIDString + "-" + underlyingStoreName, streamMetrics);
     }
 
     @Test
@@ -72,28 +73,33 @@ public class NamedCacheTest {
         }
     }
 
+    private void testSpecificMetrics(final String groupName, final String entityName, final String opName,
+                                     final Map<String, String> metricTags) {
+        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-avg",
+                groupName, "The average cache hit ratio of " + entityName, metricTags)));
+        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-min",
+                groupName, "The minimum cache hit ratio of " + entityName, metricTags)));
+        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-max",
+                groupName, "The maximum cache hit ratio of " + entityName, metricTags)));
+    }
     @Test
     public void testMetrics() throws Exception {
         final String scope = "record-cache";
         final String entityName = cache.name();
         final String opName = "hitRatio";
         final String tagKey = "record-cache-id";
-        final String tagValue = cache.name();
+        final String tagValue = underlyingStoreName;
         final String groupName = "stream-" + scope + "-metrics";
         final Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put(tagKey, tagValue);
+        metricTags.put("task-id", taskIDString);
 
-        assertNotNull(streamMetrics.registry().getSensor(entityName + "-" + opName));
-        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName +
-            "-" + opName + "-avg", groupName, "The current count of " + entityName + " " + opName +
-            " operation.", metricTags)));
-        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName +
-            "-" + opName + "-min", groupName, "The current count of " + entityName + " " + opName +
-            " operation.", metricTags)));
-        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName +
-            "-" + opName + "-max", groupName, "The current count of " + entityName + " " + opName +
-            " operation.", metricTags)));
+        assertNotNull(streamMetrics.registry().getSensor(opName));
+        testSpecificMetrics(groupName, entityName, opName, metricTags);
 
+        // test "all"
+        metricTags.put(tagKey, "all");
+        testSpecificMetrics(groupName, entityName, opName, metricTags);
     }
 
     @Test

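NamedCacheTest now expects a single plain "hitRatio" sensor whose avg/min/max metrics carry both a "record-cache-id" tag and a "task-id" tag, again with an "all" variant for the rollup. A minimal registration sketch, assuming the group name from the test above; the descriptions paraphrase the asserted ones:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;
    import org.apache.kafka.common.metrics.stats.Min;

    public class HitRatioSketch {
        public static void main(String[] args) {
            final Metrics metrics = new Metrics();
            final String group = "stream-record-cache-metrics";

            final Map<String, String> tags = new LinkedHashMap<>();
            tags.put("record-cache-id", "storeName"); // underlying store name, or "all"
            tags.put("task-id", "0.0");

            final Sensor hitRatio = metrics.sensor("hitRatio");
            hitRatio.add(metrics.metricName("hitRatio-avg", group,
                    "The average cache hit ratio of 0.0-storeName", tags), new Avg());
            hitRatio.add(metrics.metricName("hitRatio-min", group,
                    "The minimum cache hit ratio of 0.0-storeName", tags), new Min());
            hitRatio.add(metrics.metricName("hitRatio-max", group,
                    "The maximum cache hit ratio of 0.0-storeName", tags), new Max());

            hitRatio.record(0.9); // record one hit-ratio observation
            metrics.close();
        }
    }
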
http://git-wip-us.apache.org/repos/asf/kafka/blob/6bee1e9e/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 43a03f1..2c08871 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -37,6 +37,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class ThreadCacheTest {
+    final String namespace = "0.0-namespace";
+    final String namespace1 = "0.1-namespace";
+    final String namespace2 = "0.2-namespace";
 
     @Test
     public void basicPutGet() throws IOException {
@@ -47,7 +50,6 @@ public class ThreadCacheTest {
                 new KeyValue<>("K4", "V4"),
                 new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
-        final String name = "name";
         ThreadCache cache = new ThreadCache("testCache",
                 toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
             new MockStreamsMetrics(new Metrics()));
@@ -55,12 +57,12 @@ public class ThreadCacheTest {
         for (KeyValue<String, String> kvToInsert : toInsert) {
             Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             byte[] value = kvToInsert.value.getBytes();
-            cache.put(name, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
         }
 
         for (KeyValue<String, String> kvToInsert : toInsert) {
             Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
-            LRUCacheEntry entry = cache.get(name, key);
+            LRUCacheEntry entry = cache.get(namespace, key);
             assertEquals(entry.isDirty(), true);
             assertEquals(new String(entry.value), kvToInsert.value);
         }
@@ -73,7 +75,6 @@ public class ThreadCacheTest {
     private void checkOverheads(double entryFactor, double systemFactor, long desiredCacheSize, int keySizeBytes,
                             int valueSizeBytes) {
         Runtime runtime = Runtime.getRuntime();
-        final String name = "name";
         long numElements = desiredCacheSize / memoryCacheEntrySize(new byte[keySizeBytes], new byte[valueSizeBytes], "");
 
         System.gc();
@@ -86,7 +87,7 @@ public class ThreadCacheTest {
             String keyStr = "K" + i;
             Bytes key = Bytes.wrap(keyStr.getBytes());
             byte[] value = new byte[valueSizeBytes];
-            cache.put(name, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
+            cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
         }
 
 
@@ -152,7 +153,6 @@ public class ThreadCacheTest {
                 new KeyValue<>("K4", "V4"),
                 new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
-        final String namespace = "kafka";
         ThreadCache cache = new ThreadCache("testCache",
                 memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
             new MockStreamsMetrics(new Metrics()));
@@ -185,9 +185,9 @@ public class ThreadCacheTest {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final Bytes key = Bytes.wrap(new byte[]{0});
 
-        cache.put("name", key, dirtyEntry(key.get()));
-        assertEquals(key.get(), cache.delete("name", key).value);
-        assertNull(cache.get("name", key));
+        cache.put(namespace, key, dirtyEntry(key.get()));
+        assertEquals(key.get(), cache.delete(namespace, key).value);
+        assertNull(cache.get(namespace, key));
     }
 
     @Test
@@ -195,7 +195,6 @@ public class ThreadCacheTest {
         final Bytes key = Bytes.wrap(new byte[]{0});
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        final String namespace = "namespace";
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -216,14 +215,14 @@ public class ThreadCacheTest {
         final Bytes key = Bytes.wrap(new byte[]{0});
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
 
-        cache.put("name", key, dirtyEntry(key.get()));
-        assertNull(cache.delete("name", Bytes.wrap(new byte[]{1})));
+        cache.put(namespace, key, dirtyEntry(key.get()));
+        assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1})));
     }
 
     @Test
     public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
-        assertNull(cache.delete("name", Bytes.wrap(new byte[]{1})));
+        assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1})));
     }
 
     @Test
@@ -231,18 +230,17 @@ public class ThreadCacheTest {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final Bytes nameByte = Bytes.wrap(new byte[]{0});
         final Bytes name1Byte = Bytes.wrap(new byte[]{1});
-        cache.put("name", nameByte, dirtyEntry(nameByte.get()));
-        cache.put("name1", nameByte, dirtyEntry(name1Byte.get()));
+        cache.put(namespace1, nameByte, dirtyEntry(nameByte.get()));
+        cache.put(namespace2, nameByte, dirtyEntry(name1Byte.get()));
 
-        assertArrayEquals(nameByte.get(), cache.get("name", nameByte).value);
-        assertArrayEquals(name1Byte.get(), cache.get("name1", nameByte).value);
+        assertArrayEquals(nameByte.get(), cache.get(namespace1, nameByte).value);
+        assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value);
     }
 
     @Test
     public void shouldPeekNextKey() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final Bytes theByte = Bytes.wrap(new byte[]{0});
-        final String namespace = "streams";
         cache.put(namespace, theByte, dirtyEntry(theByte.get()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
         assertEquals(theByte, iterator.peekNextKey());
@@ -253,7 +251,6 @@ public class ThreadCacheTest {
     public void shouldGetSameKeyAsPeekNext() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final Bytes theByte = Bytes.wrap(new byte[]{0});
-        final String namespace = "streams";
         cache.put(namespace, theByte, dirtyEntry(theByte.get()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
         assertEquals(iterator.peekNextKey(), iterator.next().key);
@@ -262,14 +259,14 @@ public class ThreadCacheTest {
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowIfNoPeekNextKey() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
-        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
         iterator.peekNextKey();
     }
 
     @Test
     public void shouldReturnFalseIfNoNextKey() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
-        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
         assertFalse(iterator.hasNext());
     }
 
@@ -277,7 +274,6 @@ public class ThreadCacheTest {
     public void shouldPeekAndIterateOverRange() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
-        final String namespace = "streams";
         for (final byte[] aByte : bytes) {
             cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
         }
@@ -295,7 +291,6 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() throws Exception {
-        final String namespace = "streams";
         final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
         final ThreadCache cache = new ThreadCache("testCache", entrySize * 5, new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@@ -321,7 +316,7 @@ public class ThreadCacheTest {
     public void shouldFlushDirtyEntriesForNamespace() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener("1", new ThreadCache.DirtyEntryFlushListener() {
+        cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
                 for (ThreadCache.DirtyEntry dirtyEntry : dirty) {
@@ -331,11 +326,11 @@ public class ThreadCacheTest {
         });
         final List<byte[]> expected = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
         for (byte[] bytes : expected) {
-            cache.put("1", Bytes.wrap(bytes), dirtyEntry(bytes));
+            cache.put(namespace1, Bytes.wrap(bytes), dirtyEntry(bytes));
         }
-        cache.put("2", Bytes.wrap(new byte[]{4}), dirtyEntry(new byte[]{4}));
+        cache.put(namespace2, Bytes.wrap(new byte[]{4}), dirtyEntry(new byte[]{4}));
 
-        cache.flush("1");
+        cache.flush(namespace1);
         assertEquals(expected, received);
     }
 
@@ -343,7 +338,7 @@ public class ThreadCacheTest {
     public void shouldNotFlushCleanEntriesForNamespace() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener("1", new ThreadCache.DirtyEntryFlushListener() {
+        cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
                 for (ThreadCache.DirtyEntry dirtyEntry : dirty) {
@@ -353,18 +348,18 @@ public class ThreadCacheTest {
         });
         final List<byte[]> toInsert =  Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
         for (byte[] bytes : toInsert) {
-            cache.put("1", Bytes.wrap(bytes), cleanEntry(bytes));
+            cache.put(namespace1, Bytes.wrap(bytes), cleanEntry(bytes));
         }
-        cache.put("2", Bytes.wrap(new byte[]{4}), cleanEntry(new byte[]{4}));
+        cache.put(namespace2, Bytes.wrap(new byte[]{4}), cleanEntry(new byte[]{4}));
 
-        cache.flush("1");
+        cache.flush(namespace1);
         assertEquals(Collections.EMPTY_LIST, received);
     }
 
 
     private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(final ThreadCache cache) {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        final String namespace = "namespace";
+
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -394,7 +389,6 @@ public class ThreadCacheTest {
     @Test
     public void shouldEvictAfterPutAll() throws Exception {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        final String namespace = "namespace";
         final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -414,24 +408,24 @@ public class ThreadCacheTest {
     public void shouldPutAll() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
 
-        cache.putAll("name", Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
+        cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
                                            KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
 
-        assertArrayEquals(new byte[]{5}, cache.get("name", Bytes.wrap(new byte[]{0})).value);
-        assertArrayEquals(new byte[]{6}, cache.get("name", Bytes.wrap(new byte[]{1})).value);
+        assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value);
+        assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value);
     }
 
     @Test
     public void shouldNotForwardCleanEntryOnEviction() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() {
+        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
                 received.addAll(dirty);
             }
         });
-        cache.put("name", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0}));
+        cache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0}));
         assertEquals(0, received.size());
     }
     @Test
@@ -439,15 +433,14 @@ public class ThreadCacheTest {
         final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
         final Bytes key = Bytes.wrap(new byte[]{10});
         final byte[] value = {30};
-        assertNull(cache.putIfAbsent("n", key, dirtyEntry(value)));
-        assertArrayEquals(value, cache.putIfAbsent("n", key, dirtyEntry(new byte[]{8})).value);
-        assertArrayEquals(value, cache.get("n", key).value);
+        assertNull(cache.putIfAbsent(namespace, key, dirtyEntry(value)));
+        assertArrayEquals(value, cache.putIfAbsent(namespace, key, dirtyEntry(new byte[]{8})).value);
+        assertArrayEquals(value, cache.get(namespace, key).value);
     }
 
     @Test
     public void shouldEvictAfterPutIfAbsent() throws Exception {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        final String namespace = "namespace";
         final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -469,51 +462,51 @@ public class ThreadCacheTest {
         final int maxCacheSizeInBytes = 100;
         final ThreadCache threadCache = new ThreadCache("testCache", maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));
         // trigger a put into another cache on eviction from "name"
-        threadCache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() {
+        threadCache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
                 // put an item into an empty cache when the total cache size
                 // is already > than maxCacheSizeBytes
-                threadCache.put("other", Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
+                threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
             }
         });
-        threadCache.addDirtyEntryFlushListener("other", new ThreadCache.DirtyEntryFlushListener() {
+        threadCache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
                //
             }
         });
-        threadCache.addDirtyEntryFlushListener("another", new ThreadCache.DirtyEntryFlushListener() {
+        threadCache.addDirtyEntryFlushListener(namespace2, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
 
             }
         });
 
-        threadCache.put("another", Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
-        threadCache.put("name", Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
+        threadCache.put(namespace2, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
+        threadCache.put(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
         // Put a large item such that when the eldest item is removed
         // cache sizeInBytes() > maxCacheSizeBytes
         int remaining = (int) (maxCacheSizeInBytes - threadCache.sizeBytes());
-        threadCache.put("name", Bytes.wrap(new byte[]{2}), dirtyEntry(new byte[remaining + 100]));
+        threadCache.put(namespace, Bytes.wrap(new byte[]{2}), dirtyEntry(new byte[remaining + 100]));
     }
 
     @Test
     public void shouldCleanupNamedCacheOnClose() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
-        cache.put("one", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
-        cache.put("two", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
+        cache.put(namespace1, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
+        cache.put(namespace2, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
         assertEquals(cache.size(), 2);
-        cache.close("two");
+        cache.close(namespace2);
         assertEquals(cache.size(), 1);
-        assertNull(cache.get("two", Bytes.wrap(new byte[]{1})));
+        assertNull(cache.get(namespace2, Bytes.wrap(new byte[]{1})));
     }
 
     @Test
     public void shouldReturnNullIfKeyIsNull() throws Exception {
         final ThreadCache threadCache = new ThreadCache("testCache", 10, new MockStreamsMetrics(new Metrics()));
-        threadCache.put("one", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
-        assertNull(threadCache.get("one", null));
+        threadCache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
+        assertNull(threadCache.get(namespace, null));
     }
 
     private LRUCacheEntry dirtyEntry(final byte[] key) {

