kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5597: Autogenerate producer sender metrics
Date Wed, 06 Sep 2017 00:39:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cd59976ee -> 2fb5664bf


KAFKA-5597: Autogenerate producer sender metrics

Subtask of https://issues.apache.org/jira/browse/KAFKA-3480

The changes are very similar to what was done for the consumer in https://issues.apache.org/jira/browse/KAFKA-5191
(pull request https://github.com/apache/kafka/pull/2993)

Author: James Cheng <jylcheng@yahoo.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3535 from wushujames/producer_sender_metrics_docs

Fix one minor naming bug


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fb5664b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fb5664b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fb5664b

Branch: refs/heads/trunk
Commit: 2fb5664bf4591f3e7bdc02894b9de392bf72913c
Parents: cd59976
Author: James Cheng <jylcheng@yahoo.com>
Authored: Tue Sep 5 17:36:53 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Sep 5 17:38:58 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   9 +-
 checkstyle/suppressions.xml                     |   2 +
 .../kafka/clients/producer/KafkaProducer.java   |   5 +-
 .../producer/internals/ProducerMetrics.java     |  48 +++++++
 .../clients/producer/internals/Sender.java      |  66 +++++-----
 .../internals/SenderMetricsRegistry.java        | 125 +++++++++++++++++++
 .../clients/producer/internals/SenderTest.java  |  29 +++--
 .../internals/TransactionManagerTest.java       |   2 +-
 docs/ops.html                                   | 114 +----------------
 9 files changed, 240 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index c56fe82..62e9f08 100644
--- a/build.gradle
+++ b/build.gradle
@@ -666,11 +666,18 @@ project(':core') {
     standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream()
   }
 
+  task genProducerMetricsDocs(type: JavaExec) {
+    classpath = sourceSets.test.runtimeClasspath
+    main = 'org.apache.kafka.clients.producer.internals.ProducerMetrics'
+    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+    standardOutput = new File(generatedDocsDir, "producer_metrics.html").newOutputStream()
+  }
+
   task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
                                'genAdminClientConfigDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs',
                                'genKafkaConfigDocs', 'genTopicConfigDocs',
                                ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
-                               ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs'], type: Tar) {
+                               ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs'], type: Tar) {
     classifier = 'site-docs'
     compression = Compression.GZIP
     from project.file("$rootDir/docs")

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 990e366..027d07f 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -32,6 +32,8 @@
     <suppress checks="ParameterNumber"
               files="Fetcher.java"/>
     <suppress checks="ParameterNumber"
+              files="Sender.java"/>
+    <suppress checks="ParameterNumber"
               files="ConfigDef.java"/>
     <suppress checks="ParameterNumber"
               files="DefaultRecordBatch.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3fa007a..8766107 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
+import org.apache.kafka.clients.producer.internals.ProducerMetrics;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
@@ -328,6 +329,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     MetricsReporter.class);
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
+            ProducerMetrics metricsRegistry = new ProducerMetrics(metricTags.keySet(), "producer");
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             if (keySerializer == null) {
@@ -380,7 +382,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
-            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
+            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics, metricsRegistry.senderMetrics);
             NetworkClient client = new NetworkClient(
                     new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                             this.metrics, time, "producer", channelBuilder),
@@ -405,6 +407,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     acks,
                     retries,
                     this.metrics,
+                    metricsRegistry.senderMetrics,
                     Time.SYSTEM,
                     this.requestTimeoutMs,
                     config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java
