kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Add metric templates for sender/fetcher rate totals
Date Mon, 18 Sep 2017 09:13:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f305dd68f -> a339a387e


MINOR: Add metric templates for sender/fetcher rate totals

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: James Cheng <jylcheng@yahoo.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3882 from rajinisivaram/MINOR-KAFKA-5738-metricstemplates


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a339a387
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a339a387
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a339a387

Branch: refs/heads/trunk
Commit: a339a387e0669226bc1537783b3d4c6ea5b9b4c0
Parents: f305dd6
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Mon Sep 18 10:13:28 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Sep 18 10:13:28 2017 +0100

----------------------------------------------------------------------
 .../internals/SenderMetricsRegistry.java        | 10 +++++-
 .../apache/kafka/common/MetricNameTemplate.java | 23 ++++++++++++-
 .../clients/consumer/internals/FetcherTest.java | 34 ++++++++++++++++++++
 .../clients/producer/internals/SenderTest.java  | 30 +++++++++++++++++
 .../java/org/apache/kafka/test/TestUtils.java   | 12 ++++++-
 5 files changed, 106 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
index 9e014a7..21466e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
@@ -117,24 +117,32 @@ public class SenderMetricsRegistry {
                 this.requestLatencyAvg,
                 this.requestLatencyMax,
                 this.recordSendRate,
+                this.recordSendTotal,
                 this.recordsPerRequestAvg,
                 this.recordRetryRate,
+                this.recordRetryTotal,
                 this.recordErrorRate,
+                this.recordErrorTotal,
                 this.recordSizeMax,
                 this.recordSizeAvg,
                 this.requestsInFlight,
                 this.metadataAge,
                 this.batchSplitRate,
+                this.batchSplitTotal,
                 
                 this.produceThrottleTimeAvg,
                 this.produceThrottleTimeMax,
 
                 // per-topic metrics
                 this.topicRecordSendRate,
+                this.topicRecordSendTotal,
                 this.topicByteRate,
+                this.topicByteTotal,
                 this.topicCompressionRate,
                 this.topicRecordRetryRate,
-                this.topicRecordErrorRate
+                this.topicRecordRetryTotal,
+                this.topicRecordErrorRate,
+                this.topicRecordErrorTotal
                 );
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
index d768f22..e3ea995 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common;
 
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 
 import org.apache.kafka.common.utils.Utils;
@@ -65,8 +66,28 @@ public class MetricNameTemplate {
         return this.description;
     }
 
-
     public Set<String> tags() {
         return tags;
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, group, tags);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        MetricNameTemplate other = (MetricNameTemplate) o;
+        return Objects.equals(name, other.name) && Objects.equals(group, other.group)
&&
+                Objects.equals(tags, other.tags);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("name=%s, group=%s, tags=%s", name, group, tags);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b2ede15..5263d3b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.NetworkReceive;
@@ -1361,6 +1363,38 @@ public class FetcherTest {
         assertEquals(3, recordsCountAverage.value(), EPSILON);
     }
 
+    @Test
+    public void testFetcherMetricsTemplates() throws Exception {
+        metrics.close();
+        Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
+        metrics = new Metrics(new MetricConfig().tags(clientTags));
+        metricsRegistry = new FetcherMetricsRegistry(clientTags.keySet(), "consumer" + groupId);
+        fetcher.close();
+        fetcher = createFetcher(subscriptions, metrics);
+
+        // Fetch from topic to generate topic metrics
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(0);
+        assertTrue(fetcher.hasCompletedFetches());
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords
= fetcher.fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+
+        // Create throttle metrics
+        Fetcher.throttleTimeSensor(metrics, metricsRegistry);
+
+        // Verify that all metrics except metrics-count have registered templates
+        Set<MetricNameTemplate> allMetrics = new HashSet<>();
+        for (MetricName n : metrics.metrics().keySet()) {
+            String name = n.name().replaceAll(tp0.toString(), "{topic}-{partition}");
+            if (!n.group().equals("kafka-metrics-count"))
+                allMetrics.add(new MetricNameTemplate(name, n.group(), "", n.tags().keySet()));
+        }
+        TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.getAllTemplates()),
"metrics", "templates");
+    }
+
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
             TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime)
{
         return fetchRecords(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET,
throttleTime);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 26e3e6c..a64bc56 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -70,10 +71,12 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -275,6 +278,33 @@ public class SenderTest {
     }
 
     @Test
+    public void testSenderMetricsTemplates() throws Exception {
+        metrics.close();
+        Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
+        metrics = new Metrics(new MetricConfig().tags(clientTags));
+        SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(clientTags.keySet());
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, false,
MAX_REQUEST_SIZE, ACKS_ALL,
+                1, metrics, metricsRegistry, time, REQUEST_TIMEOUT, 50, null, apiVersions);
+
+        // Append a message so that topic metrics are created
+        accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        sender.run(time.milliseconds()); // connect
+        sender.run(time.milliseconds()); // send produce request
+        client.respond(produceResponse(tp0, 0, Errors.NONE, 0));
+        sender.run(time.milliseconds());
+        // Create throttle time metrics
+        Sender.throttleTimeSensor(metrics, metricsRegistry);
+
+        // Verify that all metrics except metrics-count have registered templates
+        Set<MetricNameTemplate> allMetrics = new HashSet<>();
+        for (MetricName n : metrics.metrics().keySet()) {
+            if (!n.group().equals("kafka-metrics-count"))
+                allMetrics.add(new MetricNameTemplate(n.name(), n.group(), "", n.tags().keySet()));
+        }
+        TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.getAllTemplates()),
"metrics", "templates");
+    }
+
+    @Test
     public void testRetries() throws Exception {
         // create a sender with retries = 1
         int maxRetries = 1;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a339a387/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 47ca823..958ab2c 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -35,6 +35,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -317,6 +318,16 @@ public class TestUtils {
         assertEquals(Utils.toList(it1), Utils.toList(it2));
     }
 
+    public static <T> void checkEquals(Set<T> c1, Set<T> c2, String firstDesc,
String secondDesc) {
+        if (!c1.equals(c2)) {
+            Set<T> missing1 = new HashSet<>(c2);
+            missing1.removeAll(c1);
+            Set<T> missing2 = new HashSet<>(c1);
+            missing2.removeAll(c2);
+            fail(String.format("Sets not equal, missing %s=%s, missing %s=%s", firstDesc,
missing1, secondDesc, missing2));
+        }
+    }
+
     public static <T> List<T> toList(Iterable<? extends T> iterable) {
         List<T> list = new ArrayList<>();
         for (T item : iterable)
@@ -330,5 +341,4 @@ public class TestUtils {
         buffer.rewind();
         return buffer;
     }
-
 }


Mime
View raw message