kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mimai...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9175: Update MirrorMaker 2 topic/partition metrics (#7688)
Date Wed, 13 Nov 2019 20:53:33 GMT
This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new fa22282  KAFKA-9175: Update MirrorMaker 2 topic/partition metrics (#7688)
fa22282 is described below

commit fa2228292dd5ce120f4a87d9d4250f2d8964958a
Author: Mickael Maison <mimaison@users.noreply.github.com>
AuthorDate: Wed Nov 13 20:36:25 2019 +0000

    KAFKA-9175: Update MirrorMaker 2 topic/partition metrics (#7688)
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ryanne Dolan <ryannedolan@gmail.com>
    
    Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
    Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
---
 .../apache/kafka/connect/mirror/MirrorMetrics.java | 32 ++++++++++++----------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
index 51ddafc..ea9d2f7 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
@@ -20,12 +20,11 @@ import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.metrics.stats.Value;
-import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Arrays;
@@ -48,6 +47,9 @@ class MirrorMetrics implements AutoCloseable {
     private static final MetricNameTemplate RECORD_COUNT = new MetricNameTemplate(
             "record-count", SOURCE_CONNECTOR_GROUP,
             "Number of source records replicated to the target cluster.", PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_RATE = new MetricNameTemplate(
+            "record-rate", SOURCE_CONNECTOR_GROUP,
+            "Average number of source records replicated to the target cluster per second.",
PARTITION_TAGS);
     private static final MetricNameTemplate RECORD_AGE = new MetricNameTemplate(
             "record-age-ms", SOURCE_CONNECTOR_GROUP,
             "The age of incoming source records when replicated to the target cluster.",
PARTITION_TAGS);
@@ -60,6 +62,9 @@ class MirrorMetrics implements AutoCloseable {
     private static final MetricNameTemplate RECORD_AGE_AVG = new MetricNameTemplate(
             "record-age-ms-avg", SOURCE_CONNECTOR_GROUP,
             "The average age of incoming source records when replicated to the target cluster.",
PARTITION_TAGS);
+    private static final MetricNameTemplate BYTE_COUNT = new MetricNameTemplate(
+            "byte-count", SOURCE_CONNECTOR_GROUP,
+            "Number of bytes replicated to the target cluster.", PARTITION_TAGS);
     private static final MetricNameTemplate BYTE_RATE = new MetricNameTemplate(
             "byte-rate", SOURCE_CONNECTOR_GROUP,
             "Average number of bytes replicated per second.", PARTITION_TAGS);
@@ -134,7 +139,7 @@ class MirrorMetrics implements AutoCloseable {
     }
 
     void recordBytes(TopicPartition topicPartition, long bytes) {
-        partitionMetrics.get(topicPartition).byteRateSensor.record((double) bytes);
+        partitionMetrics.get(topicPartition).byteSensor.record((double) bytes);
     }
 
     void checkpointLatency(TopicPartition topicPartition, String group, long millis) {
@@ -152,39 +157,36 @@ class MirrorMetrics implements AutoCloseable {
 
     private class PartitionMetrics {
         private final Sensor recordSensor;
-        private final Sensor byteRateSensor;
+        private final Sensor byteSensor;
         private final Sensor recordAgeSensor;
         private final Sensor replicationLatencySensor;
-        private final TopicPartition topicPartition;
-     
+
         PartitionMetrics(TopicPartition topicPartition) {
-            this.topicPartition = topicPartition;
+            String prefix = topicPartition.topic() + "-" + topicPartition.partition() + "-";
 
             Map<String, String> tags = new LinkedHashMap<>();
             tags.put("target", target); 
             tags.put("topic", topicPartition.topic());
             tags.put("partition", Integer.toString(topicPartition.partition()));
 
-            recordSensor = metrics.sensor("record-count");
-            recordSensor.add(metrics.metricInstance(RECORD_COUNT, tags), new WindowedCount());
+            recordSensor = metrics.sensor(prefix + "records-sent");
+            recordSensor.add(new Meter(metrics.metricInstance(RECORD_RATE, tags), metrics.metricInstance(RECORD_COUNT,
tags)));
 
-            byteRateSensor = metrics.sensor("byte-rate");
-            byteRateSensor.add(metrics.metricInstance(BYTE_RATE, tags), new Rate());
+            byteSensor = metrics.sensor(prefix + "bytes-sent");
+            byteSensor.add(new Meter(metrics.metricInstance(BYTE_RATE, tags), metrics.metricInstance(BYTE_COUNT,
tags)));
 
-            recordAgeSensor = metrics.sensor("record-age");
+            recordAgeSensor = metrics.sensor(prefix + "record-age");
             recordAgeSensor.add(metrics.metricInstance(RECORD_AGE, tags), new Value());
             recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MAX, tags), new Max());
             recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MIN, tags), new Min());
             recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_AVG, tags), new Avg());
 
-            replicationLatencySensor = metrics.sensor("replication-latency");
+            replicationLatencySensor = metrics.sensor(prefix + "replication-latency");
             replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY, tags),
new Value());
             replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MAX,
tags), new Max());
             replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MIN,
tags), new Min());
             replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_AVG,
tags), new Avg());
         }
-
-        
     }
 
     private class GroupMetrics {


Mime
View raw message