kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/2] kafka git commit: KAFKA-2668; Add a metric that records the total number of metrics
Date Wed, 09 Dec 2015 03:43:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ee6b5e044 -> ef92a8ae7


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index 90cd76f..e07e646 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.metrics;
 
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.junit.Test;
@@ -29,11 +28,11 @@ public class JmxReporterTest {
         try {
             metrics.addReporter(new JmxReporter());
             Sensor sensor = metrics.sensor("kafka.requests");
-            sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());
-            sensor.add(new MetricName("pack.bean2.total", "grp2"), new Total());
+            sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg());
+            sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new Total());
             Sensor sensor2 = metrics.sensor("kafka.blah");
-            sensor2.add(new MetricName("pack.bean1.some", "grp1"), new Total());
-            sensor2.add(new MetricName("pack.bean2.some", "grp1"), new Total());
+            sensor2.add(metrics.metricName("pack.bean1.some", "grp1"), new Total());
+            sensor2.add(metrics.metricName("pack.bean2.some", "grp1"), new Total());
         } finally {
             metrics.close();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index d465c98..d7723ae 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -59,16 +59,16 @@ public class MetricsTest {
 
     @Test
     public void testMetricName() {
-        MetricName n1 = new MetricName("name", "group", "description", "key1", "value1", "key2", "value2");
+        MetricName n1 = metrics.metricName("name", "group", "description", "key1", "value1", "key2", "value2");
         Map<String, String> tags = new HashMap<String, String>();
         tags.put("key1", "value1");
         tags.put("key2", "value2");
-        MetricName n2 = new MetricName("name", "group", "description", tags);
+        MetricName n2 = metrics.metricName("name", "group", "description", tags);
         assertEquals("metric names created in two different ways should be equal", n1, n2);
 
         try {
-            new MetricName("name", "group", "description", "key1");
-            fail("Creating MetricName with an old number of keyValue should fail");
+            metrics.metricName("name", "group", "description", "key1");
+            fail("Creating MetricName with an odd number of keyValue should fail");
         } catch (IllegalArgumentException e) {
             // this is expected
         }
@@ -78,20 +78,20 @@ public class MetricsTest {
     public void testSimpleStats() throws Exception {
         ConstantMeasurable measurable = new ConstantMeasurable();
 
-        metrics.addMetric(new MetricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable);
+        metrics.addMetric(metrics.metricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable);
         Sensor s = metrics.sensor("test.sensor");
-        s.add(new MetricName("test.avg", "grp1"), new Avg());
-        s.add(new MetricName("test.max", "grp1"), new Max());
-        s.add(new MetricName("test.min", "grp1"), new Min());
-        s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
-        s.add(new MetricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count()));
-        s.add(new MetricName("test.count", "grp1"), new Count());
+        s.add(metrics.metricName("test.avg", "grp1"), new Avg());
+        s.add(metrics.metricName("test.max", "grp1"), new Max());
+        s.add(metrics.metricName("test.min", "grp1"), new Min());
+        s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
+        s.add(metrics.metricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count()));
+        s.add(metrics.metricName("test.count", "grp1"), new Count());
         s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
-                             new Percentile(new MetricName("test.median", "grp1"), 50.0),
-                             new Percentile(new MetricName("test.perc99_9", "grp1"), 99.9)));
+                             new Percentile(metrics.metricName("test.median", "grp1"), 50.0),
+                             new Percentile(metrics.metricName("test.perc99_9", "grp1"), 99.9)));
 
         Sensor s2 = metrics.sensor("test.sensor2");
-        s2.add(new MetricName("s2.total", "grp1"), new Total());
+        s2.add(metrics.metricName("s2.total", "grp1"), new Total());
         s2.record(5.0);
 
         int sum = 0;
@@ -103,38 +103,38 @@ public class MetricsTest {
         // prior to any time passing
         double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0;
         assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs,
-                     metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
+                     metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);
 
         // pretend 2 seconds passed...
         long sleepTimeMs = 2;
         time.sleep(sleepTimeMs * 1000);
         elapsedSecs += sleepTimeMs;
 