new file mode 100644
index 0000000..6b8487e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Metrics;
+
+public class ProducerMetrics {
+
+    public SenderMetricsRegistry senderMetrics;
+
+    public ProducerMetrics(Set<String> tags, String metricGrpPrefix) {
+        this.senderMetrics = new SenderMetricsRegistry(tags);
+    }
+
+    private List<MetricNameTemplate> getAllTemplates() {
+        List<MetricNameTemplate> l = new ArrayList<>();
+        l.addAll(this.senderMetrics.getAllTemplates());
+        return l;
+    }
+
+    public static void main(String[] args) {
+        Set<String> tags = new HashSet<>();
+        tags.add("client-id");
+        ProducerMetrics metrics = new ProducerMetrics(tags, "producer");
+        System.out.println(Metrics.toHtmlTable("kafka.producer", metrics.getAllTemplates()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 9c3b4d2..bf3714e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -129,6 +129,7 @@ public class Sender implements Runnable {
                   short acks,
                   int retries,
                   Metrics metrics,
+                  SenderMetricsRegistry metricsRegistry,
                   Time time,
                   int requestTimeout,
                   long retryBackoffMs,
@@ -144,7 +145,7 @@ public class Sender implements Runnable {
         this.acks = acks;
         this.retries = retries;
         this.time = time;
-        this.sensors = new SenderMetrics(metrics);
+        this.sensors = new SenderMetrics(metrics, metricsRegistry);
         this.requestTimeout = requestTimeout;
         this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
@@ -676,13 +677,12 @@ public class Sender implements Runnable {
         this.client.wakeup();
     }
 
-    public static Sensor throttleTimeSensor(Metrics metrics) {
-        String metricGrpName = SenderMetrics.METRIC_GROUP_NAME;
+    public static Sensor throttleTimeSensor(Metrics metrics, SenderMetricsRegistry metricsRegistry) {
         Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
-        produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-avg",
-                metricGrpName, "The average throttle time in ms"), new Avg());
-        produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-max",
-                metricGrpName, "The maximum throttle time in ms"), new Max());
+        MetricName m = metrics.metricInstance(metricsRegistry.produceThrottleTimeAvg);
+        produceThrottleTimeSensor.add(m, new Avg());
+        m = metrics.metricInstance(metricsRegistry.produceThrottleTimeMax);
+        produceThrottleTimeSensor.add(m, new Max());
         return produceThrottleTimeSensor;
     }
 
@@ -690,8 +690,6 @@ public class Sender implements Runnable {
      * A collection of sensors for the sender
      */
     private class SenderMetrics {
-
-        private static final String METRIC_GROUP_NAME = "producer-metrics";
         private final Metrics metrics;
         public final Sensor retrySensor;
         public final Sensor errorSensor;
@@ -702,60 +700,61 @@ public class Sender implements Runnable {
         public final Sensor compressionRateSensor;
         public final Sensor maxRecordSizeSensor;
         public final Sensor batchSplitSensor;
+        private SenderMetricsRegistry metricsRegistry;
 
-        public SenderMetrics(Metrics metrics) {
+        public SenderMetrics(Metrics metrics, SenderMetricsRegistry metricsRegistry) {
             this.metrics = metrics;
-            String metricGrpName = METRIC_GROUP_NAME;
+            this.metricsRegistry = metricsRegistry;
 
             this.batchSizeSensor = metrics.sensor("batch-size");
-            MetricName m = metrics.metricName("batch-size-avg", metricGrpName, "The average
number of bytes sent per partition per-request.");
+            MetricName m = metrics.metricInstance(metricsRegistry.batchSizeAvg);
             this.batchSizeSensor.add(m, new Avg());
-            m = metrics.metricName("batch-size-max", metricGrpName, "The max number of bytes
sent per partition per-request.");
+            m = metrics.metricInstance(metricsRegistry.batchSizeMax);
             this.batchSizeSensor.add(m, new Max());
 
             this.compressionRateSensor = metrics.sensor("compression-rate");
-            m = metrics.metricName("compression-rate-avg", metricGrpName, "The average compression
rate of record batches.");
+            m = metrics.metricInstance(metricsRegistry.compressionRateAvg);
             this.compressionRateSensor.add(m, new Avg());
 
             this.queueTimeSensor = metrics.sensor("queue-time");
-            m = metrics.metricName("record-queue-time-avg", metricGrpName, "The average time
in ms record batches spent in the record accumulator.");
+            m = metrics.metricInstance(metricsRegistry.recordQueueTimeAvg);
             this.queueTimeSensor.add(m, new Avg());
-            m = metrics.metricName("record-queue-time-max", metricGrpName, "The maximum time
in ms record batches spent in the record accumulator.");
+            m = metrics.metricInstance(metricsRegistry.recordQueueTimeMax);
             this.queueTimeSensor.add(m, new Max());
 
             this.requestTimeSensor = metrics.sensor("request-time");
-            m = metrics.metricName("request-latency-avg", metricGrpName, "The average request
latency in ms");
+            m = metrics.metricInstance(metricsRegistry.requestLatencyAvg);
             this.requestTimeSensor.add(m, new Avg());
-            m = metrics.metricName("request-latency-max", metricGrpName, "The maximum request
latency in ms");
+            m = metrics.metricInstance(metricsRegistry.requestLatencyMax);
             this.requestTimeSensor.add(m, new Max());
 
             this.recordsPerRequestSensor = metrics.sensor("records-per-request");
-            m = metrics.metricName("record-send-rate", metricGrpName, "The average number
of records sent per second.");
+            m = metrics.metricInstance(metricsRegistry.recordSendRate);
             this.recordsPerRequestSensor.add(m, new Rate());
-            m = metrics.metricName("records-per-request-avg", metricGrpName, "The average
number of records per request.");
+            m = metrics.metricInstance(metricsRegistry.recordsPerRequestAvg);
             this.recordsPerRequestSensor.add(m, new Avg());
 
             this.retrySensor = metrics.sensor("record-retries");
-            m = metrics.metricName("record-retry-rate", metricGrpName, "The average per-second
number of retried record sends");
+            m = metrics.metricInstance(metricsRegistry.recordRetryRate);
             this.retrySensor.add(m, new Rate());
 
             this.errorSensor = metrics.sensor("errors");
-            m = metrics.metricName("record-error-rate", metricGrpName, "The average per-second
number of record sends that resulted in errors");
+            m = metrics.metricInstance(metricsRegistry.recordErrorRate);
             this.errorSensor.add(m, new Rate());
 
-            this.maxRecordSizeSensor = metrics.sensor("record-size-max");
-            m = metrics.metricName("record-size-max", metricGrpName, "The maximum record
size");
+            this.maxRecordSizeSensor = metrics.sensor("record-size");
+            m = metrics.metricInstance(metricsRegistry.recordSizeMax);
             this.maxRecordSizeSensor.add(m, new Max());
-            m = metrics.metricName("record-size-avg", metricGrpName, "The average record
size");
+            m = metrics.metricInstance(metricsRegistry.recordSizeAvg);
             this.maxRecordSizeSensor.add(m, new Avg());
 
-            m = metrics.metricName("requests-in-flight", metricGrpName, "The current number
of in-flight requests awaiting a response.");
+            m = metrics.metricInstance(metricsRegistry.requestsInFlight);
             this.metrics.addMetric(m, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return client.inFlightRequestCount();
                 }
             });
-            m = metrics.metricName("metadata-age", metricGrpName, "The age in seconds of
the current producer metadata being used.");
+            m = metrics.metricInstance(metricsRegistry.metadataAge);
             metrics.addMetric(m, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
@@ -763,7 +762,7 @@ public class Sender implements Runnable {
             });
 
             this.batchSplitSensor = metrics.sensor("batch-split-rate");
-            m = metrics.metricName("batch-split-rate", metricGrpName, "The rate of record
batch split");
+            m = metrics.metricInstance(metricsRegistry.batchSplitRate);
             this.batchSplitSensor.add(m, new Rate());
         }
 
@@ -774,30 +773,29 @@ public class Sender implements Runnable {
             Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
             if (topicRecordCount == null) {
                 Map<String, String> metricTags = Collections.singletonMap("topic",
topic);
-                String metricGrpName = "producer-topic-metrics";
 
                 topicRecordCount = this.metrics.sensor(topicRecordsCountName);
-                MetricName m = this.metrics.metricName("record-send-rate", metricGrpName,
metricTags);
+                MetricName m = this.metrics.metricInstance(metricsRegistry.topicRecordSendRate,
metricTags);
                 topicRecordCount.add(m, new Rate());
 
                 String topicByteRateName = "topic." + topic + ".bytes";
                 Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
-                m = this.metrics.metricName("byte-rate", metricGrpName, metricTags);
+                m = this.metrics.metricInstance(metricsRegistry.topicByteRate, metricTags);
                 topicByteRate.add(m, new Rate());
 
                 String topicCompressionRateName = "topic." + topic + ".compression-rate";
                 Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
-                m = this.metrics.metricName("compression-rate", metricGrpName, metricTags);
+                m = this.metrics.metricInstance(metricsRegistry.topicCompressionRate, metricTags);
                 topicCompressionRate.add(m, new Avg());
 
                 String topicRetryName = "topic." + topic + ".record-retries";
                 Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
-                m = this.metrics.metricName("record-retry-rate", metricGrpName, metricTags);
+                m = this.metrics.metricInstance(metricsRegistry.topicRecordRetryRate, metricTags);
                 topicRetrySensor.add(m, new Rate());
 
                 String topicErrorName = "topic." + topic + ".record-errors";
                 Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
-                m = this.metrics.metricName("record-error-rate", metricGrpName, metricTags);
+                m = this.metrics.metricInstance(metricsRegistry.topicRecordErrorRate, metricTags);
                 topicErrorSensor.add(m, new Rate());
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
new file mode 100644
index 0000000..f29d319
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.MetricNameTemplate;
+
+public class SenderMetricsRegistry {
+
+    final static String METRIC_GROUP_NAME = "producer-metrics";
+    final static String TOPIC_METRIC_GROUP_NAME = "producer-topic-metrics";
+
+    public MetricNameTemplate batchSizeAvg;
+    public MetricNameTemplate batchSizeMax;
+    public MetricNameTemplate compressionRateAvg;
+    public MetricNameTemplate recordQueueTimeAvg;
+    public MetricNameTemplate recordQueueTimeMax;
+    public MetricNameTemplate requestLatencyAvg;
+    public MetricNameTemplate requestLatencyMax;
+    public MetricNameTemplate produceThrottleTimeAvg;
+    public MetricNameTemplate produceThrottleTimeMax;
+    public MetricNameTemplate recordSendRate;
+    public MetricNameTemplate recordsPerRequestAvg;
+    public MetricNameTemplate recordRetryRate;
+    public MetricNameTemplate recordErrorRate;
+    public MetricNameTemplate recordSizeMax;
+    public MetricNameTemplate recordSizeAvg;
+    public MetricNameTemplate requestsInFlight;
+    public MetricNameTemplate metadataAge;
+    public MetricNameTemplate topicRecordSendRate;
+    public MetricNameTemplate topicByteRate;
+    public MetricNameTemplate topicCompressionRate;
+    public MetricNameTemplate topicRecordRetryRate;
+    public MetricNameTemplate topicRecordErrorRate;
+    public MetricNameTemplate batchSplitRate;
+
+    public SenderMetricsRegistry() {
+        this(new HashSet<String>());
+    }
+
+    public SenderMetricsRegistry(Set<String> tags) {
+
+        /* ***** Client level *****/
+        
+        this.batchSizeAvg = new MetricNameTemplate("batch-size-avg", METRIC_GROUP_NAME, "The average number of bytes sent per partition per-request.", tags);
+        this.batchSizeMax = new MetricNameTemplate("batch-size-max", METRIC_GROUP_NAME, "The max number of bytes sent per partition per-request.", tags);
+        this.compressionRateAvg = new MetricNameTemplate("compression-rate-avg", METRIC_GROUP_NAME, "The average compression rate of record batches.", tags);
+        this.recordQueueTimeAvg = new MetricNameTemplate("record-queue-time-avg", METRIC_GROUP_NAME, "The average time in ms record batches spent in the send buffer.", tags);
+        this.recordQueueTimeMax = new MetricNameTemplate("record-queue-time-max", METRIC_GROUP_NAME, "The maximum time in ms record batches spent in the send buffer.", tags);
+        this.requestLatencyAvg = new MetricNameTemplate("request-latency-avg", METRIC_GROUP_NAME, "The average request latency in ms", tags);
+        this.requestLatencyMax = new MetricNameTemplate("request-latency-max", METRIC_GROUP_NAME, "The maximum request latency in ms", tags);
+        this.recordSendRate = new MetricNameTemplate("record-send-rate", METRIC_GROUP_NAME, "The average number of records sent per second.", tags);
+        this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", METRIC_GROUP_NAME, "The average number of records per request.", tags);
+        this.recordRetryRate = new MetricNameTemplate("record-retry-rate", METRIC_GROUP_NAME, "The average per-second number of retried record sends", tags);
+        this.recordErrorRate = new MetricNameTemplate("record-error-rate", METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors", tags);
+        this.recordSizeMax = new MetricNameTemplate("record-size-max", METRIC_GROUP_NAME, "The maximum record size", tags);
+        this.recordSizeAvg = new MetricNameTemplate("record-size-avg", METRIC_GROUP_NAME, "The average record size", tags);
+        this.requestsInFlight = new MetricNameTemplate("requests-in-flight", METRIC_GROUP_NAME, "The current number of in-flight requests awaiting a response.", tags);
+        this.metadataAge = new MetricNameTemplate("metadata-age", METRIC_GROUP_NAME, "The age in seconds of the current producer metadata being used.", tags);
+        this.batchSplitRate = new MetricNameTemplate("batch-split-rate", METRIC_GROUP_NAME, "The average number of batch splits per second", tags);
+
+        this.produceThrottleTimeAvg = new MetricNameTemplate("produce-throttle-time-avg", METRIC_GROUP_NAME, "The average time in ms a request was throttled by a broker", tags);
+        this.produceThrottleTimeMax = new MetricNameTemplate("produce-throttle-time-max", METRIC_GROUP_NAME, "The maximum time in ms a request was throttled by a broker", tags);
+
+        /* ***** Topic level *****/
+        Set<String> topicTags = new HashSet<String>(tags);
+        topicTags.add("topic");
+
+        this.topicRecordSendRate = new MetricNameTemplate("record-send-rate", TOPIC_METRIC_GROUP_NAME, "The average number of records sent per second for a topic.", topicTags);
+        this.topicByteRate = new MetricNameTemplate("byte-rate", TOPIC_METRIC_GROUP_NAME, "The average number of bytes sent per second for a topic.", topicTags);
+        this.topicCompressionRate = new MetricNameTemplate("compression-rate", TOPIC_METRIC_GROUP_NAME, "The average compression rate of record batches for a topic.", topicTags);
+        this.topicRecordRetryRate = new MetricNameTemplate("record-retry-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of retried record sends for a topic", topicTags);
+        this.topicRecordErrorRate = new MetricNameTemplate("record-error-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors for a topic", topicTags);
+
+    }
+
+    public List<MetricNameTemplate> getAllTemplates() {
+        return Arrays.asList(this.batchSizeAvg,
+                this.batchSizeMax,
+                this.compressionRateAvg,
+                this.recordQueueTimeAvg,
+                this.recordQueueTimeMax,
+                this.requestLatencyAvg,
+                this.requestLatencyMax,
+                this.recordSendRate,
+                this.recordsPerRequestAvg,
+                this.recordRetryRate,
+                this.recordErrorRate,
+                this.recordSizeMax,
+                this.recordSizeAvg,
+                this.requestsInFlight,
+                this.metadataAge,
+                this.batchSplitRate,
+                
+                this.produceThrottleTimeAvg,
+                this.produceThrottleTimeMax,
+
+                // per-topic metrics
+                this.topicRecordSendRate,
+                this.topicByteRate,
+                this.topicCompressionRate,
+                this.topicRecordRetryRate,
+                this.topicRecordErrorRate
+                );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index e66cce0..096e7c1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -88,7 +88,6 @@ public class SenderTest {
     private static final short ACKS_ALL = -1;
     private static final int MAX_RETRIES = 0;
     private static final String CLIENT_ID = "clientId";
-    private static final String METRIC_GROUP = "producer-metrics";
     private static final double EPS = 0.0001;
     private static final int MAX_BLOCK_TIMEOUT = 1000;
     private static final int REQUEST_TIMEOUT = 1000;
@@ -104,6 +103,7 @@ public class SenderTest {
     private Metrics metrics = null;
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
+    private SenderMetricsRegistry senderMetricsRegistry = null;
     private final LogContext loggerFactory = new LogContext();
 
     @Before
@@ -235,7 +235,7 @@ public class SenderTest {
     @Test
     public void testQuotaMetrics() throws Exception {
         MockSelector selector = new MockSelector(time);
-        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
+        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics, this.senderMetricsRegistry);
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
@@ -263,8 +263,8 @@ public class SenderTest {
             selector.clear();
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("produce-throttle-time-avg",
METRIC_GROUP, ""));
-        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("produce-throttle-time-max",
METRIC_GROUP, ""));
+        KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(this.senderMetricsRegistry.produceThrottleTimeAvg));
+        KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(this.senderMetricsRegistry.produceThrottleTimeMax));
         // Throttle times are ApiVersions=400, Produce=(100, 200, 300)
         assertEquals(250, avgMetric.value(), EPS);
         assertEquals(400, maxMetric.value(), EPS);
@@ -278,7 +278,7 @@ public class SenderTest {
         Metrics m = new Metrics();
         try {
             Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator,
false, MAX_REQUEST_SIZE, ACKS_ALL,
-                    maxRetries, m, time, REQUEST_TIMEOUT, 50, null, apiVersions);
+                    maxRetries, m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50,
null, apiVersions);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
@@ -325,7 +325,7 @@ public class SenderTest {
         Metrics m = new Metrics();
         try {
             Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                    m, time, REQUEST_TIMEOUT, 50, null, apiVersions);
+                    m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
             metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@@ -577,7 +577,7 @@ public class SenderTest {
         int maxRetries = 10;
         Metrics m = new Metrics();
         Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -617,8 +617,9 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
+        SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
         Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                m, metricsRegistry, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // connect.
@@ -637,7 +638,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
         assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
 
-        KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, ""));
+        KafkaMetric recordErrors = m.metrics().get(m.metricInstance(metricsRegistry.recordErrorRate));
         assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
 
         assertTrue(responseFuture.isDone());
@@ -655,7 +656,7 @@ public class SenderTest {
         int maxRetries = 10;
         Metrics m = new Metrics();
         Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // connect.
@@ -706,8 +707,9 @@ public class SenderTest {
         try (Metrics m = new Metrics()) {
             accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
                     new ApiVersions(), txnManager);
+            SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
             Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                    m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
+                    m, metricsRegistry, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
             metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@@ -769,7 +771,7 @@ public class SenderTest {
             assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
 
             assertTrue("There should be a split",
-                    m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0);
+                    m.metrics().get(m.metricInstance(metricsRegistry.batchSplitRate)).value() > 0);
         }
     }
 
@@ -826,8 +828,9 @@ public class SenderTest {
         this.metrics = new Metrics(metricConfig, time);
         this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
                 apiVersions, transactionManager);
+        this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet());
         this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
-                MAX_RETRIES, this.metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                MAX_RETRIES, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 282d91b..1219b9c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -124,7 +124,7 @@ public class TransactionManagerTest {
         Metrics metrics = new Metrics(metricConfig, time);
         this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
-                MAX_RETRIES, metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                MAX_RETRIES, metrics, new SenderMetricsRegistry(metricTags.keySet()), this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
         client.setNode(brokerNode);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fb5664b/docs/ops.html
----------------------------------------------------------------------
diff --git a/docs/ops.html b/docs/ops.html
index b34aba4..c55b768 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1039,119 +1039,13 @@
         <td>The fraction of time an appender waits for space allocation.</td>
         <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
       </tr>
-      <tr>
-        <td>batch-size-avg</td>
-        <td>The average number of bytes sent per partition per-request.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>batch-size-max</td>
-        <td>The max number of bytes sent per partition per-request.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>compression-rate-avg</td>
-        <td>The average compression rate of record batches.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>record-queue-time-avg</td>
-        <td>The average time in ms record batches spent in the record accumulator.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>record-queue-time-max</td>
-        <td>The maximum time in ms record batches spent in the record accumulator.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>request-latency-avg</td>
-        <td>The average request latency in ms.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>request-latency-max</td>
-        <td>The maximum request latency in ms.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>record-send-rate</td>
-        <td>The average number of records sent per second.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>records-per-request-avg</td>
-        <td>The average number of records per request.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>record-retry-rate</td>
-        <td>The average per-second number of retried record sends.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>record-error-rate</td>
-        <td>The average per-second number of record sends that resulted in errors.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>record-size-max</td>
-        <td>The maximum record size.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>record-size-avg</td>
-        <td>The average record size.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>requests-in-flight</td>
-        <td>The current number of in-flight requests awaiting a response.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>metadata-age</td>
-        <td>The age in seconds of the current producer metadata being used.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>produce-throttle-time-max</td>
-        <td>The maximum time in ms a request was throttled by a broker.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>produce-throttle-time-avg</td>
-        <td>The average time in ms a request was throttled by a broker.</td>
-        <td>kafka.producer:type=producer-metrics,client-id=([-.\w]+)</td>
-      </tr>
 
-      <tr>
-        <td>record-send-rate</td>
-        <td>The average number of records sent per second for a topic.</td>
-        <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>byte-rate</td>
-        <td>The average number of bytes sent per second for a topic.</td>
-        <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>compression-rate</td>
-        <td>The average compression rate of record batches for a topic.</td>
-        <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>record-retry-rate</td>
-        <td>The average per-second number of retried record sends for a topic.</td>
-        <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
-      <tr>
-        <td>record-error-rate</td>
-        <td>The average per-second number of record sends that resulted in errors for a topic.</td>
-        <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
-      </tr>
   </tbody></table>
 
+  <h5><a id="producer_sender_monitoring" href="#producer_sender_monitoring">Producer Sender Metrics</a></h5>
+
+  <!--#include virtual="generated/producer_metrics.html" -->
+
 
   <h4><a id="new_consumer_monitoring" href="#new_consumer_monitoring">New consumer monitoring</a></h4>
 


Mime
View raw message