-        assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(new MetricName("s2.total", "grp1")).value(), EPS);
-        assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(new MetricName("test.avg", "grp1")).value(), EPS);
-        assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS);
-        assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(new MetricName("test.min", "grp1")).value(), EPS);
+        assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(metrics.metricName("s2.total", "grp1")).value(), EPS);
+        assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(metrics.metricName("test.avg", "grp1")).value(), EPS);
+        assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(metrics.metricName("test.max", "grp1")).value(), EPS);
+        assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(metrics.metricName("test.min", "grp1")).value(), EPS);
         assertEquals("Rate(0...9) = 1.40625",
-                     sum / elapsedSecs, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS);
+                     sum / elapsedSecs, metrics.metrics().get(metrics.metricName("test.rate", "grp1")).value(), EPS);
         assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs),
                      count / elapsedSecs,
-                     metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
+                     metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);
         assertEquals("Count(0...9) = 10",
-                     (double) count, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS);
+                     (double) count, metrics.metrics().get(metrics.metricName("test.count", "grp1")).value(), EPS);
     }
 
     @Test
     public void testHierarchicalSensors() {
         Sensor parent1 = metrics.sensor("test.parent1");
-        parent1.add(new MetricName("test.parent1.count", "grp1"), new Count());
+        parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
         Sensor parent2 = metrics.sensor("test.parent2");
-        parent2.add(new MetricName("test.parent2.count", "grp1"), new Count());
+        parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
         Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
-        child1.add(new MetricName("test.child1.count", "grp1"), new Count());
+        child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
         Sensor child2 = metrics.sensor("test.child2", parent1);
-        child2.add(new MetricName("test.child2.count", "grp1"), new Count());
+        child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
         Sensor grandchild = metrics.sensor("test.grandchild", child1);
-        grandchild.add(new MetricName("test.grandchild.count", "grp1"), new Count());
+        grandchild.add(metrics.metricName("test.grandchild.count", "grp1"), new Count());
 
         /* increment each sensor one time */
         parent1.record();
@@ -167,75 +167,76 @@ public class MetricsTest {
 
     @Test
     public void testRemoveSensor() {
+        int size = metrics.metrics().size();
         Sensor parent1 = metrics.sensor("test.parent1");
-        parent1.add(new MetricName("test.parent1.count", "grp1"), new Count());
+        parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
         Sensor parent2 = metrics.sensor("test.parent2");
-        parent2.add(new MetricName("test.parent2.count", "grp1"), new Count());
+        parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
         Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
-        child1.add(new MetricName("test.child1.count", "grp1"), new Count());
+        child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
         Sensor child2 = metrics.sensor("test.child2", parent2);
-        child2.add(new MetricName("test.child2.count", "grp1"), new Count());
+        child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
         Sensor grandChild1 = metrics.sensor("test.gchild2", child2);
-        grandChild1.add(new MetricName("test.gchild2.count", "grp1"), new Count());
+        grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new Count());
 
         Sensor sensor = metrics.getSensor("test.parent1");
         assertNotNull(sensor);
         metrics.removeSensor("test.parent1");
         assertNull(metrics.getSensor("test.parent1"));
-        assertNull(metrics.metrics().get(new MetricName("test.parent1.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.parent1.count", "grp1")));
         assertNull(metrics.getSensor("test.child1"));
         assertNull(metrics.childrenSensors().get(sensor));
-        assertNull(metrics.metrics().get(new MetricName("test.child1.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.child1.count", "grp1")));
 
         sensor = metrics.getSensor("test.gchild2");
         assertNotNull(sensor);
         metrics.removeSensor("test.gchild2");
         assertNull(metrics.getSensor("test.gchild2"));
         assertNull(metrics.childrenSensors().get(sensor));
-        assertNull(metrics.metrics().get(new MetricName("test.gchild2.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.gchild2.count", "grp1")));
 
         sensor = metrics.getSensor("test.child2");
         assertNotNull(sensor);
         metrics.removeSensor("test.child2");
         assertNull(metrics.getSensor("test.child2"));
         assertNull(metrics.childrenSensors().get(sensor));
-        assertNull(metrics.metrics().get(new MetricName("test.child2.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.child2.count", "grp1")));
 
         sensor = metrics.getSensor("test.parent2");
         assertNotNull(sensor);
         metrics.removeSensor("test.parent2");
         assertNull(metrics.getSensor("test.parent2"));
         assertNull(metrics.childrenSensors().get(sensor));
-        assertNull(metrics.metrics().get(new MetricName("test.parent2.count", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test.parent2.count", "grp1")));
 
-        assertEquals(0, metrics.metrics().size());
+        assertEquals(size, metrics.metrics().size());
     }
 
     @Test
     public void testRemoveInactiveMetrics() {
         Sensor s1 = metrics.sensor("test.s1", null, 1);
-        s1.add(new MetricName("test.s1.count", "grp1"), new Count());
+        s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
 
         Sensor s2 = metrics.sensor("test.s2", null, 3);
-        s2.add(new MetricName("test.s2.count", "grp1"), new Count());
+        s2.add(metrics.metricName("test.s2.count", "grp1"), new Count());
 
         Metrics.ExpireSensorTask purger = metrics.new ExpireSensorTask();
         purger.run();
         assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
         assertNotNull("MetricName test.s1.count must be present",
-                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
         assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
         assertNotNull("MetricName test.s2.count must be present",
-                metrics.metrics().get(new MetricName("test.s2.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s2.count", "grp1")));
 
         time.sleep(1001);
         purger.run();
         assertNull("Sensor test.s1 should have been purged", metrics.getSensor("test.s1"));
         assertNull("MetricName test.s1.count should have been purged",
-                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
         assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
         assertNotNull("MetricName test.s2.count must be present",
-                metrics.metrics().get(new MetricName("test.s2.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s2.count", "grp1")));
 
         // record a value in sensor s2. This should reset the clock for that sensor.
         // It should not get purged at the 3 second mark after creation
@@ -244,36 +245,37 @@ public class MetricsTest {
         purger.run();
         assertNotNull("Sensor test.s2 must be present", metrics.getSensor("test.s2"));
         assertNotNull("MetricName test.s2.count must be present",
-                metrics.metrics().get(new MetricName("test.s2.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s2.count", "grp1")));
 
         // After another 1 second sleep, the metric should be purged
         time.sleep(1000);
         purger.run();
         assertNull("Sensor test.s2 should have been purged", metrics.getSensor("test.s1"));
         assertNull("MetricName test.s2.count should have been purged",
-                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
 
         // After purging, it should be possible to recreate a metric
         s1 = metrics.sensor("test.s1", null, 1);
-        s1.add(new MetricName("test.s1.count", "grp1"), new Count());
+        s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
         assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
         assertNotNull("MetricName test.s1.count must be present",
-                metrics.metrics().get(new MetricName("test.s1.count", "grp1")));
+                metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
     }
 
     @Test
     public void testRemoveMetric() {
-        metrics.addMetric(new MetricName("test1", "grp1"), new Count());
-        metrics.addMetric(new MetricName("test2", "grp1"), new Count());
+        int size = metrics.metrics().size();
+        metrics.addMetric(metrics.metricName("test1", "grp1"), new Count());
+        metrics.addMetric(metrics.metricName("test2", "grp1"), new Count());
 
-        assertNotNull(metrics.removeMetric(new MetricName("test1", "grp1")));
-        assertNull(metrics.metrics().get(new MetricName("test1", "grp1")));
-        assertNotNull(metrics.metrics().get(new MetricName("test2", "grp1")));
+        assertNotNull(metrics.removeMetric(metrics.metricName("test1", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test1", "grp1")));
+        assertNotNull(metrics.metrics().get(metrics.metricName("test2", "grp1")));
 
-        assertNotNull(metrics.removeMetric(new MetricName("test2", "grp1")));
-        assertNull(metrics.metrics().get(new MetricName("test2", "grp1")));
+        assertNotNull(metrics.removeMetric(metrics.metricName("test2", "grp1")));
+        assertNull(metrics.metrics().get(metrics.metricName("test2", "grp1")));
 
-        assertEquals(0, metrics.metrics().size());
+        assertEquals(size, metrics.metrics().size());
     }
 
     @Test
@@ -313,15 +315,15 @@ public class MetricsTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testDuplicateMetricName() {
-        metrics.sensor("test").add(new MetricName("test", "grp1"), new Avg());
-        metrics.sensor("test2").add(new MetricName("test", "grp1"), new Total());
+        metrics.sensor("test").add(metrics.metricName("test", "grp1"), new Avg());
+        metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new Total());
     }
 
     @Test
     public void testQuotas() {
         Sensor sensor = metrics.sensor("test");
-        sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
-        sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
+        sensor.add(metrics.metricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
+        sensor.add(metrics.metricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
         sensor.record(5.0);
         try {
             sensor.record(1.0);
@@ -329,7 +331,7 @@ public class MetricsTest {
         } catch (QuotaViolationException e) {
             // this is good
         }
-        assertEquals(6.0, metrics.metrics().get(new MetricName("test1.total", "grp1")).value(), EPS);
+        assertEquals(6.0, metrics.metrics().get(metrics.metricName("test1.total", "grp1")).value(), EPS);
         sensor.record(-6.0);
         try {
             sensor.record(-1.0);
@@ -358,15 +360,15 @@ public class MetricsTest {
                                             0.0,
                                             100.0,
                                             BucketSizing.CONSTANT,
-                                            new Percentile(new MetricName("test.p25", "grp1"), 25),
-                                            new Percentile(new MetricName("test.p50", "grp1"), 50),
-                                            new Percentile(new MetricName("test.p75", "grp1"), 75));
+                                            new Percentile(metrics.metricName("test.p25", "grp1"), 25),
+                                            new Percentile(metrics.metricName("test.p50", "grp1"), 50),
+                                            new Percentile(metrics.metricName("test.p75", "grp1"), 75));
         MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
         Sensor sensor = metrics.sensor("test", config);
         sensor.add(percs);
-        Metric p25 = this.metrics.metrics().get(new MetricName("test.p25", "grp1"));
-        Metric p50 = this.metrics.metrics().get(new MetricName("test.p50", "grp1"));
-        Metric p75 = this.metrics.metrics().get(new MetricName("test.p75", "grp1"));
+        Metric p25 = this.metrics.metrics().get(metrics.metricName("test.p25", "grp1"));
+        Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1"));
+        Metric p75 = this.metrics.metrics().get(metrics.metricName("test.p75", "grp1"));
 
         // record two windows worth of sequential values
         for (int i = 0; i < buckets; i++)
@@ -389,7 +391,7 @@ public class MetricsTest {
         // Use the default time window. Set 3 samples
         MetricConfig cfg = new MetricConfig().samples(3);
         Sensor s = metrics.sensor("test.sensor", cfg);
-        s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
+        s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
 
         int sum = 0;
         int count = cfg.samples() - 1;
@@ -406,7 +408,7 @@ public class MetricsTest {
         // prior to any time passing
         double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
 
-        KafkaMetric km = metrics.metrics().get(new MetricName("test.rate", "grp1"));
+        KafkaMetric km = metrics.metrics().get(metrics.metricName("test.rate", "grp1"));
         assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS);
         assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
                 ((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 18fd080..bce74e1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -54,7 +54,7 @@ public class SelectorTest {
         this.channelBuilder = new PlaintextChannelBuilder();
         this.channelBuilder.configure(configs);
         this.metrics = new Metrics();
-        this.selector = new Selector(5000, this.metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+        this.selector = new Selector(5000, this.metrics, time, "MetricGroup", channelBuilder);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index a442ea0..bbc8fe1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -18,7 +18,6 @@ import static org.junit.Assert.assertTrue;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.io.File;
@@ -56,7 +55,7 @@ public class SslSelectorTest extends SelectorTest {
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
         this.channelBuilder.configure(sslClientConfigs);
         this.metrics = new Metrics();
-        this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+        this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder);
     }
 
     @After
@@ -84,7 +83,7 @@ public class SslSelectorTest extends SelectorTest {
             }
         };
         channelBuilder.configure(sslClientConfigs);
-        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", new LinkedHashMap<String, String>(), channelBuilder);
+        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", channelBuilder);
         try {
             int reqs = 500;
             String node = "0";

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 2b5d26b..34ea136 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -21,7 +21,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -75,7 +74,7 @@ public class SslTransportLayerTest {
 
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
         this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
     }
 
     @After
@@ -452,7 +451,7 @@ public class SslTransportLayerTest {
 
         };
         this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
     }
     
     private static class CertStores {
@@ -560,7 +559,7 @@ public class SslTransportLayerTest {
             this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
             SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.SERVER);
             channelBuilder.configure(sslServerConfigs);
-            this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+            this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
             setName("echoserver");
             setDaemon(true);
             acceptorThread = new AcceptorThread();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
index 5222cd0..64a0921 100644
--- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
+++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
@@ -14,7 +14,6 @@ package org.apache.kafka.test;
 
 import java.util.Arrays;
 
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -33,15 +32,15 @@ public class MetricsBench {
             Sensor parent = metrics.sensor("parent");
             Sensor child = metrics.sensor("child", parent);
             for (Sensor sensor : Arrays.asList(parent, child)) {
-                sensor.add(new MetricName(sensor.name() + ".avg", "grp1"), new Avg());
-                sensor.add(new MetricName(sensor.name() + ".count", "grp1"), new Count());
-                sensor.add(new MetricName(sensor.name() + ".max", "grp1"), new Max());
+                sensor.add(metrics.metricName(sensor.name() + ".avg", "grp1"), new Avg());
+                sensor.add(metrics.metricName(sensor.name() + ".count", "grp1"), new Count());
+                sensor.add(metrics.metricName(sensor.name() + ".max", "grp1"), new Max());
                 sensor.add(new Percentiles(1024,
                         0.0,
                         iters,
                         BucketSizing.CONSTANT,
-                        new Percentile(new MetricName(sensor.name() + ".median", "grp1"), 50.0),
-                        new Percentile(new MetricName(sensor.name() +  ".p_99", "grp1"), 99.0)));
+                        new Percentile(metrics.metricName(sensor.name() + ".median", "grp1"), 50.0),
+                        new Percentile(metrics.metricName(sensor.name() +  ".p_99", "grp1"), 99.0)));
             }
             long start = System.nanoTime();
             for (int i = 0; i < iters; i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 6275636..79199a6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -67,7 +66,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                              int heartbeatIntervalMs,
                              Metrics metrics,
                              String metricGrpPrefix,
-                             Map<String, String> metricTags,
                              Time time,
                              long retryBackoffMs,
                              String restUrl,
@@ -79,13 +77,12 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                 heartbeatIntervalMs,
                 metrics,
                 metricGrpPrefix,
-                metricTags,
                 time,
                 retryBackoffMs);
         this.restUrl = restUrl;
         this.configStorage = configStorage;
         this.assignmentSnapshot = null;
-        this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.listener = listener;
         this.rejoinRequested = false;
     }
@@ -254,7 +251,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         public final Metrics metrics;
         public final String metricGrpName;
 
-        public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+        public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
             this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
@@ -270,16 +267,12 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                 }
             };
 
-            metrics.addMetric(new MetricName("assigned-connectors",
-                            this.metricGrpName,
-                            "The number of connector instances currently assigned to this consumer",
-                            tags),
-                    numConnectors);
-            metrics.addMetric(new MetricName("assigned-tasks",
-                            this.metricGrpName,
-                            "The number of tasks currently assigned to this consumer",
-                            tags),
-                    numTasks);
+            metrics.addMetric(metrics.metricName("assigned-connectors",
+                              this.metricGrpName,
+                              "The number of connector instances currently assigned to this consumer"), numConnectors);
+            metrics.addMetric(metrics.metricName("assigned-tasks",
+                              this.metricGrpName,
+                              "The number of tasks currently assigned to this consumer"), numTasks);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index a36608a..4b24312 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -71,10 +71,13 @@ public class WorkerGroupMember {
         try {
             this.time = new SystemTime();
 
-            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
-                    .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
             String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
+            Map<String, String> metricsTags = new LinkedHashMap<>();
+            metricsTags.put("client-id", clientId);
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+                    .tags(metricsTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
@@ -83,11 +86,9 @@ public class WorkerGroupMember {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 0);
             String metricGrpPrefix = "connect";
-            Map<String, String> metricsTags = new LinkedHashMap<>();
-            metricsTags.put("client-id", clientId);
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
+                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice
@@ -102,7 +103,6 @@ public class WorkerGroupMember {
                     config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                     metrics,
                     metricGrpPrefix,
-                    metricsTags,
                     this.time,
                     retryBackoffMs,
                     restUrl,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index f47a9f9..3eab095 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -47,7 +47,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -75,7 +74,6 @@ public class WorkerCoordinatorTest {
     private Node node = cluster.nodes().get(0);
     private Metadata metadata;
     private Metrics metrics;
-    private Map<String, String> metricTags = new LinkedHashMap<>();
     private ConsumerNetworkClient consumerClient;
     private MockRebalanceListener rebalanceListener;
     @Mock private KafkaConfigStorage configStorage;
@@ -103,7 +101,6 @@ public class WorkerCoordinatorTest {
                 heartbeatIntervalMs,
                 metrics,
                 "consumer" + groupId,
-                metricTags,
                 time,
                 retryBackoffMs,
                 LEADER_URL,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 3a7e6de..a8d9964 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -226,7 +226,6 @@ object AdminClient {
       metrics,
       time,
       "admin",
-      Map[String, String](),
       channelBuilder)
 
     val networkClient = new NetworkClient(

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 2c5432e..6fa410a 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -111,6 +111,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
   private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId)))
 
+  newGauge(
+    "yammer-metrics-count",
+    new Gauge[Int] {
+      def value = {
+        com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
+      }
+    }
+  )
+
   val consumerIdString = {
     var consumerUuid : String = null
     config.consumerId match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 69a9569..c3ecd75 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -70,7 +70,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private val allMetricNames = (0 until totalProcessorThreads).map { i =>
     val tags = new util.HashMap[String, String]()
     tags.put("networkProcessor", i.toString)
-    new MetricName("io-wait-ratio", "socket-server-metrics", tags)
+    metrics.metricName("io-wait-ratio", "socket-server-metrics", tags)
   }
 
   /**
@@ -384,7 +384,7 @@ private[kafka] class Processor(val id: Int,
   newGauge("IdlePercent",
     new Gauge[Double] {
       def value = {
-        metrics.metrics().get(new MetricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
+        metrics.metrics().get(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
       }
     },
     metricTags.asScala

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 82fec73..37b432c 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -76,7 +76,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   throttledRequestReaper.start()
 
   private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue")
-  delayQueueSensor.add(new MetricName("queue-size",
+  delayQueueSensor.add(metrics.metricName("queue-size",
                                       apiKey,
                                       "Tracks the size of the delay queue"), new Total())
 
@@ -206,7 +206,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
           throttleTimeSensor = metrics.sensor(throttleTimeSensorName,
                                               null,
                                               ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)
-          throttleTimeSensor.add(new MetricName("throttle-time",
+          throttleTimeSensor.add(metrics.metricName("throttle-time",
                                                 apiKey,
                                                 "Tracking average throttle-time per client",
                                                 "client-id",
@@ -271,7 +271,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   private def clientRateMetricName(clientId: String): MetricName = {
-    new MetricName("byte-rate", apiKey,
+    metrics.metricName("byte-rate", apiKey,
                    "Tracking byte-rate per client",
                    "client-id", clientId)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9eedbe2..8120167 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -141,6 +141,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     }
   )
 
+  newGauge(
+    "yammer-metrics-count",
+    new Gauge[Int] {
+      def value = {
+        com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
+      }
+    }
+  )
+
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index d335b3e..bd7ca0e 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -90,9 +90,6 @@ object JmxTool extends Logging {
         List(null)
 
     val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
-    val allAttributes: Iterable[(ObjectName, Array[String])] =
-      names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
-
 
     val numExpectedAttributes: Map[ObjectName, Int] =
       attributesWhitelistExists match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index cc1f821..23be120 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -125,10 +125,10 @@ class QuotasTest extends KafkaServerTestHarness {
     val numRecords = 1000
     produce(producers.head, numRecords)
 
-    val producerMetricName = new MetricName("throttle-time",
-                                    ApiKeys.PRODUCE.name,
-                                    "Tracking throttle-time per client",
-                                    "client-id", producerId1)
+    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.PRODUCE.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", producerId1)
     assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
 
     // Consumer should read in a bursty manner and get throttled immediately
@@ -136,10 +136,10 @@ class QuotasTest extends KafkaServerTestHarness {
     // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
     val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
     replicaConsumers.head.fetch(request)
-    val consumerMetricName = new MetricName("throttle-time",
-                                            ApiKeys.FETCH.name,
-                                            "Tracking throttle-time per client",
-                                            "client-id", consumerId1)
+    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.FETCH.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", consumerId1)
     assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
   }
 
@@ -166,10 +166,10 @@ class QuotasTest extends KafkaServerTestHarness {
     val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala
     val numRecords = 1000
     produce(producers(1), numRecords)
-    val producerMetricName = new MetricName("throttle-time",
-                                            ApiKeys.PRODUCE.name,
-                                            "Tracking throttle-time per client",
-                                            "client-id", producerId2)
+    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.PRODUCE.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", producerId2)
     assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
 
     // The "client" consumer does not get throttled.
@@ -177,10 +177,10 @@ class QuotasTest extends KafkaServerTestHarness {
     // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately
     val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
     replicaConsumers(1).fetch(request)
-    val consumerMetricName = new MetricName("throttle-time",
-                                            ApiKeys.FETCH.name,
-                                            "Tracking throttle-time per client",
-                                            "client-id", consumerId2)
+    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
+                                                           ApiKeys.FETCH.name,
+                                                           "Tracking throttle-time per client",
+                                                           "client-id", consumerId2)
     assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index fadcd5a..68d6932 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -79,7 +79,7 @@ class ClientQuotaManagerTest {
   def testQuotaViolation() {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time)
-    val queueSizeMetric = metrics.metrics().get(new MetricName("queue-size", "producer", ""))
+    val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "producer", ""))
     try {
       /* We have 10 second windows. Make sure that there is no quota violation
        * if we produce under the quota

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92a8ae/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 4d1ef43..38333a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -680,30 +680,30 @@ public class StreamThread extends Thread {
             this.metricTags.put("client-id", clientId + "-" + getName());
 
             this.commitTimeSensor = metrics.sensor("commit-time");
-            this.commitTimeSensor.add(new MetricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
-            this.commitTimeSensor.add(new MetricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
-            this.commitTimeSensor.add(new MetricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
+            this.commitTimeSensor.add(metrics.metricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
+            this.commitTimeSensor.add(metrics.metricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
+            this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
 
             this.pollTimeSensor = metrics.sensor("poll-time");
-            this.pollTimeSensor.add(new MetricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
-            this.pollTimeSensor.add(new MetricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
-            this.pollTimeSensor.add(new MetricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
+            this.pollTimeSensor.add(metrics.metricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
+            this.pollTimeSensor.add(metrics.metricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
+            this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
 
             this.processTimeSensor = metrics.sensor("process-time");
-            this.processTimeSensor.add(new MetricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
-            this.processTimeSensor.add(new MetricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
-            this.processTimeSensor.add(new MetricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
+            this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
+            this.processTimeSensor.add(metrics.metricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
+            this.processTimeSensor.add(metrics.metricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
 
             this.punctuateTimeSensor = metrics.sensor("punctuate-time");
-            this.punctuateTimeSensor.add(new MetricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
-            this.punctuateTimeSensor.add(new MetricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
-            this.punctuateTimeSensor.add(new MetricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
+            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
+            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
+            this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
 
             this.taskCreationSensor = metrics.sensor("task-creation");
-            this.taskCreationSensor.add(new MetricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
+            this.taskCreationSensor.add(metrics.metricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
 
             this.taskDestructionSensor = metrics.sensor("task-destruction");
-            this.taskDestructionSensor.add(new MetricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
+            this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
         }
 
         @Override
@@ -733,11 +733,11 @@ public class StreamThread extends Thread {
         }
 
         private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-            maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", metricGrpName,
+            maybeAddMetric(sensor, metrics.metricName(opName + "-avg-latency-ms", metricGrpName,
                 "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
-            maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", metricGrpName,
+            maybeAddMetric(sensor, metrics.metricName(opName + "-max-latency-ms", metricGrpName,
                 "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
-            maybeAddMetric(sensor, new MetricName(opName + "-qps", metricGrpName,
+            maybeAddMetric(sensor, metrics.metricName(opName + "-qps", metricGrpName,
                 "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
         }
 


Mime
View raw message